From 8d1c26d07d406eccf5499feab051086cea8e5fe0 Mon Sep 17 00:00:00 2001 From: Chris Eager <79161849+eager-signal@users.noreply.github.com> Date: Wed, 17 May 2023 12:22:49 -0500 Subject: [PATCH] Add `CrawlAccountsCommand` --- .../textsecuregcm/WhisperServerService.java | 11 +- ...icAccountDatabaseCrawlerConfiguration.java | 10 ++ .../dynamic/DynamicConfiguration.java | 9 ++ .../textsecuregcm/storage/AccountCleaner.java | 2 +- .../storage/AccountDatabaseCrawler.java | 63 ++++++++- .../AccountDatabaseCrawlerListener.java | 10 +- .../NonNormalizedAccountCrawlerListener.java | 2 +- .../storage/PushFeedbackProcessor.java | 3 +- .../workers/CommandDependencies.java | 8 +- .../workers/CrawlAccountsCommand.java | 132 ++++++++++++++++++ .../storage/AccountCleanerTest.java | 5 +- ...AccountDatabaseCrawlerIntegrationTest.java | 31 +++- .../storage/AccountDatabaseCrawlerTest.java | 41 +++++- 13 files changed, 301 insertions(+), 26 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountDatabaseCrawlerConfiguration.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 91ead2faf..300c08417 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -214,6 +214,7 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.AssignUsernameCommand; import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; +import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask; @@ -245,6 +246,7 @@ public class WhisperServerService extends Application getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -97,4 +102,8 @@ public class DynamicConfiguration { public DynamicRateLimitPolicy getRateLimitPolicy() { return rateLimitPolicy; } + + public DynamicAccountDatabaseCrawlerConfiguration getAccountDatabaseCrawlerConfiguration() { + return accountDatabaseCrawler; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java index 7e5340236..7f7d91acb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java @@ -37,7 +37,7 @@ public class AccountCleaner extends AccountDatabaseCrawlerListener { } @Override - public void onCrawlEnd(Optional fromUuid) { + public void onCrawlEnd() { } @Override diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java index 8e56db900..5a5270ce8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java @@ -18,6 +18,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Util; @@ -40,6 +41,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable { private final AccountDatabaseCrawlerCache cache; private final List listeners; + private final DynamicConfigurationManager dynamicConfigurationManager; + private AtomicBoolean running = new AtomicBoolean(false); private boolean finished; @@ -47,14 +50,15 @@ public class AccountDatabaseCrawler implements Managed, Runnable { AccountsManager accounts, AccountDatabaseCrawlerCache cache, List listeners, - int chunkSize) { + int chunkSize, + DynamicConfigurationManager dynamicConfigurationManager) { this.name = name; this.accounts = accounts; this.chunkSize = chunkSize; this.workerId = UUID.randomUUID().toString(); this.cache = cache; this.listeners = listeners; - + this.dynamicConfigurationManager = dynamicConfigurationManager; } @Override @@ -91,8 +95,61 @@ public class AccountDatabaseCrawler implements Managed, Runnable { } } + public void crawlAllAccounts() { + if (!cache.claimActiveWork(workerId, WORKER_TTL_MS)) { + logger.info("Did not claim active work"); + return; + } + try { + Optional fromUuid = cache.getLastUuid(); + + if (fromUuid.isEmpty()) { + logger.info("{}: Started crawl", name); + listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart); + } else { + logger.info("{}: Resuming crawl", name); + } + + try { + AccountCrawlChunk chunkAccounts; + do { + if (!dynamicConfigurationManager.getConfiguration().getAccountDatabaseCrawlerConfiguration() + .crawlAllEnabled()) { + logger.warn("Exiting crawl - not enabled by dynamic configuration"); + return; + } + try (Timer.Context timer = processChunkTimer.time()) { + logger.debug("{}: Processing chunk", name); + chunkAccounts = readChunk(fromUuid, chunkSize); + + for (AccountDatabaseCrawlerListener listener : listeners) { + listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts()); + } + fromUuid = chunkAccounts.getLastUuid(); + cacheLastUuid(fromUuid); + } + + } while (!chunkAccounts.getAccounts().isEmpty()); + + logger.info("{}: Finished crawl", name); + listeners.forEach(AccountDatabaseCrawlerListener::onCrawlEnd); + + } catch (AccountDatabaseCrawlerRestartException e) { + logger.warn("Crawl stopped", e); + } + + } finally { + cache.releaseActiveWork(workerId); + } + } + @VisibleForTesting public void doPeriodicWork() { + if (!dynamicConfigurationManager.getConfiguration().getAccountDatabaseCrawlerConfiguration() + .periodicWorkEnabled()) { + return; + } + if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) { try { processChunk(); @@ -117,7 +174,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable { if (chunkAccounts.getAccounts().isEmpty()) { logger.info("{}: Finished crawl", name); - listeners.forEach(listener -> listener.onCrawlEnd(fromUuid)); + listeners.forEach(AccountDatabaseCrawlerListener::onCrawlEnd); cacheLastUuid(Optional.empty()); } else { logger.debug("{}: Processing chunk", name); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java index abfbb64e3..307dbff79 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java @@ -4,16 +4,14 @@ */ package org.whispersystems.textsecuregcm.storage; +import static com.codahale.metrics.MetricRegistry.name; + import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; - -import org.whispersystems.textsecuregcm.util.Constants; - import java.util.List; import java.util.Optional; import java.util.UUID; - -import static com.codahale.metrics.MetricRegistry.name; +import org.whispersystems.textsecuregcm.util.Constants; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public abstract class AccountDatabaseCrawlerListener { @@ -22,7 +20,7 @@ public abstract class AccountDatabaseCrawlerListener { abstract public void onCrawlStart(); - abstract public void onCrawlEnd(Optional fromUuid); + abstract public void onCrawlEnd(); abstract protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) throws AccountDatabaseCrawlerRestartException; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/NonNormalizedAccountCrawlerListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/NonNormalizedAccountCrawlerListener.java index bc69e9904..00598d62c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/NonNormalizedAccountCrawlerListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/NonNormalizedAccountCrawlerListener.java @@ -88,7 +88,7 @@ public class NonNormalizedAccountCrawlerListener extends AccountDatabaseCrawlerL } @Override - public void onCrawlEnd(final Optional fromUuid) { + public void onCrawlEnd() { final int normalizedNumbers = metricsCluster.withCluster(connection -> Integer.parseInt(connection.sync().get(NORMALIZED_NUMBER_COUNT_KEY))); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java index 0837032c8..32a408a46 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java @@ -33,7 +33,8 @@ public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener { public void onCrawlStart() {} @Override - public void onCrawlEnd(Optional toUuid) {} + public void onCrawlEnd() { + } @Override protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 4171e3068..0ccfdacae 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -66,7 +66,9 @@ record CommandDependencies( DeletedAccountsManager deletedAccountsManager, StoredVerificationCodeManager pendingAccountsManager, ClientPresenceManager clientPresenceManager, - Keys keys) { + Keys keys, + FaultTolerantRedisCluster cacheCluster, + ClientResources redisClusterClientResources) { static CommandDependencies build( final String name, @@ -206,7 +208,9 @@ record CommandDependencies( deletedAccountsManager, pendingAccountsManager, clientPresenceManager, - keys + keys, + cacheCluster, + redisClusterClientResources ); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java new file mode 100644 index 000000000..3a9a1590d --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java @@ -0,0 +1,132 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.workers; + +import static com.codahale.metrics.MetricRegistry.name; + +import io.dropwizard.Application; +import io.dropwizard.cli.EnvironmentCommand; +import io.dropwizard.setup.Environment; +import java.util.List; +import java.util.concurrent.ExecutorService; +import net.sourceforge.argparse4j.inf.Argument; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.ArgumentType; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.storage.AccountCleaner; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; +import org.whispersystems.textsecuregcm.storage.NonNormalizedAccountCrawlerListener; +import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor; + +public class CrawlAccountsCommand extends EnvironmentCommand { + + private static final String CRAWL_TYPE = "crawlType"; + + public enum CrawlType implements ArgumentType { + GENERAL_PURPOSE, + ACCOUNT_CLEANER, + ; + + + @Override + public CrawlType convert(final ArgumentParser parser, final Argument arg, final String value) + throws ArgumentParserException { + return CrawlType.valueOf(value); + } + } + + public CrawlAccountsCommand() { + super(new Application<>() { + @Override + public void run(final WhisperServerConfiguration configuration, final Environment environment) throws Exception { + + } + }, "crawl-accounts", "Runs account crawler tasks"); + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + subparser.addArgument("--crawl-type") + .type(CrawlType.class) + .dest(CRAWL_TYPE) + .required(true) + .help("The type of crawl to perform"); + } + + @Override + protected void run(final Environment environment, final Namespace namespace, + final WhisperServerConfiguration configuration) throws Exception { + + final CommandDependencies deps = CommandDependencies.build("account-crawler", environment, configuration); + final AccountsManager accountsManager = deps.accountsManager(); + + final FaultTolerantRedisCluster cacheCluster = deps.cacheCluster(); + final FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", + configuration.getMetricsClusterConfiguration(), deps.redisClusterClientResources()); + + // TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data + final List accountDatabaseCrawlerListeners = List.of( + new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), + // PushFeedbackProcessor may update device properties + new PushFeedbackProcessor(accountsManager)); + + final DynamicConfigurationManager dynamicConfigurationManager = + new DynamicConfigurationManager<>(configuration.getAppConfig().getApplication(), + configuration.getAppConfig().getEnvironment(), + configuration.getAppConfig().getConfigurationName(), + DynamicConfiguration.class); + + final AccountDatabaseCrawler crawler = + + switch ((CrawlType) namespace.get(CRAWL_TYPE)) { + case GENERAL_PURPOSE -> { + final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache( + cacheCluster, + AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); + + yield new AccountDatabaseCrawler("General-purpose account crawler", + accountsManager, + accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, + configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize(), + dynamicConfigurationManager + ); + } + case ACCOUNT_CLEANER -> { + final ExecutorService accountDeletionExecutor = environment.lifecycle() + .executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build(); + + final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache( + cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX); + + yield new AccountDatabaseCrawler("Account cleaner crawler", + accountsManager, + accountDatabaseCrawlerCache, + List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)), + configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize(), + dynamicConfigurationManager + ); + } + }; + + try { + crawler.crawlAllAccounts(); + } catch (final Exception e) { + LoggerFactory.getLogger(CrawlAccountsCommand.class).error("Error crawling accounts", e); + } + + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java index 0100983f6..7f3e6e822 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java @@ -80,8 +80,9 @@ class AccountCleanerTest { void testAccounts() throws AccountDatabaseCrawlerRestartException, InterruptedException { AccountCleaner accountCleaner = new AccountCleaner(accountsManager, deletionExecutor); accountCleaner.onCrawlStart(); - accountCleaner.timeAndProcessCrawlChunk(Optional.empty(), Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount)); - accountCleaner.onCrawlEnd(Optional.empty()); + accountCleaner.timeAndProcessCrawlChunk(Optional.empty(), + Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount)); + accountCleaner.onCrawlEnd(); verify(accountsManager).delete(deletedDisabledAccount, DeletionReason.EXPIRED); verify(accountsManager).delete(undeletedDisabledAccount, DeletionReason.EXPIRED); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java index 497ef7f8c..b41dcd949 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java @@ -20,6 +20,8 @@ import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountDatabaseCrawlerConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; class AccountDatabaseCrawlerIntegrationTest { @@ -39,7 +41,6 @@ class AccountDatabaseCrawlerIntegrationTest { private AccountDatabaseCrawler accountDatabaseCrawler; private static final int CHUNK_SIZE = 1; - private static final long CHUNK_INTERVAL_MS = 0; @BeforeEach void setUp() throws Exception { @@ -59,10 +60,18 @@ class AccountDatabaseCrawlerIntegrationTest { .thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID)) .thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); + DynamicConfigurationManager dynamicConfigurationManager = mock( + DynamicConfigurationManager.class); + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + DynamicAccountDatabaseCrawlerConfiguration accountDatabaseCrawlerConfiguration = new DynamicAccountDatabaseCrawlerConfiguration( + true, true); + when(dynamicConfiguration.getAccountDatabaseCrawlerConfiguration()).thenReturn(accountDatabaseCrawlerConfiguration); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache( REDIS_CLUSTER_EXTENSION.getRedisCluster(), "test"); accountDatabaseCrawler = new AccountDatabaseCrawler("test", accountsManager, crawlerCache, List.of(listener), - CHUNK_SIZE); + CHUNK_SIZE, dynamicConfigurationManager); } @Test @@ -78,7 +87,7 @@ class AccountDatabaseCrawlerIntegrationTest { verify(listener).onCrawlStart(); verify(listener).timeAndProcessCrawlChunk(Optional.empty(), List.of(firstAccount)); verify(listener).timeAndProcessCrawlChunk(Optional.of(FIRST_UUID), List.of(secondAccount)); - verify(listener).onCrawlEnd(Optional.of(SECOND_UUID)); + verify(listener).onCrawlEnd(); } @Test @@ -98,6 +107,20 @@ class AccountDatabaseCrawlerIntegrationTest { verify(listener, times(2)).onCrawlStart(); verify(listener, times(2)).timeAndProcessCrawlChunk(Optional.empty(), List.of(firstAccount)); verify(listener).timeAndProcessCrawlChunk(Optional.of(FIRST_UUID), List.of(secondAccount)); - verify(listener).onCrawlEnd(Optional.of(SECOND_UUID)); + verify(listener).onCrawlEnd(); + } + + @Test + void testCrawlAllAccounts() throws Exception { + accountDatabaseCrawler.crawlAllAccounts(); + + verify(accountsManager).getAllFromDynamo(CHUNK_SIZE); + verify(accountsManager).getAllFromDynamo(FIRST_UUID, CHUNK_SIZE); + verify(accountsManager).getAllFromDynamo(SECOND_UUID, CHUNK_SIZE); + + verify(listener).onCrawlStart(); + verify(listener).timeAndProcessCrawlChunk(Optional.empty(), List.of(firstAccount)); + verify(listener).timeAndProcessCrawlChunk(Optional.of(FIRST_UUID), List.of(secondAccount)); + verify(listener).onCrawlEnd(); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java index 7eed87249..30acfa266 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -23,6 +23,8 @@ import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountDatabaseCrawlerConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; @@ -30,6 +32,7 @@ import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; class AccountDatabaseCrawlerTest { @@ -37,7 +40,6 @@ class AccountDatabaseCrawlerTest { private static final UUID ACCOUNT2 = UUID.randomUUID(); private static final int CHUNK_SIZE = 1000; - private static final long CHUNK_INTERVAL_MS = 30_000L; private final Account account1 = mock(Account.class); private final Account account2 = mock(Account.class); @@ -46,8 +48,11 @@ class AccountDatabaseCrawlerTest { private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class); private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class); + private final DynamicConfigurationManager dynamicConfigurationManager = mock( + DynamicConfigurationManager.class); + private final AccountDatabaseCrawler crawler = - new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE); + new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE, dynamicConfigurationManager); @BeforeEach void setup() { @@ -63,6 +68,11 @@ class AccountDatabaseCrawlerTest { when(cache.claimActiveWork(any(), anyLong())).thenReturn(true); + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + final DynamicAccountDatabaseCrawlerConfiguration accountDatabaseCrawlerConfiguration = + new DynamicAccountDatabaseCrawlerConfiguration(true, true); + when(dynamicConfiguration.getAccountDatabaseCrawlerConfiguration()).thenReturn(accountDatabaseCrawlerConfiguration); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); } @Test @@ -169,7 +179,7 @@ class AccountDatabaseCrawlerTest { verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT2), eq(CHUNK_SIZE)); verify(account1, times(0)).getNumber(); verify(account2, times(0)).getNumber(); - verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2))); + verify(listener, times(1)).onCrawlEnd(); verify(cache, times(1)).setLastUuid(eq(Optional.empty())); verify(cache, times(1)).releaseActiveWork(any(String.class)); @@ -181,4 +191,29 @@ class AccountDatabaseCrawlerTest { verifyNoMoreInteractions(cache); } + @Test + void testCrawlAllAccounts() throws Exception { + when(cache.getLastUuid()) + .thenReturn(Optional.empty()); + + crawler.crawlAllAccounts(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastUuid(); + verify(listener, times(1)).onCrawlStart(); + verify(accounts, times(1)).getAllFromDynamo(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT2), eq(CHUNK_SIZE)); + verify(listener, times(1)).timeAndProcessCrawlChunk(Optional.empty(), List.of(account1, account2)); + verify(listener, times(1)).timeAndProcessCrawlChunk(Optional.of(ACCOUNT2), Collections.emptyList()); + verify(listener, times(1)).onCrawlEnd(); + verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); + // times(2) because empty() will get cached on the last run of loop and then again at the end + verify(cache, times(1)).setLastUuid(eq(Optional.empty())); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + }