Add `AccountDatabaseCrawler.dedicatedDynamoMigrationCrawler`
This commit is contained in:
parent
ef0900f3ac
commit
ecee189ad8
|
@ -552,6 +552,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
config.getDynamoDbMigrationCrawlerConfiguration().getChunkIntervalMs(),
|
config.getDynamoDbMigrationCrawlerConfiguration().getChunkIntervalMs(),
|
||||||
accountsCrawlerChunkPreReadExecutor,
|
accountsCrawlerChunkPreReadExecutor,
|
||||||
dynamicConfigurationManager);
|
dynamicConfigurationManager);
|
||||||
|
accountDynamoDbMigrationCrawler.setDedicatedDynamoMigrationCrawler(true);
|
||||||
|
|
||||||
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
|
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
|
||||||
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(
|
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(
|
||||||
|
|
|
@ -48,6 +48,9 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
||||||
private AtomicBoolean running = new AtomicBoolean(false);
|
private AtomicBoolean running = new AtomicBoolean(false);
|
||||||
private boolean finished;
|
private boolean finished;
|
||||||
|
|
||||||
|
// temporary to control behavior during the Postgres → Dynamo transition
|
||||||
|
private boolean dedicatedDynamoMigrationCrawler;
|
||||||
|
|
||||||
public AccountDatabaseCrawler(AccountsManager accounts,
|
public AccountDatabaseCrawler(AccountsManager accounts,
|
||||||
AccountDatabaseCrawlerCache cache,
|
AccountDatabaseCrawlerCache cache,
|
||||||
List<AccountDatabaseCrawlerListener> listeners,
|
List<AccountDatabaseCrawlerListener> listeners,
|
||||||
|
@ -128,7 +131,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
||||||
|
|
||||||
try (Timer.Context timer = processChunkTimer.time()) {
|
try (Timer.Context timer = processChunkTimer.time()) {
|
||||||
|
|
||||||
final boolean useDynamo = dynamicConfigurationManager.getConfiguration()
|
final boolean useDynamo = !dedicatedDynamoMigrationCrawler && dynamicConfigurationManager.getConfiguration()
|
||||||
.getAccountsDynamoDbMigrationConfiguration()
|
.getAccountsDynamoDbMigrationConfiguration()
|
||||||
.isDynamoCrawlerEnabled();
|
.isDynamoCrawlerEnabled();
|
||||||
|
|
||||||
|
@ -212,6 +215,10 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setDedicatedDynamoMigrationCrawler(final boolean dedicatedDynamoMigrationCrawler) {
|
||||||
|
this.dedicatedDynamoMigrationCrawler = dedicatedDynamoMigrationCrawler;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void sleepWhileRunning(long delayMs) {
|
private synchronized void sleepWhileRunning(long delayMs) {
|
||||||
if (running.get()) Util.wait(this, delayMs);
|
if (running.get()) Util.wait(this, delayMs);
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,21 +5,38 @@
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.tests.storage;
|
package org.whispersystems.textsecuregcm.tests.storage;
|
||||||
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import org.junit.jupiter.params.provider.ValueSource;
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import org.whispersystems.textsecuregcm.storage.*;
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoInteractions;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import static org.mockito.ArgumentMatchers.*;
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
import static org.mockito.Mockito.*;
|
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
|
||||||
|
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||||
|
|
||||||
class AccountDatabaseCrawlerTest {
|
class AccountDatabaseCrawlerTest {
|
||||||
|
|
||||||
|
@ -132,7 +149,43 @@ class AccountDatabaseCrawlerTest {
|
||||||
verify(cache, times(1)).isAccelerated();
|
verify(cache, times(1)).isAccelerated();
|
||||||
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
||||||
|
|
||||||
verifyZeroInteractions(account1);
|
verifyNoInteractions(account1);
|
||||||
|
|
||||||
|
verifyNoMoreInteractions(account2);
|
||||||
|
verifyNoMoreInteractions(accounts);
|
||||||
|
verifyNoMoreInteractions(listener);
|
||||||
|
verifyNoMoreInteractions(cache);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(booleans = {true, false})
|
||||||
|
void testCrawlChunk_useDynamoDedicatedMigrationCrawler(final boolean dedicatedMigrationCrawler) throws Exception {
|
||||||
|
crawler.setDedicatedDynamoMigrationCrawler(dedicatedMigrationCrawler);
|
||||||
|
|
||||||
|
when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(true);
|
||||||
|
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
|
||||||
|
when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1));
|
||||||
|
|
||||||
|
boolean accelerated = crawler.doPeriodicWork();
|
||||||
|
assertThat(accelerated).isFalse();
|
||||||
|
|
||||||
|
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
|
||||||
|
verify(cache, times(dedicatedMigrationCrawler ? 1 : 0)).getLastUuid();
|
||||||
|
verify(cache, times(dedicatedMigrationCrawler ? 0 : 1)).getLastUuidDynamo();
|
||||||
|
if (dedicatedMigrationCrawler) {
|
||||||
|
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
|
||||||
|
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
|
||||||
|
} else {
|
||||||
|
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
|
||||||
|
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
|
||||||
|
}
|
||||||
|
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
|
||||||
|
verify(cache, times(dedicatedMigrationCrawler ? 1 : 0)).setLastUuid(eq(Optional.of(ACCOUNT2)));
|
||||||
|
verify(cache, times(dedicatedMigrationCrawler ? 0 : 1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2)));
|
||||||
|
verify(cache, times(1)).isAccelerated();
|
||||||
|
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
||||||
|
|
||||||
|
verifyNoInteractions(account1);
|
||||||
|
|
||||||
verifyNoMoreInteractions(account2);
|
verifyNoMoreInteractions(account2);
|
||||||
verifyNoMoreInteractions(accounts);
|
verifyNoMoreInteractions(accounts);
|
||||||
|
|
Loading…
Reference in New Issue