diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index e4170aec9..2d96a0f58 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -47,6 +47,7 @@ import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccount; import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccountAuthenticator; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator; import org.whispersystems.textsecuregcm.auth.TurnTokenGenerator; +import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration; import org.whispersystems.textsecuregcm.controllers.AccountController; import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV1; import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV2; @@ -92,8 +93,8 @@ import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.GCMSender; -import org.whispersystems.textsecuregcm.push.ProvisioningManager; import org.whispersystems.textsecuregcm.push.MessageSender; +import org.whispersystems.textsecuregcm.push.ProvisioningManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.recaptcha.RecaptchaClient; import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger; @@ -120,7 +121,9 @@ import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.FeatureFlags; import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager; import org.whispersystems.textsecuregcm.storage.Keys; +import org.whispersystems.textsecuregcm.storage.MessagePersister; import org.whispersystems.textsecuregcm.storage.Messages; +import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PendingAccounts; import org.whispersystems.textsecuregcm.storage.PendingAccountsManager; @@ -130,8 +133,6 @@ import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor; -import org.whispersystems.textsecuregcm.storage.MessagePersister; -import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.RegistrationLockVersionCounter; import org.whispersystems.textsecuregcm.storage.RemoteConfigs; import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager; @@ -159,6 +160,7 @@ import javax.servlet.FilterRegistration; import javax.servlet.ServletRegistration; import java.security.Security; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -335,14 +337,16 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner, registrationLockVersionCounter); + final List accountDatabaseCrawlerListeners = new ArrayList<>(); + accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager, directoryQueue)); + accountDatabaseCrawlerListeners.add(new ActiveUserCounter(config.getMetricsFactory(), cacheCluster)); + for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration().getDirectoryServerConfiguration()) { + final DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(directoryServerConfiguration); + final DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryServerConfiguration.getReplicationName(), directoryServerConfiguration.isReplicationPrimary(), directoryReconciliationClient, directory); + accountDatabaseCrawlerListeners.add(directoryReconciler); + } + accountDatabaseCrawlerListeners.add(new AccountCleaner(accountsManager)); + accountDatabaseCrawlerListeners.add(new RegistrationLockVersionCounter(metricsCluster, config.getMetricsFactory())); AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster); AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java index 53af8f079..c333a91eb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import javax.validation.Valid; import javax.validation.constraints.NotNull; +import java.util.List; public class DirectoryConfiguration { @@ -29,7 +30,7 @@ public class DirectoryConfiguration { @JsonProperty @NotNull @Valid - private DirectoryServerConfiguration server; + private List server; public RedisConfiguration getRedisConfiguration() { return redis; @@ -43,8 +44,7 @@ public class DirectoryConfiguration { return client; } - public DirectoryServerConfiguration getDirectoryServerConfiguration() { + public List getDirectoryServerConfiguration() { return server; } - } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java index 530ee1ade..208475265 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java @@ -9,6 +9,13 @@ import org.hibernate.validator.constraints.NotEmpty; public class DirectoryServerConfiguration { + @NotEmpty + @JsonProperty + private String replicationName; + + @JsonProperty + private boolean replicationPrimary; + @NotEmpty @JsonProperty private String replicationUrl; @@ -21,6 +28,14 @@ public class DirectoryServerConfiguration { @JsonProperty private String replicationCaCertificate; + public String getReplicationName() { + return replicationName; + } + + public boolean isReplicationPrimary() { + return replicationPrimary; + } + public String getReplicationUrl() { return replicationUrl; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java index 23ed5a68c..78c796f5c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -30,17 +30,22 @@ import static com.codahale.metrics.MetricRegistry.name; public class DirectoryReconciler extends AccountDatabaseCrawlerListener { private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class); - private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Timer sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "sendChunk")); - private static final Meter sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, "sendChunkError")); + private final String name; + private final boolean primary; private final DirectoryManager directoryManager; private final DirectoryReconciliationClient reconciliationClient; + private final Timer sendChunkTimer; + private final Meter sendChunkErrorMeter; - public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) { + public DirectoryReconciler(String name, boolean primary, DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) { + this.name = name; + this.primary = primary; this.directoryManager = directoryManager; this.reconciliationClient = reconciliationClient; + sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, name, "sendChunk")); + sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, name, "sendChunkError")); } @Override @@ -49,13 +54,15 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener { @Override public void onCrawlEnd(Optional fromUuid) { DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList()); - DirectoryReconciliationResponse response = sendChunk(request); + sendChunk(request); } @Override protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) throws AccountDatabaseCrawlerRestartException { - updateDirectoryCache(chunkAccounts); + if (primary) { + updateDirectoryCache(chunkAccounts); + } DirectoryReconciliationRequest request = createChunkRequest(fromUuid, chunkAccounts); DirectoryReconciliationResponse response = sendChunk(request); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java index 957807f2e..6c5c91df7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java @@ -42,7 +42,7 @@ public class DirectoryReconciliationClient { this.client = initializeClient(directoryServerConfiguration); SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME) - .register(name(getClass(), "days_until_certificate_expiration"), + .register(name(getClass(), directoryServerConfiguration.getReplicationName(), "days_until_certificate_expiration"), new CertificateExpirationGauge(getCertificate(directoryServerConfiguration.getReplicationCaCertificate()))); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java index 6770b4486..a6a69c46a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java @@ -42,7 +42,7 @@ public class DirectoryReconcilerTest { private final BatchOperationHandle batchOperationHandle = mock(BatchOperationHandle.class); private final DirectoryManager directoryManager = mock(DirectoryManager.class); private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class); - private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, directoryManager); + private final DirectoryReconciler directoryReconciler = new DirectoryReconciler("test", true, reconciliationClient, directoryManager); private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);