From 86ccaa52a54e916a1082ce8b84792bfe28208329 Mon Sep 17 00:00:00 2001 From: Ehren Kret Date: Sun, 10 Jan 2021 17:11:02 -0600 Subject: [PATCH] Allow configuration of multiple directory account crawler listeners (#325) * Allow configuration of multiple directory account crawler listeners Only one should update the local redis directory. This one is marked with replicationPrimary true. The others in the list only serve to issue replication requests over to CDS replication load balancers. * Update one more metric name --- .../textsecuregcm/WhisperServerService.java | 26 +++++++++++-------- .../configuration/DirectoryConfiguration.java | 6 ++--- .../DirectoryServerConfiguration.java | 15 +++++++++++ .../storage/DirectoryReconciler.java | 19 +++++++++----- .../DirectoryReconciliationClient.java | 2 +- .../storage/DirectoryReconcilerTest.java | 2 +- 6 files changed, 48 insertions(+), 22 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index e4170aec9..2d96a0f58 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -47,6 +47,7 @@ import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccount; import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccountAuthenticator; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator; import org.whispersystems.textsecuregcm.auth.TurnTokenGenerator; +import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration; import org.whispersystems.textsecuregcm.controllers.AccountController; import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV1; import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV2; @@ -92,8 +93,8 @@ import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.GCMSender; -import org.whispersystems.textsecuregcm.push.ProvisioningManager; import org.whispersystems.textsecuregcm.push.MessageSender; +import org.whispersystems.textsecuregcm.push.ProvisioningManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.recaptcha.RecaptchaClient; import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger; @@ -120,7 +121,9 @@ import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.FeatureFlags; import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager; import org.whispersystems.textsecuregcm.storage.Keys; +import org.whispersystems.textsecuregcm.storage.MessagePersister; import org.whispersystems.textsecuregcm.storage.Messages; +import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PendingAccounts; import org.whispersystems.textsecuregcm.storage.PendingAccountsManager; @@ -130,8 +133,6 @@ import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor; -import org.whispersystems.textsecuregcm.storage.MessagePersister; -import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.RegistrationLockVersionCounter; import org.whispersystems.textsecuregcm.storage.RemoteConfigs; import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager; @@ -159,6 +160,7 @@ import javax.servlet.FilterRegistration; import javax.servlet.ServletRegistration; import java.security.Security; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -335,14 +337,16 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner, registrationLockVersionCounter); + final List accountDatabaseCrawlerListeners = new ArrayList<>(); + accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager, directoryQueue)); + accountDatabaseCrawlerListeners.add(new ActiveUserCounter(config.getMetricsFactory(), cacheCluster)); + for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration().getDirectoryServerConfiguration()) { + final DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(directoryServerConfiguration); + final DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryServerConfiguration.getReplicationName(), directoryServerConfiguration.isReplicationPrimary(), directoryReconciliationClient, directory); + accountDatabaseCrawlerListeners.add(directoryReconciler); + } + accountDatabaseCrawlerListeners.add(new AccountCleaner(accountsManager)); + accountDatabaseCrawlerListeners.add(new RegistrationLockVersionCounter(metricsCluster, config.getMetricsFactory())); AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster); AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java index 53af8f079..c333a91eb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import javax.validation.Valid; import javax.validation.constraints.NotNull; +import java.util.List; public class DirectoryConfiguration { @@ -29,7 +30,7 @@ public class DirectoryConfiguration { @JsonProperty @NotNull @Valid - private DirectoryServerConfiguration server; + private List server; public RedisConfiguration getRedisConfiguration() { return redis; @@ -43,8 +44,7 @@ public class DirectoryConfiguration { return client; } - public DirectoryServerConfiguration getDirectoryServerConfiguration() { + public List getDirectoryServerConfiguration() { return server; } - } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java index 530ee1ade..208475265 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java @@ -9,6 +9,13 @@ import org.hibernate.validator.constraints.NotEmpty; public class DirectoryServerConfiguration { + @NotEmpty + @JsonProperty + private String replicationName; + + @JsonProperty + private boolean replicationPrimary; + @NotEmpty @JsonProperty private String replicationUrl; @@ -21,6 +28,14 @@ public class DirectoryServerConfiguration { @JsonProperty private String replicationCaCertificate; + public String getReplicationName() { + return replicationName; + } + + public boolean isReplicationPrimary() { + return replicationPrimary; + } + public String getReplicationUrl() { return replicationUrl; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java index 23ed5a68c..78c796f5c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -30,17 +30,22 @@ import static com.codahale.metrics.MetricRegistry.name; public class DirectoryReconciler extends AccountDatabaseCrawlerListener { private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class); - private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Timer sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "sendChunk")); - private static final Meter sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, "sendChunkError")); + private final String name; + private final boolean primary; private final DirectoryManager directoryManager; private final DirectoryReconciliationClient reconciliationClient; + private final Timer sendChunkTimer; + private final Meter sendChunkErrorMeter; - public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) { + public DirectoryReconciler(String name, boolean primary, DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) { + this.name = name; + this.primary = primary; this.directoryManager = directoryManager; this.reconciliationClient = reconciliationClient; + sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, name, "sendChunk")); + sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, name, "sendChunkError")); } @Override @@ -49,13 +54,15 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener { @Override public void onCrawlEnd(Optional fromUuid) { DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList()); - DirectoryReconciliationResponse response = sendChunk(request); + sendChunk(request); } @Override protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) throws AccountDatabaseCrawlerRestartException { - updateDirectoryCache(chunkAccounts); + if (primary) { + updateDirectoryCache(chunkAccounts); + } DirectoryReconciliationRequest request = createChunkRequest(fromUuid, chunkAccounts); DirectoryReconciliationResponse response = sendChunk(request); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java index 957807f2e..6c5c91df7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java @@ -42,7 +42,7 @@ public class DirectoryReconciliationClient { this.client = initializeClient(directoryServerConfiguration); SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME) - .register(name(getClass(), "days_until_certificate_expiration"), + .register(name(getClass(), directoryServerConfiguration.getReplicationName(), "days_until_certificate_expiration"), new CertificateExpirationGauge(getCertificate(directoryServerConfiguration.getReplicationCaCertificate()))); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java index 6770b4486..a6a69c46a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java @@ -42,7 +42,7 @@ public class DirectoryReconcilerTest { private final BatchOperationHandle batchOperationHandle = mock(BatchOperationHandle.class); private final DirectoryManager directoryManager = mock(DirectoryManager.class); private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class); - private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, directoryManager); + private final DirectoryReconciler directoryReconciler = new DirectoryReconciler("test", true, reconciliationClient, directoryManager); private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);