diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index d4f9a7bcb..bd2572af5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -421,18 +421,18 @@ public class WhisperServerService extends Application keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000); Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index 5a51b4fca..f7e95b9b3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -31,6 +31,7 @@ import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controll import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; import org.whispersystems.textsecuregcm.storage.Account; @@ -106,10 +107,11 @@ public class AssignUsernameCommand extends EnvironmentCommand dynamicConfigurationManager) { @@ -89,10 +90,11 @@ record CommandDependencies( MetricsUtil.configureRegistries(configuration, environment, dynamicConfigurationManager); - ClientResources redisClusterClientResources = ClientResources.builder().build(); + final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder(); + ClientResources redisClusterClientResources = redisClientResourcesBuilder.build(); - FaultTolerantRedisCluster cacheCluster = new ClusterFaultTolerantRedisCluster("main_cache_cluster", - configuration.getCacheClusterConfiguration(), redisClusterClientResources); + FaultTolerantRedisCluster cacheCluster = new ShardFaultTolerantRedisCluster("main_cache", + configuration.getCacheClusterConfiguration(), redisClientResourcesBuilder); ScheduledExecutorService recurringJobExecutor = environment.lifecycle() .scheduledExecutorService(name(name, "recurringJob-%d")).threads(2).build(); @@ -167,10 +169,10 @@ record CommandDependencies( FaultTolerantRedisCluster messageReadDeleteCluster = new ClusterFaultTolerantRedisCluster( "message_read_delete_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources); - FaultTolerantRedisCluster clientPresenceCluster = new ClusterFaultTolerantRedisCluster("client_presence_cluster", - configuration.getClientPresenceClusterConfiguration(), redisClusterClientResources); - FaultTolerantRedisCluster rateLimitersCluster = new ClusterFaultTolerantRedisCluster("rate_limiters", - configuration.getRateLimitersCluster(), redisClusterClientResources); + FaultTolerantRedisCluster clientPresenceCluster = new ShardFaultTolerantRedisCluster("client_presence", + configuration.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder); + FaultTolerantRedisCluster rateLimitersCluster = new ShardFaultTolerantRedisCluster("rate_limiters", + configuration.getRateLimitersCluster(), redisClientResourcesBuilder); SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client( secureValueRecoveryCredentialsGenerator, secureValueRecoveryServiceExecutor, secureValueRecoveryServiceRetryExecutor, @@ -231,7 +233,7 @@ record CommandDependencies( clientPresenceManager, keys, cacheCluster, - redisClusterClientResources, + redisClientResourcesBuilder, backupManager, dynamicConfigurationManager ); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScheduledApnPushNotificationSenderServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScheduledApnPushNotificationSenderServiceCommand.java index 7bfd802e9..199f61b0a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScheduledApnPushNotificationSenderServiceCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScheduledApnPushNotificationSenderServiceCommand.java @@ -19,8 +19,8 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler; -import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; public class ScheduledApnPushNotificationSenderServiceCommand extends ServerCommand { @@ -64,8 +64,8 @@ public class ScheduledApnPushNotificationSenderServiceCommand extends ServerComm }); } - final FaultTolerantRedisCluster pushSchedulerCluster = new ClusterFaultTolerantRedisCluster("push_scheduler", - configuration.getPushSchedulerCluster(), deps.redisClusterClientResources()); + final FaultTolerantRedisCluster pushSchedulerCluster = new ShardFaultTolerantRedisCluster("push_scheduler", + configuration.getPushSchedulerCluster(), deps.redisClusterClientResourcesBuilder()); final ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")) .maxThreads(1).minThreads(1).build(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java index 8521aaaa8..bce845b33 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java @@ -12,8 +12,8 @@ import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; -import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.SlotHash; +import io.lettuce.core.resource.ClientResources; import java.io.File; import java.io.IOException; import java.net.ServerSocket; @@ -81,8 +81,9 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb @Override public void beforeEach(final ExtensionContext context) throws Exception { - redisCluster = new ClusterFaultTolerantRedisCluster("test-cluster", - RedisClusterClient.create(getRedisURIs()), + redisCluster = new ShardFaultTolerantRedisCluster("test-cluster", + ClientResources.builder(), + getRedisURIs(), timeout, new CircuitBreakerConfiguration(), retryConfiguration);