full impl of database restart
This commit is contained in:
parent
f18d310348
commit
3091a93a52
|
@ -133,8 +133,16 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
cache.setLastNumber(Optional.empty());
|
||||
cache.clearAccelerate();
|
||||
} else {
|
||||
listeners.forEach(listener -> { listener.onCrawlChunk(fromNumber, chunkAccounts); });
|
||||
cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber()));
|
||||
try {
|
||||
for (AccountDatabaseCrawlerListener listener : listeners) {
|
||||
listener.onCrawlChunk(fromNumber, chunkAccounts);
|
||||
}
|
||||
cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber()));
|
||||
} catch (AccountDatabaseCrawlerRestartException e) {
|
||||
cache.setLastNumber(Optional.empty());
|
||||
cache.clearAccelerate();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,6 +21,6 @@ import java.util.Optional;
|
|||
|
||||
public interface AccountDatabaseCrawlerListener {
|
||||
void onCrawlStart();
|
||||
void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts);
|
||||
void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
|
||||
void onCrawlEnd(Optional<String> fromNumber);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright (C) 2018 Open WhisperSystems
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
public class AccountDatabaseCrawlerRestartException extends Exception {
|
||||
public AccountDatabaseCrawlerRestartException(String s) {
|
||||
super(s);
|
||||
}
|
||||
|
||||
public AccountDatabaseCrawlerRestartException(Exception e) {
|
||||
super(e);
|
||||
}
|
||||
}
|
|
@ -63,13 +63,15 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
|
|||
|
||||
}
|
||||
|
||||
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
|
||||
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
|
||||
|
||||
updateDirectoryCache(chunkAccounts);
|
||||
|
||||
DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts);
|
||||
DirectoryReconciliationResponse response = sendChunk(request);
|
||||
|
||||
if (response.getStatus() == DirectoryReconciliationResponse.Status.MISSING) {
|
||||
throw new AccountDatabaseCrawlerRestartException("directory reconciler missing");
|
||||
}
|
||||
}
|
||||
|
||||
private void updateDirectoryCache(List<Account> accounts) {
|
||||
|
|
|
@ -22,12 +22,14 @@ import org.whispersystems.textsecuregcm.storage.Accounts;
|
|||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
@ -68,7 +70,7 @@ public class AccountDatabaseCrawlerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlStart() {
|
||||
public void testCrawlStart() throws AccountDatabaseCrawlerRestartException {
|
||||
when(cache.getLastNumber()).thenReturn(Optional.empty());
|
||||
|
||||
boolean accelerated = crawler.doPeriodicWork();
|
||||
|
@ -94,7 +96,7 @@ public class AccountDatabaseCrawlerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlChunk() {
|
||||
public void testCrawlChunk() throws AccountDatabaseCrawlerRestartException {
|
||||
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1));
|
||||
|
||||
boolean accelerated = crawler.doPeriodicWork();
|
||||
|
@ -119,7 +121,7 @@ public class AccountDatabaseCrawlerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlChunkAccelerated() {
|
||||
public void testCrawlChunkAccelerated() throws AccountDatabaseCrawlerRestartException {
|
||||
when(cache.isAccelerated()).thenReturn(true);
|
||||
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1));
|
||||
|
||||
|
@ -144,6 +146,33 @@ public class AccountDatabaseCrawlerTest {
|
|||
verifyNoMoreInteractions(cache);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlChunkRestart() throws AccountDatabaseCrawlerRestartException {
|
||||
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1));
|
||||
doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
|
||||
|
||||
boolean accelerated = crawler.doPeriodicWork();
|
||||
assertThat(accelerated).isFalse();
|
||||
|
||||
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
|
||||
verify(cache, times(1)).getLastNumber();
|
||||
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
|
||||
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
|
||||
verify(account2, times(0)).getNumber();
|
||||
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
|
||||
verify(cache, times(1)).setLastNumber(eq(Optional.empty()));
|
||||
verify(cache, times(1)).clearAccelerate();
|
||||
verify(cache, times(1)).isAccelerated();
|
||||
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
||||
|
||||
verifyZeroInteractions(account1);
|
||||
|
||||
verifyNoMoreInteractions(account2);
|
||||
verifyNoMoreInteractions(accounts);
|
||||
verifyNoMoreInteractions(listener);
|
||||
verifyNoMoreInteractions(cache);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlEnd() {
|
||||
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT2));
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.whispersystems.textsecuregcm.tests.storage;
|
|||
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
|
@ -135,7 +136,7 @@ public class ActiveUserCounterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlChunkValidAccount() {
|
||||
public void testCrawlChunkValidAccount() throws AccountDatabaseCrawlerRestartException {
|
||||
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount));
|
||||
|
||||
verify(iosAccount, times(1)).getMasterDevice();
|
||||
|
@ -164,7 +165,7 @@ public class ActiveUserCounterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlChunkNoDeviceAccount() {
|
||||
public void testCrawlChunkNoDeviceAccount() throws AccountDatabaseCrawlerRestartException {
|
||||
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_NODEVICE), Arrays.asList(noDeviceAccount));
|
||||
|
||||
verify(noDeviceAccount, times(1)).getMasterDevice();
|
||||
|
@ -188,7 +189,7 @@ public class ActiveUserCounterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlChunkMixedAccount() {
|
||||
public void testCrawlChunkMixedAccount() throws AccountDatabaseCrawlerRestartException {
|
||||
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount, androidAccount, noDeviceAccount));
|
||||
|
||||
verify(iosAccount, times(1)).getMasterDevice();
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.whispersystems.textsecuregcm.entities.ClientContact;
|
|||
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
|
||||
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
|
||||
import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
|
||||
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
|
||||
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
|
||||
|
@ -63,7 +64,7 @@ public class DirectoryReconcilerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCrawlChunkValid() {
|
||||
public void testCrawlChunkValid() throws AccountDatabaseCrawlerRestartException {
|
||||
when(reconciliationClient.sendChunk(any())).thenReturn(successResponse);
|
||||
directoryReconciler.onCrawlChunk(Optional.of(VALID_NUMBER), Arrays.asList(activeAccount, inactiveAccount));
|
||||
|
||||
|
|
Loading…
Reference in New Issue