Allow configuration of multiple directory account crawler listeners (#325)
* Allow configuration of multiple directory account crawler listeners Only one should update the local redis directory. This one is marked with replicationPrimary true. The others in the list only serve to issue replication requests over to CDS replication load balancers. * Update one more metric name
This commit is contained in:
		
							parent
							
								
									db14d15953
								
							
						
					
					
						commit
						86ccaa52a5
					
				| 
						 | 
				
			
			@ -47,6 +47,7 @@ import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccount;
 | 
			
		|||
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccountAuthenticator;
 | 
			
		||||
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator;
 | 
			
		||||
import org.whispersystems.textsecuregcm.auth.TurnTokenGenerator;
 | 
			
		||||
import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration;
 | 
			
		||||
import org.whispersystems.textsecuregcm.controllers.AccountController;
 | 
			
		||||
import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV1;
 | 
			
		||||
import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV2;
 | 
			
		||||
| 
						 | 
				
			
			@ -92,8 +93,8 @@ import org.whispersystems.textsecuregcm.push.APNSender;
 | 
			
		|||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
 | 
			
		||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
 | 
			
		||||
import org.whispersystems.textsecuregcm.push.GCMSender;
 | 
			
		||||
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
 | 
			
		||||
import org.whispersystems.textsecuregcm.push.MessageSender;
 | 
			
		||||
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
 | 
			
		||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
 | 
			
		||||
import org.whispersystems.textsecuregcm.recaptcha.RecaptchaClient;
 | 
			
		||||
import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger;
 | 
			
		||||
| 
						 | 
				
			
			@ -120,7 +121,9 @@ import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
 | 
			
		|||
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.Keys;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.MessagePersister;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.Messages;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.MessagesCache;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
 | 
			
		||||
| 
						 | 
				
			
			@ -130,8 +133,6 @@ import org.whispersystems.textsecuregcm.storage.Profiles;
 | 
			
		|||
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.MessagePersister;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.MessagesCache;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.RegistrationLockVersionCounter;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.RemoteConfigs;
 | 
			
		||||
import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager;
 | 
			
		||||
| 
						 | 
				
			
			@ -159,6 +160,7 @@ import javax.servlet.FilterRegistration;
 | 
			
		|||
import javax.servlet.ServletRegistration;
 | 
			
		||||
import java.security.Security;
 | 
			
		||||
import java.time.Duration;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.EnumSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
| 
						 | 
				
			
			@ -335,14 +337,16 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
 | 
			
		|||
 | 
			
		||||
    MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
 | 
			
		||||
 | 
			
		||||
    DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
 | 
			
		||||
 | 
			
		||||
    ActiveUserCounter                    activeUserCounter               = new ActiveUserCounter(config.getMetricsFactory(), cacheCluster);
 | 
			
		||||
    DirectoryReconciler                  directoryReconciler             = new DirectoryReconciler(directoryReconciliationClient, directory);
 | 
			
		||||
    AccountCleaner                       accountCleaner                  = new AccountCleaner(accountsManager);
 | 
			
		||||
    PushFeedbackProcessor                pushFeedbackProcessor           = new PushFeedbackProcessor(accountsManager, directoryQueue);
 | 
			
		||||
    RegistrationLockVersionCounter       registrationLockVersionCounter  = new RegistrationLockVersionCounter(metricsCluster, config.getMetricsFactory());
 | 
			
		||||
    List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner, registrationLockVersionCounter);
 | 
			
		||||
    final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = new ArrayList<>();
 | 
			
		||||
    accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager, directoryQueue));
 | 
			
		||||
    accountDatabaseCrawlerListeners.add(new ActiveUserCounter(config.getMetricsFactory(), cacheCluster));
 | 
			
		||||
    for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration().getDirectoryServerConfiguration()) {
 | 
			
		||||
      final DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(directoryServerConfiguration);
 | 
			
		||||
      final DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryServerConfiguration.getReplicationName(), directoryServerConfiguration.isReplicationPrimary(), directoryReconciliationClient, directory);
 | 
			
		||||
      accountDatabaseCrawlerListeners.add(directoryReconciler);
 | 
			
		||||
    }
 | 
			
		||||
    accountDatabaseCrawlerListeners.add(new AccountCleaner(accountsManager));
 | 
			
		||||
    accountDatabaseCrawlerListeners.add(new RegistrationLockVersionCounter(metricsCluster, config.getMetricsFactory()));
 | 
			
		||||
 | 
			
		||||
    AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
 | 
			
		||||
    AccountDatabaseCrawler      accountDatabaseCrawler      = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 | 
			
		|||
 | 
			
		||||
import javax.validation.Valid;
 | 
			
		||||
import javax.validation.constraints.NotNull;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
 | 
			
		||||
public class DirectoryConfiguration {
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -29,7 +30,7 @@ public class DirectoryConfiguration {
 | 
			
		|||
  @JsonProperty
 | 
			
		||||
  @NotNull
 | 
			
		||||
  @Valid
 | 
			
		||||
  private DirectoryServerConfiguration server;
 | 
			
		||||
  private List<DirectoryServerConfiguration> server;
 | 
			
		||||
 | 
			
		||||
  public RedisConfiguration getRedisConfiguration() {
 | 
			
		||||
    return redis;
 | 
			
		||||
| 
						 | 
				
			
			@ -43,8 +44,7 @@ public class DirectoryConfiguration {
 | 
			
		|||
    return client;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public DirectoryServerConfiguration getDirectoryServerConfiguration() {
 | 
			
		||||
  public List<DirectoryServerConfiguration> getDirectoryServerConfiguration() {
 | 
			
		||||
    return server;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -9,6 +9,13 @@ import org.hibernate.validator.constraints.NotEmpty;
 | 
			
		|||
 | 
			
		||||
public class DirectoryServerConfiguration {
 | 
			
		||||
 | 
			
		||||
  @NotEmpty
 | 
			
		||||
  @JsonProperty
 | 
			
		||||
  private String replicationName;
 | 
			
		||||
 | 
			
		||||
  @JsonProperty
 | 
			
		||||
  private boolean replicationPrimary;
 | 
			
		||||
 | 
			
		||||
  @NotEmpty
 | 
			
		||||
  @JsonProperty
 | 
			
		||||
  private String replicationUrl;
 | 
			
		||||
| 
						 | 
				
			
			@ -21,6 +28,14 @@ public class DirectoryServerConfiguration {
 | 
			
		|||
  @JsonProperty
 | 
			
		||||
  private String replicationCaCertificate;
 | 
			
		||||
 | 
			
		||||
  public String getReplicationName() {
 | 
			
		||||
    return replicationName;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public boolean isReplicationPrimary() {
 | 
			
		||||
    return replicationPrimary;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public String getReplicationUrl() {
 | 
			
		||||
    return replicationUrl;
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -30,17 +30,22 @@ import static com.codahale.metrics.MetricRegistry.name;
 | 
			
		|||
public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
 | 
			
		||||
 | 
			
		||||
  private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class);
 | 
			
		||||
 | 
			
		||||
  private static final MetricRegistry metricRegistry      = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
 | 
			
		||||
  private static final Timer          sendChunkTimer      = metricRegistry.timer(name(DirectoryReconciler.class, "sendChunk"));
 | 
			
		||||
  private static final Meter          sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, "sendChunkError"));
 | 
			
		||||
 | 
			
		||||
  private final String                        name;
 | 
			
		||||
  private final boolean                       primary;
 | 
			
		||||
  private final DirectoryManager              directoryManager;
 | 
			
		||||
  private final DirectoryReconciliationClient reconciliationClient;
 | 
			
		||||
  private final Timer                         sendChunkTimer;
 | 
			
		||||
  private final Meter                         sendChunkErrorMeter;
 | 
			
		||||
 | 
			
		||||
  public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) {
 | 
			
		||||
  public DirectoryReconciler(String name, boolean primary, DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) {
 | 
			
		||||
    this.name                 = name;
 | 
			
		||||
    this.primary              = primary;
 | 
			
		||||
    this.directoryManager     = directoryManager;
 | 
			
		||||
    this.reconciliationClient = reconciliationClient;
 | 
			
		||||
    sendChunkTimer            = metricRegistry.timer(name(DirectoryReconciler.class, name, "sendChunk"));
 | 
			
		||||
    sendChunkErrorMeter       = metricRegistry.meter(name(DirectoryReconciler.class, name, "sendChunkError"));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
| 
						 | 
				
			
			@ -49,13 +54,15 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
 | 
			
		|||
  @Override
 | 
			
		||||
  public void onCrawlEnd(Optional<UUID> fromUuid) {
 | 
			
		||||
    DirectoryReconciliationRequest  request  = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList());
 | 
			
		||||
    DirectoryReconciliationResponse response = sendChunk(request);
 | 
			
		||||
    sendChunk(request);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Override
 | 
			
		||||
  protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
 | 
			
		||||
 | 
			
		||||
    updateDirectoryCache(chunkAccounts);
 | 
			
		||||
    if (primary) {
 | 
			
		||||
      updateDirectoryCache(chunkAccounts);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    DirectoryReconciliationRequest  request  = createChunkRequest(fromUuid, chunkAccounts);
 | 
			
		||||
    DirectoryReconciliationResponse response = sendChunk(request);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -42,7 +42,7 @@ public class DirectoryReconciliationClient {
 | 
			
		|||
    this.client         = initializeClient(directoryServerConfiguration);
 | 
			
		||||
 | 
			
		||||
    SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
 | 
			
		||||
                          .register(name(getClass(), "days_until_certificate_expiration"),
 | 
			
		||||
                          .register(name(getClass(), directoryServerConfiguration.getReplicationName(), "days_until_certificate_expiration"),
 | 
			
		||||
                                    new CertificateExpirationGauge(getCertificate(directoryServerConfiguration.getReplicationCaCertificate())));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -42,7 +42,7 @@ public class DirectoryReconcilerTest {
 | 
			
		|||
  private final BatchOperationHandle          batchOperationHandle  = mock(BatchOperationHandle.class);
 | 
			
		||||
  private final DirectoryManager              directoryManager      = mock(DirectoryManager.class);
 | 
			
		||||
  private final DirectoryReconciliationClient reconciliationClient  = mock(DirectoryReconciliationClient.class);
 | 
			
		||||
  private final DirectoryReconciler           directoryReconciler   = new DirectoryReconciler(reconciliationClient, directoryManager);
 | 
			
		||||
  private final DirectoryReconciler           directoryReconciler   = new DirectoryReconciler("test", true, reconciliationClient, directoryManager);
 | 
			
		||||
 | 
			
		||||
  private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue