diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 1c68d3b7c..8c3899892 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -526,9 +526,7 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = new ArrayList<>(); - + final List directoryReconciliationAccountDatabaseCrawlerListeners = new ArrayList<>(); final List deletedAccountsDirectoryReconcilers = new ArrayList<>(); for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration() .getDirectoryServerConfiguration()) { @@ -538,26 +536,34 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = List.of( + new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), + new ContactDiscoveryWriter(accountsManager), + new AssignPhoneNumberIdentifierCrawlerListener(accountsManager, phoneNumberIdentifiers), + // PushFeedbackProcessor may update device properties + new PushFeedbackProcessor(accountsManager), + // delete accounts last + new AccountCleaner(accountsManager) + ); + + AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster, + AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), @@ -566,12 +572,18 @@ public class WhisperServerService extends Application connection.sync().set(ACCELERATE_KEY, "1")); + cacheCluster.useCluster(connection -> connection.sync().set(getPrefixedKey(ACCELERATE_KEY), "1")); } else { - cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(ACCELERATE_KEY))); } } @@ -47,16 +54,16 @@ public class AccountDatabaseCrawlerCache { public boolean claimActiveWork(String workerId, long ttlMs) { return "OK".equals(cacheCluster.withCluster(connection -> connection.sync() - .set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs)))); + .set(getPrefixedKey(ACTIVE_WORKER_KEY), workerId, SetArgs.Builder.nx().px(ttlMs)))); } public void releaseActiveWork(String workerId) { - unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId)); + unlockClusterScript.execute(List.of(getPrefixedKey(ACTIVE_WORKER_KEY)), List.of(workerId)); } public Optional getLastUuid() { final String lastUuidString = cacheCluster.withCluster( - connection -> connection.sync().get(LAST_UUID_KEY)); + connection -> connection.sync().get(getPrefixedKey(LAST_UUID_KEY))); if (lastUuidString == null) { return Optional.empty(); @@ -68,15 +75,15 @@ public class AccountDatabaseCrawlerCache { public void setLastUuid(Optional lastUuid) { if (lastUuid.isPresent()) { cacheCluster.useCluster(connection -> connection.sync() - .psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); + .psetex(getPrefixedKey(LAST_UUID_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString())); } else { - cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_KEY))); } } public Optional getLastUuidDynamo() { final String lastUuidString = cacheCluster.withCluster( - connection -> connection.sync().get(LAST_UUID_DYNAMO_KEY)); + connection -> connection.sync().get(getPrefixedKey(LAST_UUID_DYNAMO_KEY))); if (lastUuidString == null) { return Optional.empty(); @@ -89,9 +96,14 @@ public class AccountDatabaseCrawlerCache { if (lastUuid.isPresent()) { cacheCluster.useCluster( connection -> connection.sync() - .psetex(LAST_UUID_DYNAMO_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); + .psetex(getPrefixedKey(LAST_UUID_DYNAMO_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString())); } else { - cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_DYNAMO_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_DYNAMO_KEY))); } } + + private String getPrefixedKey(final String key) { + return prefix + key; + } + } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java index 60c23c7cf..3dfadaba3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java @@ -57,7 +57,7 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT .thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID)) .thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); - final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster()); + final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster(), "test"); accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS); }