diff --git a/service/config/sample.yml b/service/config/sample.yml index f9dda6306..05a33af39 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -184,7 +184,6 @@ gcpAttachments: # GCP Storage configuration accountDatabaseCrawler: chunkSize: 10 # accounts per run - chunkIntervalMs: 60000 # time per run apn: # Apple Push Notifications configuration sandbox: true diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 6cfa3a6c4..0a9c1fb2e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -217,7 +217,6 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; -import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask; import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask; import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand; import org.whispersystems.textsecuregcm.workers.UnlinkDeviceCommand; @@ -569,8 +568,7 @@ public class WhisperServerService extends Application listeners; @@ -47,12 +47,10 @@ public class AccountDatabaseCrawler implements Managed, Runnable { AccountsManager accounts, AccountDatabaseCrawlerCache cache, List listeners, - int chunkSize, - long chunkIntervalMs) { + int chunkSize) { this.name = name; this.accounts = accounts; this.chunkSize = chunkSize; - this.chunkIntervalMs = chunkIntervalMs; this.workerId = UUID.randomUUID().toString(); this.cache = cache; this.listeners = listeners; @@ -76,12 +74,11 @@ public class AccountDatabaseCrawler implements Managed, Runnable { @Override public void run() { - boolean accelerated = false; while (running.get()) { try { - accelerated = doPeriodicWork(); - sleepWhileRunning(accelerated ? ACCELERATED_CHUNK_INTERVAL : chunkIntervalMs); + doPeriodicWork(); + sleepWhileRunning(CHUNK_INTERVAL_MILLIS); } catch (Throwable t) { logger.warn("{}: error in database crawl: {}: {}", name, t.getClass().getSimpleName(), t.getMessage(), t); Util.sleep(10000); @@ -95,26 +92,14 @@ public class AccountDatabaseCrawler implements Managed, Runnable { } @VisibleForTesting - public boolean doPeriodicWork() { + public void doPeriodicWork() { if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) { - try { - final long startTimeMs = System.currentTimeMillis(); processChunk(); - if (cache.isAccelerated()) { - return true; - } - final long endTimeMs = System.currentTimeMillis(); - final long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs); - if (sleepIntervalMs > 0) { - logger.debug("{}: Sleeping {}ms", name, sleepIntervalMs); - sleepWhileRunning(sleepIntervalMs); - } } finally { cache.releaseActiveWork(workerId); } } - return false; } private void processChunk() { @@ -134,7 +119,6 @@ public class AccountDatabaseCrawler implements Managed, Runnable { logger.info("{}: Finished crawl", name); listeners.forEach(listener -> listener.onCrawlEnd(fromUuid)); cacheLastUuid(Optional.empty()); - cache.setAccelerated(false); } else { logger.debug("{}: Processing chunk", name); try { @@ -144,7 +128,6 @@ public class AccountDatabaseCrawler implements Managed, Runnable { cacheLastUuid(chunkAccounts.getLastUuid()); } catch (AccountDatabaseCrawlerRestartException e) { cacheLastUuid(Optional.empty()); - cache.setAccelerated(false); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java index 164d89003..fb76a86d1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java @@ -20,8 +20,6 @@ public class AccountDatabaseCrawlerCache { public static final String ACCOUNT_CLEANER_PREFIX = "account-cleaner"; private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker"; - private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate"; - private static final String LAST_UUID_DYNAMO_KEY = "account_database_crawler_cache_last_uuid_dynamo"; private static final long LAST_NUMBER_TTL_MS = 86400_000L; @@ -39,18 +37,6 @@ public class AccountDatabaseCrawlerCache { this.prefix = prefix + "::"; } - public void setAccelerated(final boolean accelerated) { - if (accelerated) { - cacheCluster.useCluster(connection -> connection.sync().set(getPrefixedKey(ACCELERATE_KEY), "1")); - } else { - cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(ACCELERATE_KEY))); - } - } - - public boolean isAccelerated() { - return "1".equals(cacheCluster.withCluster(connection -> connection.sync().get(ACCELERATE_KEY))); - } - public boolean claimActiveWork(String workerId, long ttlMs) { return "OK".equals(cacheCluster.withCluster(connection -> connection.sync() .set(getPrefixedKey(ACTIVE_WORKER_KEY), workerId, SetArgs.Builder.nx().px(ttlMs)))); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/SetCrawlerAccelerationTask.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/SetCrawlerAccelerationTask.java deleted file mode 100644 index a7cd2e560..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/SetCrawlerAccelerationTask.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2021 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.workers; - -import io.dropwizard.servlets.tasks.Task; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; - -import java.io.PrintWriter; -import java.util.List; -import java.util.Map; - -public class SetCrawlerAccelerationTask extends Task { - - private final AccountDatabaseCrawlerCache crawlerCache; - - public SetCrawlerAccelerationTask(final AccountDatabaseCrawlerCache crawlerCache) { - super("set-crawler-accelerated"); - - this.crawlerCache = crawlerCache; - } - - @Override - public void execute(final Map> parameters, final PrintWriter out) { - if (parameters.containsKey("accelerated") && parameters.get("accelerated").size() == 1) { - final boolean accelerated = "true".equalsIgnoreCase(parameters.get("accelerated").get(0)); - - crawlerCache.setAccelerated(accelerated); - out.println("Set accelerated: " + accelerated); - } else { - out.println("Usage: set-crawler-accelerated?accelerated=[true|false]"); - } - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java index 2dcef4636..497ef7f8c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java @@ -5,7 +5,6 @@ package org.whispersystems.textsecuregcm.storage; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; @@ -60,16 +59,17 @@ class AccountDatabaseCrawlerIntegrationTest { .thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID)) .thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); - final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), "test"); - accountDatabaseCrawler = new AccountDatabaseCrawler("test", accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, - CHUNK_INTERVAL_MS); + final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache( + REDIS_CLUSTER_EXTENSION.getRedisCluster(), "test"); + accountDatabaseCrawler = new AccountDatabaseCrawler("test", accountsManager, crawlerCache, List.of(listener), + CHUNK_SIZE); } @Test void testCrawlUninterrupted() throws AccountDatabaseCrawlerRestartException { - assertFalse(accountDatabaseCrawler.doPeriodicWork()); - assertFalse(accountDatabaseCrawler.doPeriodicWork()); - assertFalse(accountDatabaseCrawler.doPeriodicWork()); + accountDatabaseCrawler.doPeriodicWork(); + accountDatabaseCrawler.doPeriodicWork(); + accountDatabaseCrawler.doPeriodicWork(); verify(accountsManager).getAllFromDynamo(CHUNK_SIZE); verify(accountsManager).getAllFromDynamo(FIRST_UUID, CHUNK_SIZE); @@ -86,10 +86,10 @@ class AccountDatabaseCrawlerIntegrationTest { doThrow(new AccountDatabaseCrawlerRestartException("OH NO")).doNothing() .when(listener).timeAndProcessCrawlChunk(Optional.empty(), List.of(firstAccount)); - assertFalse(accountDatabaseCrawler.doPeriodicWork()); - assertFalse(accountDatabaseCrawler.doPeriodicWork()); - assertFalse(accountDatabaseCrawler.doPeriodicWork()); - assertFalse(accountDatabaseCrawler.doPeriodicWork()); + accountDatabaseCrawler.doPeriodicWork(); + accountDatabaseCrawler.doPeriodicWork(); + accountDatabaseCrawler.doPeriodicWork(); + accountDatabaseCrawler.doPeriodicWork(); verify(accountsManager, times(2)).getAllFromDynamo(CHUNK_SIZE); verify(accountsManager).getAllFromDynamo(FIRST_UUID, CHUNK_SIZE); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java index 79866f5ce..7eed87249 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -5,7 +5,6 @@ package org.whispersystems.textsecuregcm.tests.storage; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -48,7 +47,7 @@ class AccountDatabaseCrawlerTest { private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class); private final AccountDatabaseCrawler crawler = - new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS); + new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE); @BeforeEach void setup() { @@ -63,7 +62,6 @@ class AccountDatabaseCrawlerTest { new AccountCrawlChunk(Collections.emptyList(), null)); when(cache.claimActiveWork(any(), anyLong())).thenReturn(true); - when(cache.isAccelerated()).thenReturn(false); } @@ -71,8 +69,7 @@ class AccountDatabaseCrawlerTest { void testCrawlStart() throws AccountDatabaseCrawlerRestartException { when(cache.getLastUuid()).thenReturn(Optional.empty()); - boolean accelerated = crawler.doPeriodicWork(); - assertThat(accelerated).isFalse(); + crawler.doPeriodicWork(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); verify(cache, times(1)).getLastUuid(); @@ -82,7 +79,6 @@ class AccountDatabaseCrawlerTest { verify(account1, times(0)).getUuid(); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.empty()), eq(List.of(account1, account2))); verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); - verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); verifyNoMoreInteractions(account1); @@ -96,8 +92,7 @@ class AccountDatabaseCrawlerTest { void testCrawlChunk() throws AccountDatabaseCrawlerRestartException { when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1)); - boolean accelerated = crawler.doPeriodicWork(); - assertThat(accelerated).isFalse(); + crawler.doPeriodicWork(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); verify(cache, times(1)).getLastUuid(); @@ -105,7 +100,6 @@ class AccountDatabaseCrawlerTest { verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2))); verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); - verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); verifyNoInteractions(account1); @@ -118,11 +112,9 @@ class AccountDatabaseCrawlerTest { @Test void testCrawlChunkAccelerated() throws AccountDatabaseCrawlerRestartException { - when(cache.isAccelerated()).thenReturn(true); when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1)); - boolean accelerated = crawler.doPeriodicWork(); - assertThat(accelerated).isTrue(); + crawler.doPeriodicWork(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); verify(cache, times(1)).getLastUuid(); @@ -130,7 +122,6 @@ class AccountDatabaseCrawlerTest { verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2))); verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); - verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); verifyNoInteractions(account1); @@ -147,8 +138,7 @@ class AccountDatabaseCrawlerTest { doThrow(AccountDatabaseCrawlerRestartException.class).when(listener) .timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2))); - boolean accelerated = crawler.doPeriodicWork(); - assertThat(accelerated).isFalse(); + crawler.doPeriodicWork(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); verify(cache, times(1)).getLastUuid(); @@ -157,8 +147,6 @@ class AccountDatabaseCrawlerTest { verify(account2, times(0)).getNumber(); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2))); verify(cache, times(1)).setLastUuid(eq(Optional.empty())); - verify(cache, times(1)).setAccelerated(false); - verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); verifyNoInteractions(account1); @@ -173,8 +161,7 @@ class AccountDatabaseCrawlerTest { void testCrawlEnd() { when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT2)); - boolean accelerated = crawler.doPeriodicWork(); - assertThat(accelerated).isFalse(); + crawler.doPeriodicWork(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); verify(cache, times(1)).getLastUuid(); @@ -184,8 +171,6 @@ class AccountDatabaseCrawlerTest { verify(account2, times(0)).getNumber(); verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).setLastUuid(eq(Optional.empty())); - verify(cache, times(1)).setAccelerated(false); - verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); verifyNoInteractions(account1);