diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index dc1162ab1..de0ce4b08 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -552,6 +552,7 @@ public class WhisperServerService extends Application listeners, @@ -128,7 +131,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable { try (Timer.Context timer = processChunkTimer.time()) { - final boolean useDynamo = dynamicConfigurationManager.getConfiguration() + final boolean useDynamo = !dedicatedDynamoMigrationCrawler && dynamicConfigurationManager.getConfiguration() .getAccountsDynamoDbMigrationConfiguration() .isDynamoCrawlerEnabled(); @@ -212,6 +215,10 @@ public class AccountDatabaseCrawler implements Managed, Runnable { } } + public void setDedicatedDynamoMigrationCrawler(final boolean dedicatedDynamoMigrationCrawler) { + this.dedicatedDynamoMigrationCrawler = dedicatedDynamoMigrationCrawler; + } + private synchronized void sleepWhileRunning(long delayMs) { if (running.get()) Util.wait(this, delayMs); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java index e489c4f8c..6b2743d7b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -5,21 +5,38 @@ package org.whispersystems.textsecuregcm.tests.storage; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.storage.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + import java.util.Arrays; import java.util.Collections; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; class AccountDatabaseCrawlerTest { @@ -132,7 +149,43 @@ class AccountDatabaseCrawlerTest { verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); - verifyZeroInteractions(account1); + verifyNoInteractions(account1); + + verifyNoMoreInteractions(account2); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCrawlChunk_useDynamoDedicatedMigrationCrawler(final boolean dedicatedMigrationCrawler) throws Exception { + crawler.setDedicatedDynamoMigrationCrawler(dedicatedMigrationCrawler); + + when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(true); + when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1)); + when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1)); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isFalse(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(dedicatedMigrationCrawler ? 1 : 0)).getLastUuid(); + verify(cache, times(dedicatedMigrationCrawler ? 0 : 1)).getLastUuidDynamo(); + if (dedicatedMigrationCrawler) { + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + } else { + verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); + } + verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(cache, times(dedicatedMigrationCrawler ? 1 : 0)).setLastUuid(eq(Optional.of(ACCOUNT2))); + verify(cache, times(dedicatedMigrationCrawler ? 0 : 1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyNoInteractions(account1); verifyNoMoreInteractions(account2); verifyNoMoreInteractions(accounts);