diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 8cf949d69..52f8fec54 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -400,7 +400,10 @@ public class WhisperServerService extends Application listeners; + private final ExecutorService chunkPreReadExecutorService; private final DynamicConfigurationManager dynamicConfigurationManager; @@ -45,18 +49,19 @@ public class AccountDatabaseCrawler implements Managed, Runnable { private boolean finished; public AccountDatabaseCrawler(AccountsManager accounts, - AccountDatabaseCrawlerCache cache, - List listeners, - int chunkSize, - long chunkIntervalMs, - DynamicConfigurationManager dynamicConfigurationManager) - { - this.accounts = accounts; - this.chunkSize = chunkSize; - this.chunkIntervalMs = chunkIntervalMs; - this.workerId = UUID.randomUUID().toString(); - this.cache = cache; - this.listeners = listeners; + AccountDatabaseCrawlerCache cache, + List listeners, + int chunkSize, + long chunkIntervalMs, + ExecutorService chunkPreReadExecutorService, + DynamicConfigurationManager dynamicConfigurationManager) { + this.accounts = accounts; + this.chunkSize = chunkSize; + this.chunkIntervalMs = chunkIntervalMs; + this.workerId = UUID.randomUUID().toString(); + this.cache = cache; + this.listeners = listeners; + this.chunkPreReadExecutorService = chunkPreReadExecutorService; this.dynamicConfigurationManager = dynamicConfigurationManager; } @@ -136,6 +141,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable { final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize, useDynamo); + primeDatabaseForNextChunkAsync(chunkAccounts.getLastUuid(), chunkSize, useDynamo); + if (chunkAccounts.getAccounts().isEmpty()) { logger.info("Finished crawl"); listeners.forEach(listener -> listener.onCrawlEnd(fromUuid)); @@ -156,8 +163,26 @@ public class AccountDatabaseCrawler implements Managed, Runnable { } } + /** + * This is an optimization based on the observation that cold reads of chunks are slow, but subsequent reads of the + * same chunk (within a few minutes) are fast. We can’t easily store the actual result data, since the next chunk + * might be processed elsewhere, but the time savings are still substantial. + */ + private void primeDatabaseForNextChunkAsync(Optional fromUuid, int chunkSize, boolean useDynamo) { + if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration() + .isCrawlerPreReadNextChunkEnabled()) { + if (!useDynamo && fromUuid.isPresent()) { + chunkPreReadExecutorService.submit(() -> readChunk(fromUuid, chunkSize, false, preReadChunkTimer)); + } + } + } + private AccountCrawlChunk readChunk(Optional fromUuid, int chunkSize, boolean useDynamo) { - try (Timer.Context timer = readChunkTimer.time()) { + return readChunk(fromUuid, chunkSize, useDynamo, readChunkTimer); + } + + private AccountCrawlChunk readChunk(Optional fromUuid, int chunkSize, boolean useDynamo, Timer readTimer) { + try (Timer.Context timer = readTimer.time()) { if (fromUuid.isPresent()) { return useDynamo diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java index 15dbf5a90..4fcad83f7 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutorService; import org.junit.Before; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration; @@ -67,7 +68,8 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(mock(DynamicAccountsDynamoDbMigrationConfiguration.class)); final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster()); - accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS, dynamicConfigurationManager); + accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, + CHUNK_INTERVAL_MS, mock(ExecutorService.class), dynamicConfigurationManager); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrationCrawlerIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrationCrawlerIntegrationTest.java index a5d4c1b8e..f9e2ad22d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrationCrawlerIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrationCrawlerIntegrationTest.java @@ -19,6 +19,7 @@ import com.opentable.db.postgres.junit5.PreparedDbExtension; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; +import org.whispersystems.textsecuregcm.tests.util.SynchronousExecutorService; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; @@ -217,8 +219,12 @@ class AccountsDynamoDbMigrationCrawlerIntegrationTest { final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache( REDIS_CLUSTER_EXTENSION.getRedisCluster()); - accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(dynamoDbMigrator, pushFeedbackProcessor), CHUNK_SIZE, - CHUNK_INTERVAL_MS, dynamicConfigurationManager); + // Using a synchronous service doesn’t meaningfully impact the test + final ExecutorService chunkPreReadExecutorService = new SynchronousExecutorService(); + + accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, + List.of(dynamoDbMigrator, pushFeedbackProcessor), CHUNK_SIZE, + CHUNK_INTERVAL_MS, chunkPreReadExecutorService, dynamicConfigurationManager); } void createAdditionalDynamoDbTables() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java index e2dc9a526..e489c4f8c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -5,55 +5,43 @@ package org.whispersystems.textsecuregcm.tests.storage; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Optional; -import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; +import org.whispersystems.textsecuregcm.storage.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; class AccountDatabaseCrawlerTest { private static final UUID ACCOUNT1 = UUID.randomUUID(); private static final UUID ACCOUNT2 = UUID.randomUUID(); - private static final int CHUNK_SIZE = 1000; + private static final int CHUNK_SIZE = 1000; private static final long CHUNK_INTERVAL_MS = 30_000L; private final Account account1 = mock(Account.class); private final Account account2 = mock(Account.class); - private final AccountsManager accounts = mock(AccountsManager.class); + private final AccountsManager accounts = mock(AccountsManager.class); private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class); - private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class); + private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class); + + private final ExecutorService chunkPreReadExecutorService = mock(ExecutorService.class); private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); - private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS, dynamicConfigurationManager); + private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), + CHUNK_SIZE, CHUNK_INTERVAL_MS, chunkPreReadExecutorService, dynamicConfigurationManager); private DynamicAccountsDynamoDbMigrationConfiguration dynamicAccountsDynamoDbMigrationConfiguration; @BeforeEach @@ -62,12 +50,16 @@ class AccountDatabaseCrawlerTest { when(account2.getUuid()).thenReturn(ACCOUNT2); when(accounts.getAllFrom(anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2)); - when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2)); + when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn( + new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2)); when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); - when(accounts.getAllFromDynamo(anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2)); - when(accounts.getAllFromDynamo(eq(ACCOUNT1), anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2)); - when(accounts.getAllFromDynamo(eq(ACCOUNT2), anyInt())).thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); + when(accounts.getAllFromDynamo(anyInt())).thenReturn( + new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2)); + when(accounts.getAllFromDynamo(eq(ACCOUNT1), anyInt())).thenReturn( + new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2)); + when(accounts.getAllFromDynamo(eq(ACCOUNT2), anyInt())).thenReturn( + new AccountCrawlChunk(Collections.emptyList(), null)); when(cache.claimActiveWork(any(), anyLong())).thenReturn(true); when(cache.isAccelerated()).thenReturn(false); @@ -75,7 +67,8 @@ class AccountDatabaseCrawlerTest { final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); dynamicAccountsDynamoDbMigrationConfiguration = mock(DynamicAccountsDynamoDbMigrationConfiguration.class); - when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(dynamicAccountsDynamoDbMigrationConfiguration); + when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn( + dynamicAccountsDynamoDbMigrationConfiguration); } @ParameterizedTest