Add asynchronous chunk pre-read to `AccountDatabaseCrawler`
This commit is contained in:
parent
624e40e3b7
commit
b91a69d8b3
|
@ -400,7 +400,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
ExecutorService backupServiceExecutor = environment.lifecycle().executorService(name(getClass(), "backupService-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService storageServiceExecutor = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService donationExecutor = environment.lifecycle().executorService(name(getClass(), "donation-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService multiRecipientMessageExecutor = environment.lifecycle().executorService(name(getClass(), "multiRecipientMessage-%d")).minThreads(64).maxThreads(64).build();
|
||||
ExecutorService multiRecipientMessageExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "multiRecipientMessage-%d")).minThreads(64).maxThreads(64).build();
|
||||
ExecutorService accountsCrawlerChunkPreReadExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "accountsCrawler-%d")).maxThreads(2).minThreads(2).build();
|
||||
|
||||
ExternalServiceCredentialGenerator directoryCredentialsGenerator = new ExternalServiceCredentialGenerator(config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenSharedSecret(),
|
||||
config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenUserIdSecret(),
|
||||
|
@ -504,6 +507,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
|
||||
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
|
||||
config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs(),
|
||||
accountsCrawlerChunkPreReadExecutor,
|
||||
dynamicConfigurationManager);
|
||||
|
||||
AccountDatabaseCrawlerCache dynamoDbMigrationCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
|
||||
|
@ -513,6 +517,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
List.of(new AccountsDynamoDbMigrator(accountsDynamoDb, dynamicConfigurationManager)),
|
||||
config.getDynamoDbMigrationCrawlerConfiguration().getChunkSize(),
|
||||
config.getDynamoDbMigrationCrawlerConfiguration().getChunkIntervalMs(),
|
||||
accountsCrawlerChunkPreReadExecutor,
|
||||
dynamicConfigurationManager);
|
||||
|
||||
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
|
||||
|
|
|
@ -23,6 +23,9 @@ public class DynamicAccountsDynamoDbMigrationConfiguration {
|
|||
@JsonProperty
|
||||
boolean logMismatches;
|
||||
|
||||
@JsonProperty
|
||||
boolean crawlerPreReadNextChunkEnabled;
|
||||
|
||||
@JsonProperty
|
||||
boolean dynamoCrawlerEnabled;
|
||||
|
||||
|
@ -71,6 +74,10 @@ public class DynamicAccountsDynamoDbMigrationConfiguration {
|
|||
return logMismatches;
|
||||
}
|
||||
|
||||
public boolean isCrawlerPreReadNextChunkEnabled() {
|
||||
return crawlerPreReadNextChunkEnabled;
|
||||
}
|
||||
|
||||
public boolean isDynamoCrawlerEnabled() {
|
||||
return dynamoCrawlerEnabled;
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import io.dropwizard.lifecycle.Managed;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -26,6 +27,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class);
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Timer readChunkTimer = metricRegistry.timer(name(AccountDatabaseCrawler.class, "readChunk"));
|
||||
private static final Timer preReadChunkTimer = metricRegistry.timer(
|
||||
name(AccountDatabaseCrawler.class, "preReadChunk"));
|
||||
private static final Timer processChunkTimer = metricRegistry.timer(
|
||||
name(AccountDatabaseCrawler.class, "processChunk"));
|
||||
|
||||
|
@ -38,6 +41,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
private final String workerId;
|
||||
private final AccountDatabaseCrawlerCache cache;
|
||||
private final List<AccountDatabaseCrawlerListener> listeners;
|
||||
private final ExecutorService chunkPreReadExecutorService;
|
||||
|
||||
private final DynamicConfigurationManager dynamicConfigurationManager;
|
||||
|
||||
|
@ -45,18 +49,19 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
private boolean finished;
|
||||
|
||||
public AccountDatabaseCrawler(AccountsManager accounts,
|
||||
AccountDatabaseCrawlerCache cache,
|
||||
List<AccountDatabaseCrawlerListener> listeners,
|
||||
int chunkSize,
|
||||
long chunkIntervalMs,
|
||||
DynamicConfigurationManager dynamicConfigurationManager)
|
||||
{
|
||||
this.accounts = accounts;
|
||||
this.chunkSize = chunkSize;
|
||||
this.chunkIntervalMs = chunkIntervalMs;
|
||||
this.workerId = UUID.randomUUID().toString();
|
||||
this.cache = cache;
|
||||
this.listeners = listeners;
|
||||
AccountDatabaseCrawlerCache cache,
|
||||
List<AccountDatabaseCrawlerListener> listeners,
|
||||
int chunkSize,
|
||||
long chunkIntervalMs,
|
||||
ExecutorService chunkPreReadExecutorService,
|
||||
DynamicConfigurationManager dynamicConfigurationManager) {
|
||||
this.accounts = accounts;
|
||||
this.chunkSize = chunkSize;
|
||||
this.chunkIntervalMs = chunkIntervalMs;
|
||||
this.workerId = UUID.randomUUID().toString();
|
||||
this.cache = cache;
|
||||
this.listeners = listeners;
|
||||
this.chunkPreReadExecutorService = chunkPreReadExecutorService;
|
||||
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
}
|
||||
|
@ -136,6 +141,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
|
||||
final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize, useDynamo);
|
||||
|
||||
primeDatabaseForNextChunkAsync(chunkAccounts.getLastUuid(), chunkSize, useDynamo);
|
||||
|
||||
if (chunkAccounts.getAccounts().isEmpty()) {
|
||||
logger.info("Finished crawl");
|
||||
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
|
||||
|
@ -156,8 +163,26 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is an optimization based on the observation that cold reads of chunks are slow, but subsequent reads of the
|
||||
* same chunk (within a few minutes) are fast. We can’t easily store the actual result data, since the next chunk
|
||||
* might be processed elsewhere, but the time savings are still substantial.
|
||||
*/
|
||||
private void primeDatabaseForNextChunkAsync(Optional<UUID> fromUuid, int chunkSize, boolean useDynamo) {
|
||||
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
|
||||
.isCrawlerPreReadNextChunkEnabled()) {
|
||||
if (!useDynamo && fromUuid.isPresent()) {
|
||||
chunkPreReadExecutorService.submit(() -> readChunk(fromUuid, chunkSize, false, preReadChunkTimer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize, boolean useDynamo) {
|
||||
try (Timer.Context timer = readChunkTimer.time()) {
|
||||
return readChunk(fromUuid, chunkSize, useDynamo, readChunkTimer);
|
||||
}
|
||||
|
||||
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize, boolean useDynamo, Timer readTimer) {
|
||||
try (Timer.Context timer = readTimer.time()) {
|
||||
|
||||
if (fromUuid.isPresent()) {
|
||||
return useDynamo
|
||||
|
|
|
@ -18,6 +18,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
|
||||
|
@ -67,7 +68,8 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT
|
|||
when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(mock(DynamicAccountsDynamoDbMigrationConfiguration.class));
|
||||
|
||||
final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster());
|
||||
accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS, dynamicConfigurationManager);
|
||||
accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE,
|
||||
CHUNK_INTERVAL_MS, mock(ExecutorService.class), dynamicConfigurationManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -19,6 +19,7 @@ import com.opentable.db.postgres.junit5.PreparedDbExtension;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -37,6 +38,7 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
|||
import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
|
||||
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
|
||||
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
|
||||
import org.whispersystems.textsecuregcm.tests.util.SynchronousExecutorService;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
|
||||
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
|
||||
|
@ -217,8 +219,12 @@ class AccountsDynamoDbMigrationCrawlerIntegrationTest {
|
|||
final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(
|
||||
REDIS_CLUSTER_EXTENSION.getRedisCluster());
|
||||
|
||||
accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(dynamoDbMigrator, pushFeedbackProcessor), CHUNK_SIZE,
|
||||
CHUNK_INTERVAL_MS, dynamicConfigurationManager);
|
||||
// Using a synchronous service doesn’t meaningfully impact the test
|
||||
final ExecutorService chunkPreReadExecutorService = new SynchronousExecutorService();
|
||||
|
||||
accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache,
|
||||
List.of(dynamoDbMigrator, pushFeedbackProcessor), CHUNK_SIZE,
|
||||
CHUNK_INTERVAL_MS, chunkPreReadExecutorService, dynamicConfigurationManager);
|
||||
}
|
||||
|
||||
void createAdditionalDynamoDbTables() {
|
||||
|
|
|
@ -5,55 +5,43 @@
|
|||
|
||||
package org.whispersystems.textsecuregcm.tests.storage;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.*;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
class AccountDatabaseCrawlerTest {
|
||||
|
||||
private static final UUID ACCOUNT1 = UUID.randomUUID();
|
||||
private static final UUID ACCOUNT2 = UUID.randomUUID();
|
||||
|
||||
private static final int CHUNK_SIZE = 1000;
|
||||
private static final int CHUNK_SIZE = 1000;
|
||||
private static final long CHUNK_INTERVAL_MS = 30_000L;
|
||||
|
||||
private final Account account1 = mock(Account.class);
|
||||
private final Account account2 = mock(Account.class);
|
||||
|
||||
private final AccountsManager accounts = mock(AccountsManager.class);
|
||||
private final AccountsManager accounts = mock(AccountsManager.class);
|
||||
private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class);
|
||||
private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);
|
||||
private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);
|
||||
|
||||
private final ExecutorService chunkPreReadExecutorService = mock(ExecutorService.class);
|
||||
|
||||
private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
|
||||
|
||||
private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS, dynamicConfigurationManager);
|
||||
private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener),
|
||||
CHUNK_SIZE, CHUNK_INTERVAL_MS, chunkPreReadExecutorService, dynamicConfigurationManager);
|
||||
private DynamicAccountsDynamoDbMigrationConfiguration dynamicAccountsDynamoDbMigrationConfiguration;
|
||||
|
||||
@BeforeEach
|
||||
|
@ -62,12 +50,16 @@ class AccountDatabaseCrawlerTest {
|
|||
when(account2.getUuid()).thenReturn(ACCOUNT2);
|
||||
|
||||
when(accounts.getAllFrom(anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2));
|
||||
when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2));
|
||||
when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(
|
||||
new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2));
|
||||
when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));
|
||||
|
||||
when(accounts.getAllFromDynamo(anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2));
|
||||
when(accounts.getAllFromDynamo(eq(ACCOUNT1), anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2));
|
||||
when(accounts.getAllFromDynamo(eq(ACCOUNT2), anyInt())).thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));
|
||||
when(accounts.getAllFromDynamo(anyInt())).thenReturn(
|
||||
new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2));
|
||||
when(accounts.getAllFromDynamo(eq(ACCOUNT1), anyInt())).thenReturn(
|
||||
new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2));
|
||||
when(accounts.getAllFromDynamo(eq(ACCOUNT2), anyInt())).thenReturn(
|
||||
new AccountCrawlChunk(Collections.emptyList(), null));
|
||||
|
||||
when(cache.claimActiveWork(any(), anyLong())).thenReturn(true);
|
||||
when(cache.isAccelerated()).thenReturn(false);
|
||||
|
@ -75,7 +67,8 @@ class AccountDatabaseCrawlerTest {
|
|||
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
|
||||
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
|
||||
dynamicAccountsDynamoDbMigrationConfiguration = mock(DynamicAccountsDynamoDbMigrationConfiguration.class);
|
||||
when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(dynamicAccountsDynamoDbMigrationConfiguration);
|
||||
when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(
|
||||
dynamicAccountsDynamoDbMigrationConfiguration);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
Loading…
Reference in New Issue