Add `CrawlAccountsCommand`

This commit is contained in:
Chris Eager 2023-05-17 12:22:49 -05:00 committed by GitHub
parent caae27c44c
commit 8d1c26d07d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 301 additions and 26 deletions

View File

@ -214,6 +214,7 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.AssignUsernameCommand;
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
@ -245,6 +246,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new SetUserDiscoverabilityCommand());
bootstrap.addCommand(new AssignUsernameCommand());
bootstrap.addCommand(new UnlinkDeviceCommand());
bootstrap.addCommand(new CrawlAccountsCommand());
}
@Override
@ -565,8 +567,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX);
AccountDatabaseCrawler accountCleanerAccountDatabaseCrawler = new AccountDatabaseCrawler("Account cleaner crawler",
accountsManager,
accountCleanerAccountDatabaseCrawlerCache, List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)),
config.getAccountDatabaseCrawlerConfiguration().getChunkSize()
accountCleanerAccountDatabaseCrawlerCache,
List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)),
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
dynamicConfigurationManager
);
// TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
@ -580,7 +584,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler("General-purpose account crawler",
accountsManager,
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
config.getAccountDatabaseCrawlerConfiguration().getChunkSize()
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
dynamicConfigurationManager
);
HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build();

View File

@ -0,0 +1,10 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
public record DynamicAccountDatabaseCrawlerConfiguration(boolean periodicWorkEnabled, boolean crawlAllEnabled) {
}

View File

@ -56,6 +56,11 @@ public class DynamicConfiguration {
@Valid
DynamicRateLimitPolicy rateLimitPolicy = new DynamicRateLimitPolicy(false);
@JsonProperty
@Valid
DynamicAccountDatabaseCrawlerConfiguration accountDatabaseCrawler = new DynamicAccountDatabaseCrawlerConfiguration(
true, false);
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@ -97,4 +102,8 @@ public class DynamicConfiguration {
public DynamicRateLimitPolicy getRateLimitPolicy() {
return rateLimitPolicy;
}
public DynamicAccountDatabaseCrawlerConfiguration getAccountDatabaseCrawlerConfiguration() {
return accountDatabaseCrawler;
}
}

View File

@ -37,7 +37,7 @@ public class AccountCleaner extends AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlEnd(Optional<UUID> fromUuid) {
public void onCrawlEnd() {
}
@Override

View File

@ -18,6 +18,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
@ -40,6 +41,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private final AccountDatabaseCrawlerCache cache;
private final List<AccountDatabaseCrawlerListener> listeners;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
@ -47,14 +50,15 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
AccountsManager accounts,
AccountDatabaseCrawlerCache cache,
List<AccountDatabaseCrawlerListener> listeners,
int chunkSize) {
int chunkSize,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.name = name;
this.accounts = accounts;
this.chunkSize = chunkSize;
this.workerId = UUID.randomUUID().toString();
this.cache = cache;
this.listeners = listeners;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Override
@ -91,8 +95,61 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
}
}
public void crawlAllAccounts() {
if (!cache.claimActiveWork(workerId, WORKER_TTL_MS)) {
logger.info("Did not claim active work");
return;
}
try {
Optional<UUID> fromUuid = cache.getLastUuid();
if (fromUuid.isEmpty()) {
logger.info("{}: Started crawl", name);
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
} else {
logger.info("{}: Resuming crawl", name);
}
try {
AccountCrawlChunk chunkAccounts;
do {
if (!dynamicConfigurationManager.getConfiguration().getAccountDatabaseCrawlerConfiguration()
.crawlAllEnabled()) {
logger.warn("Exiting crawl - not enabled by dynamic configuration");
return;
}
try (Timer.Context timer = processChunkTimer.time()) {
logger.debug("{}: Processing chunk", name);
chunkAccounts = readChunk(fromUuid, chunkSize);
for (AccountDatabaseCrawlerListener listener : listeners) {
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());
}
fromUuid = chunkAccounts.getLastUuid();
cacheLastUuid(fromUuid);
}
} while (!chunkAccounts.getAccounts().isEmpty());
logger.info("{}: Finished crawl", name);
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlEnd);
} catch (AccountDatabaseCrawlerRestartException e) {
logger.warn("Crawl stopped", e);
}
} finally {
cache.releaseActiveWork(workerId);
}
}
@VisibleForTesting
public void doPeriodicWork() {
if (!dynamicConfigurationManager.getConfiguration().getAccountDatabaseCrawlerConfiguration()
.periodicWorkEnabled()) {
return;
}
if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) {
try {
processChunk();
@ -117,7 +174,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
if (chunkAccounts.getAccounts().isEmpty()) {
logger.info("{}: Finished crawl", name);
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlEnd);
cacheLastUuid(Optional.empty());
} else {
logger.debug("{}: Processing chunk", name);

View File

@ -4,16 +4,14 @@
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.whispersystems.textsecuregcm.util.Constants;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
import org.whispersystems.textsecuregcm.util.Constants;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public abstract class AccountDatabaseCrawlerListener {
@ -22,7 +20,7 @@ public abstract class AccountDatabaseCrawlerListener {
abstract public void onCrawlStart();
abstract public void onCrawlEnd(Optional<UUID> fromUuid);
abstract public void onCrawlEnd();
abstract protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;

View File

@ -88,7 +88,7 @@ public class NonNormalizedAccountCrawlerListener extends AccountDatabaseCrawlerL
}
@Override
public void onCrawlEnd(final Optional<UUID> fromUuid) {
public void onCrawlEnd() {
final int normalizedNumbers = metricsCluster.withCluster(connection ->
Integer.parseInt(connection.sync().get(NORMALIZED_NUMBER_COUNT_KEY)));

View File

@ -33,7 +33,8 @@ public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener {
public void onCrawlStart() {}
@Override
public void onCrawlEnd(Optional<UUID> toUuid) {}
public void onCrawlEnd() {
}
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {

View File

@ -66,7 +66,9 @@ record CommandDependencies(
DeletedAccountsManager deletedAccountsManager,
StoredVerificationCodeManager pendingAccountsManager,
ClientPresenceManager clientPresenceManager,
Keys keys) {
Keys keys,
FaultTolerantRedisCluster cacheCluster,
ClientResources redisClusterClientResources) {
static CommandDependencies build(
final String name,
@ -206,7 +208,9 @@ record CommandDependencies(
deletedAccountsManager,
pendingAccountsManager,
clientPresenceManager,
keys
keys,
cacheCluster,
redisClusterClientResources
);
}

View File

@ -0,0 +1,132 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.setup.Environment;
import java.util.List;
import java.util.concurrent.ExecutorService;
import net.sourceforge.argparse4j.inf.Argument;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.ArgumentType;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.AccountCleaner;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.NonNormalizedAccountCrawlerListener;
import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor;
public class CrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfiguration> {
private static final String CRAWL_TYPE = "crawlType";
public enum CrawlType implements ArgumentType<CrawlType> {
GENERAL_PURPOSE,
ACCOUNT_CLEANER,
;
@Override
public CrawlType convert(final ArgumentParser parser, final Argument arg, final String value)
throws ArgumentParserException {
return CrawlType.valueOf(value);
}
}
public CrawlAccountsCommand() {
super(new Application<>() {
@Override
public void run(final WhisperServerConfiguration configuration, final Environment environment) throws Exception {
}
}, "crawl-accounts", "Runs account crawler tasks");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--crawl-type")
.type(CrawlType.class)
.dest(CRAWL_TYPE)
.required(true)
.help("The type of crawl to perform");
}
@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {
final CommandDependencies deps = CommandDependencies.build("account-crawler", environment, configuration);
final AccountsManager accountsManager = deps.accountsManager();
final FaultTolerantRedisCluster cacheCluster = deps.cacheCluster();
final FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster",
configuration.getMetricsClusterConfiguration(), deps.redisClusterClientResources());
// TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(
new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster),
// PushFeedbackProcessor may update device properties
new PushFeedbackProcessor(accountsManager));
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
new DynamicConfigurationManager<>(configuration.getAppConfig().getApplication(),
configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(),
DynamicConfiguration.class);
final AccountDatabaseCrawler crawler =
switch ((CrawlType) namespace.get(CRAWL_TYPE)) {
case GENERAL_PURPOSE -> {
final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(
cacheCluster,
AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX);
yield new AccountDatabaseCrawler("General-purpose account crawler",
accountsManager,
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
dynamicConfigurationManager
);
}
case ACCOUNT_CLEANER -> {
final ExecutorService accountDeletionExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build();
final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(
cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX);
yield new AccountDatabaseCrawler("Account cleaner crawler",
accountsManager,
accountDatabaseCrawlerCache,
List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)),
configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
dynamicConfigurationManager
);
}
};
try {
crawler.crawlAllAccounts();
} catch (final Exception e) {
LoggerFactory.getLogger(CrawlAccountsCommand.class).error("Error crawling accounts", e);
}
}
}

View File

@ -80,8 +80,9 @@ class AccountCleanerTest {
void testAccounts() throws AccountDatabaseCrawlerRestartException, InterruptedException {
AccountCleaner accountCleaner = new AccountCleaner(accountsManager, deletionExecutor);
accountCleaner.onCrawlStart();
accountCleaner.timeAndProcessCrawlChunk(Optional.empty(), Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount));
accountCleaner.onCrawlEnd(Optional.empty());
accountCleaner.timeAndProcessCrawlChunk(Optional.empty(),
Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount));
accountCleaner.onCrawlEnd();
verify(accountsManager).delete(deletedDisabledAccount, DeletionReason.EXPIRED);
verify(accountsManager).delete(undeletedDisabledAccount, DeletionReason.EXPIRED);

View File

@ -20,6 +20,8 @@ import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountDatabaseCrawlerConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
class AccountDatabaseCrawlerIntegrationTest {
@ -39,7 +41,6 @@ class AccountDatabaseCrawlerIntegrationTest {
private AccountDatabaseCrawler accountDatabaseCrawler;
private static final int CHUNK_SIZE = 1;
private static final long CHUNK_INTERVAL_MS = 0;
@BeforeEach
void setUp() throws Exception {
@ -59,10 +60,18 @@ class AccountDatabaseCrawlerIntegrationTest {
.thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID))
.thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = mock(
DynamicConfigurationManager.class);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
DynamicAccountDatabaseCrawlerConfiguration accountDatabaseCrawlerConfiguration = new DynamicAccountDatabaseCrawlerConfiguration(
true, true);
when(dynamicConfiguration.getAccountDatabaseCrawlerConfiguration()).thenReturn(accountDatabaseCrawlerConfiguration);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(
REDIS_CLUSTER_EXTENSION.getRedisCluster(), "test");
accountDatabaseCrawler = new AccountDatabaseCrawler("test", accountsManager, crawlerCache, List.of(listener),
CHUNK_SIZE);
CHUNK_SIZE, dynamicConfigurationManager);
}
@Test
@ -78,7 +87,7 @@ class AccountDatabaseCrawlerIntegrationTest {
verify(listener).onCrawlStart();
verify(listener).timeAndProcessCrawlChunk(Optional.empty(), List.of(firstAccount));
verify(listener).timeAndProcessCrawlChunk(Optional.of(FIRST_UUID), List.of(secondAccount));
verify(listener).onCrawlEnd(Optional.of(SECOND_UUID));
verify(listener).onCrawlEnd();
}
@Test
@ -98,6 +107,20 @@ class AccountDatabaseCrawlerIntegrationTest {
verify(listener, times(2)).onCrawlStart();
verify(listener, times(2)).timeAndProcessCrawlChunk(Optional.empty(), List.of(firstAccount));
verify(listener).timeAndProcessCrawlChunk(Optional.of(FIRST_UUID), List.of(secondAccount));
verify(listener).onCrawlEnd(Optional.of(SECOND_UUID));
verify(listener).onCrawlEnd();
}
@Test
void testCrawlAllAccounts() throws Exception {
accountDatabaseCrawler.crawlAllAccounts();
verify(accountsManager).getAllFromDynamo(CHUNK_SIZE);
verify(accountsManager).getAllFromDynamo(FIRST_UUID, CHUNK_SIZE);
verify(accountsManager).getAllFromDynamo(SECOND_UUID, CHUNK_SIZE);
verify(listener).onCrawlStart();
verify(listener).timeAndProcessCrawlChunk(Optional.empty(), List.of(firstAccount));
verify(listener).timeAndProcessCrawlChunk(Optional.of(FIRST_UUID), List.of(secondAccount));
verify(listener).onCrawlEnd();
}
}

View File

@ -23,6 +23,8 @@ import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountDatabaseCrawlerConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
@ -30,6 +32,7 @@ import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
class AccountDatabaseCrawlerTest {
@ -37,7 +40,6 @@ class AccountDatabaseCrawlerTest {
private static final UUID ACCOUNT2 = UUID.randomUUID();
private static final int CHUNK_SIZE = 1000;
private static final long CHUNK_INTERVAL_MS = 30_000L;
private final Account account1 = mock(Account.class);
private final Account account2 = mock(Account.class);
@ -46,8 +48,11 @@ class AccountDatabaseCrawlerTest {
private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class);
private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = mock(
DynamicConfigurationManager.class);
private final AccountDatabaseCrawler crawler =
new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE);
new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE, dynamicConfigurationManager);
@BeforeEach
void setup() {
@ -63,6 +68,11 @@ class AccountDatabaseCrawlerTest {
when(cache.claimActiveWork(any(), anyLong())).thenReturn(true);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
final DynamicAccountDatabaseCrawlerConfiguration accountDatabaseCrawlerConfiguration =
new DynamicAccountDatabaseCrawlerConfiguration(true, true);
when(dynamicConfiguration.getAccountDatabaseCrawlerConfiguration()).thenReturn(accountDatabaseCrawlerConfiguration);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
}
@Test
@ -169,7 +179,7 @@ class AccountDatabaseCrawlerTest {
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT2), eq(CHUNK_SIZE));
verify(account1, times(0)).getNumber();
verify(account2, times(0)).getNumber();
verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2)));
verify(listener, times(1)).onCrawlEnd();
verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -181,4 +191,29 @@ class AccountDatabaseCrawlerTest {
verifyNoMoreInteractions(cache);
}
@Test
void testCrawlAllAccounts() throws Exception {
when(cache.getLastUuid())
.thenReturn(Optional.empty());
crawler.crawlAllAccounts();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastUuid();
verify(listener, times(1)).onCrawlStart();
verify(accounts, times(1)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT2), eq(CHUNK_SIZE));
verify(listener, times(1)).timeAndProcessCrawlChunk(Optional.empty(), List.of(account1, account2));
verify(listener, times(1)).timeAndProcessCrawlChunk(Optional.of(ACCOUNT2), Collections.emptyList());
verify(listener, times(1)).onCrawlEnd();
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
// times(2) because empty() will get cached on the last run of loop and then again at the end
verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(1)).releaseActiveWork(any(String.class));
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(cache);
}
}