Add dedicated crawler for directory reconciler

This commit is contained in:
Chris Eager 2021-11-09 16:27:44 -08:00 committed by Chris Eager
parent f0a6be32fc
commit de6e9d31c9
3 changed files with 53 additions and 29 deletions

View File

@ -526,9 +526,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes())); MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
// TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data final List<AccountDatabaseCrawlerListener> directoryReconciliationAccountDatabaseCrawlerListeners = new ArrayList<>();
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = new ArrayList<>();
final List<DeletedAccountsDirectoryReconciler> deletedAccountsDirectoryReconcilers = new ArrayList<>(); final List<DeletedAccountsDirectoryReconciler> deletedAccountsDirectoryReconcilers = new ArrayList<>();
for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration() for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration()
.getDirectoryServerConfiguration()) { .getDirectoryServerConfiguration()) {
@ -538,26 +536,34 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
directoryServerConfiguration.getReplicationName(), directoryReconciliationClient, directoryServerConfiguration.getReplicationName(), directoryReconciliationClient,
dynamicConfigurationManager); dynamicConfigurationManager);
// reconcilers are read-only // reconcilers are read-only
accountDatabaseCrawlerListeners.add(directoryReconciler); directoryReconciliationAccountDatabaseCrawlerListeners.add(directoryReconciler);
final DeletedAccountsDirectoryReconciler deletedAccountsDirectoryReconciler = new DeletedAccountsDirectoryReconciler( final DeletedAccountsDirectoryReconciler deletedAccountsDirectoryReconciler = new DeletedAccountsDirectoryReconciler(
directoryServerConfiguration.getReplicationName(), directoryReconciliationClient); directoryServerConfiguration.getReplicationName(), directoryReconciliationClient);
deletedAccountsDirectoryReconcilers.add(deletedAccountsDirectoryReconciler); deletedAccountsDirectoryReconcilers.add(deletedAccountsDirectoryReconciler);
} }
accountDatabaseCrawlerListeners.add(new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster));
accountDatabaseCrawlerListeners.add(new ContactDiscoveryWriter(accountsManager));
accountDatabaseCrawlerListeners.add(new AssignPhoneNumberIdentifierCrawlerListener(accountsManager, phoneNumberIdentifiers));
// PushFeedbackProcessor may update device properties
accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager));
// delete accounts last
accountDatabaseCrawlerListeners.add(new AccountCleaner(accountsManager));
HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build(); AccountDatabaseCrawlerCache directoryReconciliationAccountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(
FixerClient fixerClient = new FixerClient(currencyClient, config.getPaymentsServiceConfiguration().getFixerApiKey()); cacheCluster, AccountDatabaseCrawlerCache.DIRECTORY_RECONCILER_PREFIX);
FtxClient ftxClient = new FtxClient(currencyClient); AccountDatabaseCrawler directoryReconciliationAccountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager,
CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, ftxClient, config.getPaymentsServiceConfiguration().getPaymentCurrencies()); directoryReconciliationAccountDatabaseCrawlerCache, directoryReconciliationAccountDatabaseCrawlerListeners,
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()
);
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster); // TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(
new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster),
new ContactDiscoveryWriter(accountsManager),
new AssignPhoneNumberIdentifierCrawlerListener(accountsManager, phoneNumberIdentifiers),
// PushFeedbackProcessor may update device properties
new PushFeedbackProcessor(accountsManager),
// delete accounts last
new AccountCleaner(accountsManager)
);
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster,
AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager,
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
@ -566,12 +572,18 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor); DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build();
FixerClient fixerClient = new FixerClient(currencyClient, config.getPaymentsServiceConfiguration().getFixerApiKey());
FtxClient ftxClient = new FtxClient(currencyClient);
CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, ftxClient, config.getPaymentsServiceConfiguration().getPaymentCurrencies());
apnSender.setApnFallbackManager(apnFallbackManager); apnSender.setApnFallbackManager(apnFallbackManager);
environment.lifecycle().manage(new ApplicationShutdownMonitor()); environment.lifecycle().manage(new ApplicationShutdownMonitor());
environment.lifecycle().manage(apnFallbackManager); environment.lifecycle().manage(apnFallbackManager);
environment.lifecycle().manage(pubSubManager); environment.lifecycle().manage(pubSubManager);
environment.lifecycle().manage(messageSender); environment.lifecycle().manage(messageSender);
environment.lifecycle().manage(accountDatabaseCrawler); environment.lifecycle().manage(accountDatabaseCrawler);
environment.lifecycle().manage(directoryReconciliationAccountDatabaseCrawler);
environment.lifecycle().manage(deletedAccountsTableCrawler); environment.lifecycle().manage(deletedAccountsTableCrawler);
environment.lifecycle().manage(remoteConfigsManager); environment.lifecycle().manage(remoteConfigsManager);
environment.lifecycle().manage(messagesCache); environment.lifecycle().manage(messagesCache);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2013-2020 Signal Messenger, LLC * Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only * SPDX-License-Identifier: AGPL-3.0-only
*/ */
package org.whispersystems.textsecuregcm.storage; package org.whispersystems.textsecuregcm.storage;
@ -16,6 +16,9 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class AccountDatabaseCrawlerCache { public class AccountDatabaseCrawlerCache {
public static final String GENERAL_PURPOSE_PREFIX = "";
public static final String DIRECTORY_RECONCILER_PREFIX = "directory-reconciler";
private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker"; private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid"; private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid";
private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate"; private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
@ -27,17 +30,21 @@ public class AccountDatabaseCrawlerCache {
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
private final ClusterLuaScript unlockClusterScript; private final ClusterLuaScript unlockClusterScript;
public AccountDatabaseCrawlerCache(FaultTolerantRedisCluster cacheCluster) throws IOException { private final String prefix;
public AccountDatabaseCrawlerCache(FaultTolerantRedisCluster cacheCluster, String prefix) throws IOException {
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua", this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua",
ScriptOutputType.INTEGER); ScriptOutputType.INTEGER);
this.prefix = prefix + "::";
} }
public void setAccelerated(final boolean accelerated) { public void setAccelerated(final boolean accelerated) {
if (accelerated) { if (accelerated) {
cacheCluster.useCluster(connection -> connection.sync().set(ACCELERATE_KEY, "1")); cacheCluster.useCluster(connection -> connection.sync().set(getPrefixedKey(ACCELERATE_KEY), "1"));
} else { } else {
cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY)); cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(ACCELERATE_KEY)));
} }
} }
@ -47,16 +54,16 @@ public class AccountDatabaseCrawlerCache {
public boolean claimActiveWork(String workerId, long ttlMs) { public boolean claimActiveWork(String workerId, long ttlMs) {
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync() return "OK".equals(cacheCluster.withCluster(connection -> connection.sync()
.set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs)))); .set(getPrefixedKey(ACTIVE_WORKER_KEY), workerId, SetArgs.Builder.nx().px(ttlMs))));
} }
public void releaseActiveWork(String workerId) { public void releaseActiveWork(String workerId) {
unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId)); unlockClusterScript.execute(List.of(getPrefixedKey(ACTIVE_WORKER_KEY)), List.of(workerId));
} }
public Optional<UUID> getLastUuid() { public Optional<UUID> getLastUuid() {
final String lastUuidString = cacheCluster.withCluster( final String lastUuidString = cacheCluster.withCluster(
connection -> connection.sync().get(LAST_UUID_KEY)); connection -> connection.sync().get(getPrefixedKey(LAST_UUID_KEY)));
if (lastUuidString == null) { if (lastUuidString == null) {
return Optional.empty(); return Optional.empty();
@ -68,15 +75,15 @@ public class AccountDatabaseCrawlerCache {
public void setLastUuid(Optional<UUID> lastUuid) { public void setLastUuid(Optional<UUID> lastUuid) {
if (lastUuid.isPresent()) { if (lastUuid.isPresent()) {
cacheCluster.useCluster(connection -> connection.sync() cacheCluster.useCluster(connection -> connection.sync()
.psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); .psetex(getPrefixedKey(LAST_UUID_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
} else { } else {
cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_KEY)); cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_KEY)));
} }
} }
public Optional<UUID> getLastUuidDynamo() { public Optional<UUID> getLastUuidDynamo() {
final String lastUuidString = cacheCluster.withCluster( final String lastUuidString = cacheCluster.withCluster(
connection -> connection.sync().get(LAST_UUID_DYNAMO_KEY)); connection -> connection.sync().get(getPrefixedKey(LAST_UUID_DYNAMO_KEY)));
if (lastUuidString == null) { if (lastUuidString == null) {
return Optional.empty(); return Optional.empty();
@ -89,9 +96,14 @@ public class AccountDatabaseCrawlerCache {
if (lastUuid.isPresent()) { if (lastUuid.isPresent()) {
cacheCluster.useCluster( cacheCluster.useCluster(
connection -> connection.sync() connection -> connection.sync()
.psetex(LAST_UUID_DYNAMO_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); .psetex(getPrefixedKey(LAST_UUID_DYNAMO_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
} else { } else {
cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_DYNAMO_KEY)); cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_DYNAMO_KEY)));
} }
} }
private String getPrefixedKey(final String key) {
return prefix + key;
}
} }

View File

@ -57,7 +57,7 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT
.thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID)) .thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID))
.thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); .thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));
final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster()); final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster(), "test");
accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE,
CHUNK_INTERVAL_MS); CHUNK_INTERVAL_MS);
} }