account crawler: remove obsolete accelerated mode

Chris Eager authored 2023-04-25 18:03:41 -05:00, committed by Chris Eager
parent 42a9f1b3e4
commit bc68b67cdf
8 changed files with 25 additions and 118 deletions

View File (service configuration YAML)

@@ -184,7 +184,6 @@ gcpAttachments: # GCP Storage configuration
 accountDatabaseCrawler:
   chunkSize: 10 # accounts per run
-  chunkIntervalMs: 60000 # time per run
 apn: # Apple Push Notifications configuration
   sandbox: true

View File: WhisperServerService.java

@@ -217,7 +217,6 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand;
 import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
 import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
 import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
-import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask;
 import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
 import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand;
 import org.whispersystems.textsecuregcm.workers.UnlinkDeviceCommand;
@@ -569,8 +568,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
     AccountDatabaseCrawler accountCleanerAccountDatabaseCrawler = new AccountDatabaseCrawler("Account cleaner crawler",
         accountsManager,
         accountCleanerAccountDatabaseCrawlerCache, List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)),
-        config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
-        config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()
+        config.getAccountDatabaseCrawlerConfiguration().getChunkSize()
     );

     // TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
@@ -585,8 +583,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
     AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler("General-purpose account crawler",
         accountsManager,
         accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
-        config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
-        config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()
+        config.getAccountDatabaseCrawlerConfiguration().getChunkSize()
     );

     HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build();
@@ -794,7 +791,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
     provisioning.setAsyncSupported(true);

     environment.admin().addTask(new SetRequestLoggingEnabledTask());
-    environment.admin().addTask(new SetCrawlerAccelerationTask(accountDatabaseCrawlerCache));

     environment.healthChecks().register("cacheCluster", new RedisClusterHealthCheck(cacheCluster));

View File: AccountDatabaseCrawlerConfiguration.java

@@ -11,14 +11,8 @@ public class AccountDatabaseCrawlerConfiguration {

   @JsonProperty
   private int chunkSize = 1000;

-  @JsonProperty
-  private long chunkIntervalMs = 8000L;

   public int getChunkSize() {
     return chunkSize;
   }

-  public long getChunkIntervalMs() {
-    return chunkIntervalMs;
-  }
 }
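With chunkIntervalMs removed, the configuration class carries only the chunk size. For reference, a sketch of the class after this change (assuming the file contains nothing beyond what the hunk shows):

import com.fasterxml.jackson.annotation.JsonProperty;

public class AccountDatabaseCrawlerConfiguration {

  @JsonProperty
  private int chunkSize = 1000;

  public int getChunkSize() {
    return chunkSize;
  }
}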

View File: AccountDatabaseCrawler.java

@@ -11,6 +11,7 @@ import com.codahale.metrics.SharedMetricRegistries;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import io.dropwizard.lifecycle.Managed;
+import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -30,12 +31,11 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
       name(AccountDatabaseCrawler.class, "processChunk"));

   private static final long WORKER_TTL_MS = 120_000L;
-  private static final long ACCELERATED_CHUNK_INTERVAL = 10L;
+  private static final long CHUNK_INTERVAL_MILLIS = Duration.ofSeconds(2).toMillis();

   private final String name;
   private final AccountsManager accounts;
   private final int chunkSize;
-  private final long chunkIntervalMs;
   private final String workerId;
   private final AccountDatabaseCrawlerCache cache;
   private final List<AccountDatabaseCrawlerListener> listeners;
@@ -47,12 +47,10 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
       AccountsManager accounts,
       AccountDatabaseCrawlerCache cache,
       List<AccountDatabaseCrawlerListener> listeners,
-      int chunkSize,
-      long chunkIntervalMs) {
+      int chunkSize) {
     this.name = name;
     this.accounts = accounts;
     this.chunkSize = chunkSize;
-    this.chunkIntervalMs = chunkIntervalMs;
     this.workerId = UUID.randomUUID().toString();
     this.cache = cache;
     this.listeners = listeners;
@@ -76,12 +74,11 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
   @Override
   public void run() {
-    boolean accelerated = false;

     while (running.get()) {
       try {
-        accelerated = doPeriodicWork();
-        sleepWhileRunning(accelerated ? ACCELERATED_CHUNK_INTERVAL : chunkIntervalMs);
+        doPeriodicWork();
+        sleepWhileRunning(CHUNK_INTERVAL_MILLIS);
       } catch (Throwable t) {
         logger.warn("{}: error in database crawl: {}: {}", name, t.getClass().getSimpleName(), t.getMessage(), t);
         Util.sleep(10000);
@@ -95,26 +92,14 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
   }

   @VisibleForTesting
-  public boolean doPeriodicWork() {
+  public void doPeriodicWork() {
     if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) {
       try {
-        final long startTimeMs = System.currentTimeMillis();
         processChunk();
-        if (cache.isAccelerated()) {
-          return true;
-        }
-        final long endTimeMs = System.currentTimeMillis();
-        final long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs);
-        if (sleepIntervalMs > 0) {
-          logger.debug("{}: Sleeping {}ms", name, sleepIntervalMs);
-          sleepWhileRunning(sleepIntervalMs);
-        }
       } finally {
         cache.releaseActiveWork(workerId);
       }
     }
-    return false;
   }

   private void processChunk() {
@@ -134,7 +119,6 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
       logger.info("{}: Finished crawl", name);
       listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
       cacheLastUuid(Optional.empty());
-      cache.setAccelerated(false);
     } else {
       logger.debug("{}: Processing chunk", name);
       try {
@@ -144,7 +128,6 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
         cacheLastUuid(chunkAccounts.getLastUuid());
       } catch (AccountDatabaseCrawlerRestartException e) {
         cacheLastUuid(Optional.empty());
-        cache.setAccelerated(false);
       }
     }
   }
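Taken together, the crawler now runs on a fixed cadence: claim the worker lease, process one chunk, release the lease, sleep two seconds, repeat. A condensed sketch of the resulting control flow, pieced together from the hunks above (the full class has more members than shown here):

@Override
public void run() {
  while (running.get()) {
    try {
      doPeriodicWork();
      // Fixed two-second pause between chunks; the accelerated 10 ms
      // interval and the per-chunk sleep computation are gone.
      sleepWhileRunning(CHUNK_INTERVAL_MILLIS);
    } catch (Throwable t) {
      logger.warn("{}: error in database crawl: {}: {}", name, t.getClass().getSimpleName(), t.getMessage(), t);
      Util.sleep(10000);
    }
  }
}

@VisibleForTesting
public void doPeriodicWork() {
  // Only the worker holding the Redis lease crawls; everyone else no-ops.
  if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) {
    try {
      processChunk();
    } finally {
      cache.releaseActiveWork(workerId);
    }
  }
}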

View File: AccountDatabaseCrawlerCache.java

@@ -20,8 +20,6 @@ public class AccountDatabaseCrawlerCache {

   public static final String ACCOUNT_CLEANER_PREFIX = "account-cleaner";

   private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
-  private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
   private static final String LAST_UUID_DYNAMO_KEY = "account_database_crawler_cache_last_uuid_dynamo";

   private static final long LAST_NUMBER_TTL_MS = 86400_000L;
@@ -39,18 +37,6 @@ public class AccountDatabaseCrawlerCache {
     this.prefix = prefix + "::";
   }

-  public void setAccelerated(final boolean accelerated) {
-    if (accelerated) {
-      cacheCluster.useCluster(connection -> connection.sync().set(getPrefixedKey(ACCELERATE_KEY), "1"));
-    } else {
-      cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(ACCELERATE_KEY)));
-    }
-  }
-
-  public boolean isAccelerated() {
-    return "1".equals(cacheCluster.withCluster(connection -> connection.sync().get(ACCELERATE_KEY)));
-  }
-
   public boolean claimActiveWork(String workerId, long ttlMs) {
     return "OK".equals(cacheCluster.withCluster(connection -> connection.sync()
         .set(getPrefixedKey(ACTIVE_WORKER_KEY), workerId, SetArgs.Builder.nx().px(ttlMs))));
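The surviving claimActiveWork method is the locking idiom the crawler depends on: an atomic Redis SET with NX (set only if absent) and PX (millisecond expiry) that grants one worker a time-limited lease. A minimal standalone sketch of that idiom, assuming a plain Lettuce client rather than the project's fault-tolerant cluster wrapper:

import io.lettuce.core.RedisClient;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.sync.RedisCommands;
import java.util.UUID;

public class WorkerLeaseSketch {

  public static void main(String[] args) {
    RedisCommands<String, String> redis =
        RedisClient.create("redis://localhost:6379").connect().sync();

    final String workerId = UUID.randomUUID().toString();

    // SET key value NX PX <ttl>: succeeds only if no other worker holds the
    // key, and the lease expires on its own if the holder dies mid-crawl.
    final String reply = redis.set(
        "test::account_database_crawler_cache_active_worker",
        workerId, SetArgs.Builder.nx().px(120_000L));

    System.out.println("OK".equals(reply)
        ? "lease claimed by " + workerId
        : "another worker holds the lease");
  }
}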

View File: SetCrawlerAccelerationTask.java (deleted)

@@ -1,36 +0,0 @@
-/*
- * Copyright 2021 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-
-package org.whispersystems.textsecuregcm.workers;
-
-import io.dropwizard.servlets.tasks.Task;
-import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
-
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-
-public class SetCrawlerAccelerationTask extends Task {
-
-  private final AccountDatabaseCrawlerCache crawlerCache;
-
-  public SetCrawlerAccelerationTask(final AccountDatabaseCrawlerCache crawlerCache) {
-    super("set-crawler-accelerated");
-    this.crawlerCache = crawlerCache;
-  }
-
-  @Override
-  public void execute(final Map<String, List<String>> parameters, final PrintWriter out) {
-    if (parameters.containsKey("accelerated") && parameters.get("accelerated").size() == 1) {
-      final boolean accelerated = "true".equalsIgnoreCase(parameters.get("accelerated").get(0));
-      crawlerCache.setAccelerated(accelerated);
-      out.println("Set accelerated: " + accelerated);
-    } else {
-      out.println("Usage: set-crawler-accelerated?accelerated=[true|false]");
-    }
-  }
-}
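For context on the deleted task: Dropwizard exposes every task registered through environment.admin().addTask(...) as POST /tasks/<task-name> on the admin connector, so operators used to toggle acceleration with a plain HTTP call. A sketch of that invocation (the admin host and port are assumptions, not part of this commit):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AccelerateCrawlerSketch {

  public static void main(String[] args) throws Exception {
    // 8081 is Dropwizard's default admin port; deployments may differ.
    final HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(
            "http://localhost:8081/tasks/set-crawler-accelerated?accelerated=true"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

    final HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());

    System.out.println(response.body()); // expected: "Set accelerated: true"
  }
}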

View File: AccountDatabaseCrawlerIntegrationTest.java

@@ -5,7 +5,6 @@
 package org.whispersystems.textsecuregcm.storage;

-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
@@ -60,16 +59,17 @@ class AccountDatabaseCrawlerIntegrationTest {
         .thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID))
         .thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));

-    final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), "test");
-    accountDatabaseCrawler = new AccountDatabaseCrawler("test", accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE,
-        CHUNK_INTERVAL_MS);
+    final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(
+        REDIS_CLUSTER_EXTENSION.getRedisCluster(), "test");
+    accountDatabaseCrawler = new AccountDatabaseCrawler("test", accountsManager, crawlerCache, List.of(listener),
+        CHUNK_SIZE);
   }

   @Test
   void testCrawlUninterrupted() throws AccountDatabaseCrawlerRestartException {
-    assertFalse(accountDatabaseCrawler.doPeriodicWork());
-    assertFalse(accountDatabaseCrawler.doPeriodicWork());
-    assertFalse(accountDatabaseCrawler.doPeriodicWork());
+    accountDatabaseCrawler.doPeriodicWork();
+    accountDatabaseCrawler.doPeriodicWork();
+    accountDatabaseCrawler.doPeriodicWork();

     verify(accountsManager).getAllFromDynamo(CHUNK_SIZE);
     verify(accountsManager).getAllFromDynamo(FIRST_UUID, CHUNK_SIZE);
@@ -86,10 +86,10 @@
     doThrow(new AccountDatabaseCrawlerRestartException("OH NO")).doNothing()
         .when(listener).timeAndProcessCrawlChunk(Optional.empty(), List.of(firstAccount));

-    assertFalse(accountDatabaseCrawler.doPeriodicWork());
-    assertFalse(accountDatabaseCrawler.doPeriodicWork());
-    assertFalse(accountDatabaseCrawler.doPeriodicWork());
-    assertFalse(accountDatabaseCrawler.doPeriodicWork());
+    accountDatabaseCrawler.doPeriodicWork();
+    accountDatabaseCrawler.doPeriodicWork();
+    accountDatabaseCrawler.doPeriodicWork();
+    accountDatabaseCrawler.doPeriodicWork();

     verify(accountsManager, times(2)).getAllFromDynamo(CHUNK_SIZE);
     verify(accountsManager).getAllFromDynamo(FIRST_UUID, CHUNK_SIZE);

View File: AccountDatabaseCrawlerTest.java

@@ -5,7 +5,6 @@
 package org.whispersystems.textsecuregcm.tests.storage;

-import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -48,7 +47,7 @@ class AccountDatabaseCrawlerTest {
   private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);

   private final AccountDatabaseCrawler crawler =
-      new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS);
+      new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE);

   @BeforeEach
   void setup() {
@@ -63,7 +62,6 @@ class AccountDatabaseCrawlerTest {
         new AccountCrawlChunk(Collections.emptyList(), null));

     when(cache.claimActiveWork(any(), anyLong())).thenReturn(true);
-    when(cache.isAccelerated()).thenReturn(false);
   }
@@ -71,8 +69,7 @@
   void testCrawlStart() throws AccountDatabaseCrawlerRestartException {
     when(cache.getLastUuid()).thenReturn(Optional.empty());

-    boolean accelerated = crawler.doPeriodicWork();
-    assertThat(accelerated).isFalse();
+    crawler.doPeriodicWork();

     verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
     verify(cache, times(1)).getLastUuid();
@@ -82,7 +79,6 @@
     verify(account1, times(0)).getUuid();
     verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.empty()), eq(List.of(account1, account2)));
     verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
-    verify(cache, times(1)).isAccelerated();
     verify(cache, times(1)).releaseActiveWork(any(String.class));

     verifyNoMoreInteractions(account1);
@@ -96,8 +92,7 @@
   void testCrawlChunk() throws AccountDatabaseCrawlerRestartException {
     when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));

-    boolean accelerated = crawler.doPeriodicWork();
-    assertThat(accelerated).isFalse();
+    crawler.doPeriodicWork();

     verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
     verify(cache, times(1)).getLastUuid();
@@ -105,7 +100,6 @@
     verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
     verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2)));
     verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
-    verify(cache, times(1)).isAccelerated();
     verify(cache, times(1)).releaseActiveWork(any(String.class));

     verifyNoInteractions(account1);
@@ -118,11 +112,9 @@
   @Test
   void testCrawlChunkAccelerated() throws AccountDatabaseCrawlerRestartException {
-    when(cache.isAccelerated()).thenReturn(true);
     when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));

-    boolean accelerated = crawler.doPeriodicWork();
-    assertThat(accelerated).isTrue();
+    crawler.doPeriodicWork();

     verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
     verify(cache, times(1)).getLastUuid();
@@ -130,7 +122,6 @@
     verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
     verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2)));
     verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
-    verify(cache, times(1)).isAccelerated();
     verify(cache, times(1)).releaseActiveWork(any(String.class));

     verifyNoInteractions(account1);
@@ -147,8 +138,7 @@
     doThrow(AccountDatabaseCrawlerRestartException.class).when(listener)
         .timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2)));

-    boolean accelerated = crawler.doPeriodicWork();
-    assertThat(accelerated).isFalse();
+    crawler.doPeriodicWork();

     verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
     verify(cache, times(1)).getLastUuid();
@@ -157,8 +147,6 @@
     verify(account2, times(0)).getNumber();
     verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2)));
     verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
-    verify(cache, times(1)).setAccelerated(false);
-    verify(cache, times(1)).isAccelerated();
     verify(cache, times(1)).releaseActiveWork(any(String.class));

     verifyNoInteractions(account1);
@@ -173,8 +161,7 @@
   void testCrawlEnd() {
     when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT2));

-    boolean accelerated = crawler.doPeriodicWork();
-    assertThat(accelerated).isFalse();
+    crawler.doPeriodicWork();

     verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
     verify(cache, times(1)).getLastUuid();
@@ -184,8 +171,6 @@
     verify(account2, times(0)).getNumber();
     verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2)));
     verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
-    verify(cache, times(1)).setAccelerated(false);
-    verify(cache, times(1)).isAccelerated();
     verify(cache, times(1)).releaseActiveWork(any(String.class));

     verifyNoInteractions(account1);