Use per-shard circuit breakers for additional Redis clusters

This commit is contained in:
Chris Eager 2024-04-12 11:34:28 -05:00 committed by Chris Eager
parent be6f4e38b8
commit fc1f471369
5 changed files with 35 additions and 30 deletions

View File

@ -421,18 +421,18 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ConnectionEventLogger.logConnectionEvents(redisClientResources);
FaultTolerantRedisCluster cacheCluster = new ClusterFaultTolerantRedisCluster("main_cache_cluster",
config.getCacheClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster cacheCluster = new ShardFaultTolerantRedisCluster("main_cache",
config.getCacheClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster messagesCluster = new ClusterFaultTolerantRedisCluster("messages_cluster",
config.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster clientPresenceCluster = new ClusterFaultTolerantRedisCluster("client_presence_cluster",
config.getClientPresenceClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster clientPresenceCluster = new ShardFaultTolerantRedisCluster("client_presence",
config.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster metricsCluster = new ShardFaultTolerantRedisCluster("metrics",
config.getMetricsClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster pushSchedulerCluster = new ClusterFaultTolerantRedisCluster("push_scheduler",
config.getPushSchedulerCluster(), redisClientResources);
FaultTolerantRedisCluster rateLimitersCluster = new ClusterFaultTolerantRedisCluster("rate_limiters",
config.getRateLimitersCluster(), redisClientResources);
FaultTolerantRedisCluster pushSchedulerCluster = new ShardFaultTolerantRedisCluster("push_scheduler",
config.getPushSchedulerCluster(), redisClientResourcesBuilder);
FaultTolerantRedisCluster rateLimitersCluster = new ShardFaultTolerantRedisCluster("rate_limiters",
config.getRateLimitersCluster(), redisClientResourcesBuilder);
final BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000);
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(),

View File

@ -31,6 +31,7 @@ import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controll
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
import org.whispersystems.textsecuregcm.storage.Account;
@ -106,10 +107,11 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class, dynamicConfigurationExecutor);
dynamicConfigurationManager.start();
ClientResources redisClusterClientResources = ClientResources.builder().build();
final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder();
final ClientResources redisClusterClientResources = redisClientResourcesBuilder.build();
FaultTolerantRedisCluster cacheCluster = new ClusterFaultTolerantRedisCluster("main_cache_cluster",
configuration.getCacheClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster cacheCluster = new ShardFaultTolerantRedisCluster("main_cache_cluster",
configuration.getCacheClusterConfiguration(), redisClientResourcesBuilder);
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
environment.lifecycle().executorService("messageDelivery-%d").maxThreads(4)
@ -179,10 +181,10 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
FaultTolerantRedisCluster messageReadDeleteCluster = new ClusterFaultTolerantRedisCluster(
"message_read_delete_cluster",
configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster clientPresenceCluster = new ClusterFaultTolerantRedisCluster("client_presence_cluster",
configuration.getClientPresenceClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster rateLimitersCluster = new ClusterFaultTolerantRedisCluster("rate_limiters",
configuration.getRateLimitersCluster(), redisClusterClientResources);
FaultTolerantRedisCluster clientPresenceCluster = new ShardFaultTolerantRedisCluster("client_presence",
configuration.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster rateLimitersCluster = new ShardFaultTolerantRedisCluster("rate_limiters",
configuration.getRateLimitersCluster(), redisClientResourcesBuilder);
SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(
secureValueRecoveryCredentialsGenerator, secureValueRecoveryExecutor, secureValueRecoveryServiceRetryExecutor,
configuration.getSvr2Configuration());

View File

@ -31,6 +31,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
import org.whispersystems.textsecuregcm.storage.AccountLockManager;
@ -67,7 +68,7 @@ record CommandDependencies(
ClientPresenceManager clientPresenceManager,
KeysManager keysManager,
FaultTolerantRedisCluster cacheCluster,
ClientResources redisClusterClientResources,
ClientResources.Builder redisClusterClientResourcesBuilder,
BackupManager backupManager,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
@ -89,10 +90,11 @@ record CommandDependencies(
MetricsUtil.configureRegistries(configuration, environment, dynamicConfigurationManager);
ClientResources redisClusterClientResources = ClientResources.builder().build();
final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder();
ClientResources redisClusterClientResources = redisClientResourcesBuilder.build();
FaultTolerantRedisCluster cacheCluster = new ClusterFaultTolerantRedisCluster("main_cache_cluster",
configuration.getCacheClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster cacheCluster = new ShardFaultTolerantRedisCluster("main_cache",
configuration.getCacheClusterConfiguration(), redisClientResourcesBuilder);
ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "recurringJob-%d")).threads(2).build();
@ -167,10 +169,10 @@ record CommandDependencies(
FaultTolerantRedisCluster messageReadDeleteCluster = new ClusterFaultTolerantRedisCluster(
"message_read_delete_cluster",
configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster clientPresenceCluster = new ClusterFaultTolerantRedisCluster("client_presence_cluster",
configuration.getClientPresenceClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster rateLimitersCluster = new ClusterFaultTolerantRedisCluster("rate_limiters",
configuration.getRateLimitersCluster(), redisClusterClientResources);
FaultTolerantRedisCluster clientPresenceCluster = new ShardFaultTolerantRedisCluster("client_presence",
configuration.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster rateLimitersCluster = new ShardFaultTolerantRedisCluster("rate_limiters",
configuration.getRateLimitersCluster(), redisClientResourcesBuilder);
SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(
secureValueRecoveryCredentialsGenerator, secureValueRecoveryServiceExecutor,
secureValueRecoveryServiceRetryExecutor,
@ -231,7 +233,7 @@ record CommandDependencies(
clientPresenceManager,
keys,
cacheCluster,
redisClusterClientResources,
redisClientResourcesBuilder,
backupManager,
dynamicConfigurationManager
);

View File

@ -19,8 +19,8 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler;
import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
public class ScheduledApnPushNotificationSenderServiceCommand extends ServerCommand<WhisperServerConfiguration> {
@ -64,8 +64,8 @@ public class ScheduledApnPushNotificationSenderServiceCommand extends ServerComm
});
}
final FaultTolerantRedisCluster pushSchedulerCluster = new ClusterFaultTolerantRedisCluster("push_scheduler",
configuration.getPushSchedulerCluster(), deps.redisClusterClientResources());
final FaultTolerantRedisCluster pushSchedulerCluster = new ShardFaultTolerantRedisCluster("push_scheduler",
configuration.getPushSchedulerCluster(), deps.redisClusterClientResourcesBuilder());
final ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))
.maxThreads(1).minThreads(1).build();

View File

@ -12,8 +12,8 @@ import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.resource.ClientResources;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
@ -81,8 +81,9 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
@Override
public void beforeEach(final ExtensionContext context) throws Exception {
redisCluster = new ClusterFaultTolerantRedisCluster("test-cluster",
RedisClusterClient.create(getRedisURIs()),
redisCluster = new ShardFaultTolerantRedisCluster("test-cluster",
ClientResources.builder(),
getRedisURIs(),
timeout,
new CircuitBreakerConfiguration(),
retryConfiguration);