Share resources between Lettuce clients.
This commit is contained in:
		
							parent
							
								
									2bcc90a9eb
								
							
						
					
					
						commit
						eab1f503a5
					
				| 
						 | 
				
			
			@ -40,6 +40,7 @@ import io.dropwizard.db.PooledDataSourceFactory;
 | 
			
		|||
import io.dropwizard.jdbi3.JdbiFactory;
 | 
			
		||||
import io.dropwizard.setup.Bootstrap;
 | 
			
		||||
import io.dropwizard.setup.Environment;
 | 
			
		||||
import io.lettuce.core.resource.ClientResources;
 | 
			
		||||
import io.micrometer.core.instrument.Clock;
 | 
			
		||||
import io.micrometer.core.instrument.Metrics;
 | 
			
		||||
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
 | 
			
		||||
| 
						 | 
				
			
			@ -282,9 +283,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
 | 
			
		|||
    ReplicatedJedisPool directoryClient     = directoryClientFactory.getRedisClientPool();
 | 
			
		||||
    ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
 | 
			
		||||
 | 
			
		||||
    FaultTolerantRedisCluster cacheCluster         = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration());
 | 
			
		||||
    FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration());
 | 
			
		||||
    FaultTolerantRedisCluster metricsCluster       = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration());
 | 
			
		||||
    ClientResources redisClusterClientResources = ClientResources.builder().build();
 | 
			
		||||
 | 
			
		||||
    FaultTolerantRedisCluster cacheCluster         = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration(), redisClusterClientResources);
 | 
			
		||||
    FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
 | 
			
		||||
    FaultTolerantRedisCluster metricsCluster       = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration(), redisClusterClientResources);
 | 
			
		||||
 | 
			
		||||
    BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000);
 | 
			
		||||
    Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,6 +12,7 @@ import io.lettuce.core.cluster.RedisClusterClient;
 | 
			
		|||
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
 | 
			
		||||
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
 | 
			
		||||
import io.lettuce.core.codec.ByteArrayCodec;
 | 
			
		||||
import io.lettuce.core.resource.ClientResources;
 | 
			
		||||
import org.slf4j.Logger;
 | 
			
		||||
import org.slf4j.LoggerFactory;
 | 
			
		||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
 | 
			
		||||
| 
						 | 
				
			
			@ -51,9 +52,9 @@ public class FaultTolerantRedisCluster {
 | 
			
		|||
 | 
			
		||||
    private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class);
 | 
			
		||||
 | 
			
		||||
    public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration) {
 | 
			
		||||
    public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) {
 | 
			
		||||
        this(name,
 | 
			
		||||
             RedisClusterClient.create(clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())),
 | 
			
		||||
             RedisClusterClient.create(clientResources, clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())),
 | 
			
		||||
             clusterConfiguration.getTimeout(),
 | 
			
		||||
             clusterConfiguration.getCircuitBreakerConfiguration(),
 | 
			
		||||
             clusterConfiguration.getRetryConfiguration());
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,6 +5,7 @@ import io.dropwizard.Application;
 | 
			
		|||
import io.dropwizard.cli.EnvironmentCommand;
 | 
			
		||||
import io.dropwizard.jdbi3.JdbiFactory;
 | 
			
		||||
import io.dropwizard.setup.Environment;
 | 
			
		||||
import io.lettuce.core.resource.ClientResources;
 | 
			
		||||
import net.sourceforge.argparse4j.inf.Namespace;
 | 
			
		||||
import net.sourceforge.argparse4j.inf.Subparser;
 | 
			
		||||
import org.jdbi.v3.core.Jdbi;
 | 
			
		||||
| 
						 | 
				
			
			@ -71,13 +72,15 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
 | 
			
		|||
 | 
			
		||||
      environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 | 
			
		||||
 | 
			
		||||
      JdbiFactory           jdbiFactory     = new JdbiFactory();
 | 
			
		||||
      Jdbi                  accountJdbi     = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb");
 | 
			
		||||
      Jdbi                  messageJdbi     = jdbiFactory.build(environment, configuration.getMessageStoreConfiguration(), "messagedb" );
 | 
			
		||||
      FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
 | 
			
		||||
      FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database", messageJdbi, configuration.getMessageStoreConfiguration().getCircuitBreakerConfiguration());
 | 
			
		||||
      JdbiFactory           jdbiFactory                 = new JdbiFactory();
 | 
			
		||||
      Jdbi                  accountJdbi                 = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb");
 | 
			
		||||
      Jdbi                  messageJdbi                 = jdbiFactory.build(environment, configuration.getMessageStoreConfiguration(), "messagedb" );
 | 
			
		||||
      FaultTolerantDatabase accountDatabase             = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
 | 
			
		||||
      FaultTolerantDatabase messageDatabase             = new FaultTolerantDatabase("message_database", messageJdbi, configuration.getMessageStoreConfiguration().getCircuitBreakerConfiguration());
 | 
			
		||||
      ClientResources       redisClusterClientResources = ClientResources.builder().build();
 | 
			
		||||
 | 
			
		||||
      FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration());
 | 
			
		||||
 | 
			
		||||
      FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources);
 | 
			
		||||
 | 
			
		||||
      ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(4).build();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -88,8 +91,8 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
 | 
			
		|||
      Keys                      keys                 = new Keys(accountDatabase);
 | 
			
		||||
      Messages                  messages             = new Messages(messageDatabase);
 | 
			
		||||
      ReplicatedJedisPool       redisClient          = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
 | 
			
		||||
      FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration());
 | 
			
		||||
      FaultTolerantRedisCluster metricsCluster       = new FaultTolerantRedisCluster("metrics_cluster", configuration.getMetricsClusterConfiguration());
 | 
			
		||||
      FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
 | 
			
		||||
      FaultTolerantRedisCluster metricsCluster       = new FaultTolerantRedisCluster("metrics_cluster", configuration.getMetricsClusterConfiguration(), redisClusterClientResources);
 | 
			
		||||
      MessagesCache             messagesCache        = new MessagesCache(messagesCacheCluster, keyspaceNotificationDispatchExecutor);
 | 
			
		||||
      PushLatencyManager        pushLatencyManager   = new PushLatencyManager(metricsCluster);
 | 
			
		||||
      DirectoryQueue            directoryQueue       = new DirectoryQueue  (configuration.getDirectoryConfiguration().getSqsConfiguration());
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue