From a9117010f90fc9d40a5a400c0ba20d49291498c9 Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Wed, 9 Oct 2024 09:22:10 -0400 Subject: [PATCH] Introduce `FaultTolerantRedisClient` --- service/config/sample.yml | 5 +- .../WhisperServerConfiguration.java | 9 +- .../textsecuregcm/WhisperServerService.java | 20 ++- ...a => FaultTolerantRedisClientFactory.java} | 6 +- .../FaultTolerantRedisClusterFactory.java | 4 +- .../ProvisioningConfiguration.java | 20 --- .../RedisClusterConfiguration.java | 6 +- .../configuration/RedisConfiguration.java | 38 ++++- .../currency/CurrencyConversionManager.java | 6 +- .../limits/BaseRateLimiters.java | 8 +- .../limits/CardinalityEstimator.java | 6 +- .../limits/DynamicRateLimiter.java | 6 +- .../limits/MessageDeliveryLoopMonitor.java | 4 +- .../textsecuregcm/limits/RateLimiters.java | 6 +- .../limits/StaticRateLimiter.java | 6 +- .../providers/RedisClusterHealthCheck.java | 6 +- .../push/ClientPresenceManager.java | 12 +- .../push/ProvisioningManager.java | 52 ++---- .../push/PushNotificationScheduler.java | 8 +- ...AbstractFaultTolerantPubSubConnection.java | 65 +++++++ .../textsecuregcm/redis/ClusterLuaScript.java | 6 +- .../FaultTolerantPubSubClusterConnection.java | 54 ++++++ .../redis/FaultTolerantPubSubConnection.java | 87 +--------- .../redis/FaultTolerantRedisClient.java | 159 ++++++++++++++++++ ...a => FaultTolerantRedisClusterClient.java} | 18 +- .../storage/AccountsManager.java | 6 +- .../textsecuregcm/storage/MessagesCache.java | 32 ++-- .../storage/MessagesCacheGetItemsScript.java | 4 +- ...MessagesCacheGetQueuesToPersistScript.java | 4 +- .../storage/MessagesCacheInsertScript.java | 4 +- ...edMultiRecipientPayloadAndViewsScript.java | 4 +- .../MessagesCacheRemoveByGuidScript.java | 4 +- .../MessagesCacheRemoveQueueScript.java | 4 +- ...eRemoveRecipientViewFromMrmDataScript.java | 4 +- .../storage/ProfilesManager.java | 6 +- .../storage/ReportMessageManager.java | 6 +- .../workers/CommandDependencies.java | 17 +- .../io.dropwizard.jackson.Discoverable | 2 +- .../LocalFaultTolerantRedisClientFactory.java | 49 ++++++ ...LocalFaultTolerantRedisClusterFactory.java | 6 +- .../LocalSingletonRedisClientFactory.java | 52 ------ .../CurrencyConversionManagerTest.java | 4 +- .../limits/CardinalityEstimatorTest.java | 6 +- .../limits/RateLimitersLuaScriptTest.java | 8 +- .../limits/RateLimitersTest.java | 4 +- .../push/ProvisioningManagerTest.java | 7 +- .../push/PushNotificationSchedulerTest.java | 4 +- .../redis/ClusterLuaScriptTest.java | 10 +- ...tTolerantPubSubClusterConnectionTest.java} | 6 +- .../redis/FaultTolerantRedisClientTest.java | 102 +++++++++++ ... FaultTolerantRedisClusterClientTest.java} | 10 +- .../redis/RedisClusterExtension.java | 6 +- .../redis/RedisServerExtension.java | 99 +++++++++++ .../redis/RedisSingletonExtension.java | 92 ---------- .../storage/AccountsManagerTest.java | 4 +- .../storage/MessagesCacheTest.java | 4 +- .../storage/ProfilesManagerTest.java | 4 +- .../tests/util/RedisClusterHelper.java | 8 +- ...figuration.FaultTolerantRedisClientFactory | 1 + ....configuration.SingletonRedisClientFactory | 1 - service/src/test/resources/config/test.yml | 5 +- 61 files changed, 744 insertions(+), 462 deletions(-) rename service/src/main/java/org/whispersystems/textsecuregcm/configuration/{SingletonRedisClientFactory.java => FaultTolerantRedisClientFactory.java} (63%) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/ProvisioningConfiguration.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/AbstractFaultTolerantPubSubConnection.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnection.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java rename service/src/main/java/org/whispersystems/textsecuregcm/redis/{FaultTolerantRedisCluster.java => FaultTolerantRedisClusterClient.java} (90%) create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClientFactory.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalSingletonRedisClientFactory.java rename service/src/test/java/org/whispersystems/textsecuregcm/redis/{FaultTolerantPubSubConnectionTest.java => FaultTolerantPubSubClusterConnectionTest.java} (96%) create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java rename service/src/test/java/org/whispersystems/textsecuregcm/redis/{FaultTolerantRedisClusterTest.java => FaultTolerantRedisClusterClientTest.java} (98%) create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisSingletonExtension.java create mode 100644 service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory delete mode 100644 service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.SingletonRedisClientFactory diff --git a/service/config/sample.yml b/service/config/sample.yml index abe71c8b3..bdad29a6a 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -163,9 +163,8 @@ cacheCluster: # Redis server configuration for cache cluster clientPresenceCluster: # Redis server configuration for client presence cluster configurationUri: redis://redis.example.com:6379/ -provisioning: - pubsub: # Redis server configuration for pubsub cluster - uri: redis://redis.example.com:6379/ +pubsub: # Redis server configuration for pubsub cluster + uri: redis://redis.example.com:6379/ pushSchedulerCluster: # Redis server configuration for push scheduler cluster configurationUri: redis://redis.example.com:6379/ diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 80f476c99..6e1d401d1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -23,7 +23,6 @@ import org.whispersystems.textsecuregcm.configuration.BadgesConfiguration; import org.whispersystems.textsecuregcm.configuration.BraintreeConfiguration; import org.whispersystems.textsecuregcm.configuration.Cdn3StorageManagerConfiguration; import org.whispersystems.textsecuregcm.configuration.CdnConfiguration; -import org.whispersystems.textsecuregcm.configuration.ClientCdnConfiguration; import org.whispersystems.textsecuregcm.configuration.ClientReleaseConfiguration; import org.whispersystems.textsecuregcm.configuration.DatadogConfiguration; import org.whispersystems.textsecuregcm.configuration.DefaultAwsCredentialsFactory; @@ -34,6 +33,7 @@ import org.whispersystems.textsecuregcm.configuration.DynamoDbClientFactory; import org.whispersystems.textsecuregcm.configuration.DynamoDbTables; import org.whispersystems.textsecuregcm.configuration.ExternalRequestFilterConfiguration; import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClusterFactory; +import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory; import org.whispersystems.textsecuregcm.configuration.FcmConfiguration; import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration; import org.whispersystems.textsecuregcm.configuration.GenericZkConfig; @@ -47,7 +47,6 @@ import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration; import org.whispersystems.textsecuregcm.configuration.NoiseWebSocketTunnelConfiguration; import org.whispersystems.textsecuregcm.configuration.OneTimeDonationConfiguration; import org.whispersystems.textsecuregcm.configuration.PaymentsServiceConfiguration; -import org.whispersystems.textsecuregcm.configuration.ProvisioningConfiguration; import org.whispersystems.textsecuregcm.configuration.RegistrationServiceClientFactory; import org.whispersystems.textsecuregcm.configuration.RemoteConfigConfiguration; import org.whispersystems.textsecuregcm.configuration.ReportMessageConfiguration; @@ -143,7 +142,7 @@ public class WhisperServerConfiguration extends Configuration { @NotNull @Valid @JsonProperty - private ProvisioningConfiguration provisioning; + private FaultTolerantRedisClientFactory pubsub; @NotNull @Valid @@ -410,8 +409,8 @@ public class WhisperServerConfiguration extends Configuration { return cacheCluster; } - public ProvisioningConfiguration getProvisioningConfiguration() { - return provisioning; + public FaultTolerantRedisClientFactory getRedisPubSubConfiguration() { + return pubsub; } public SecureValueRecovery2Configuration getSvr2Configuration() { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index ff0570d18..104a68f94 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -200,7 +200,8 @@ import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.registration.RegistrationServiceClient; import org.whispersystems.textsecuregcm.s3.PolicySigner; import org.whispersystems.textsecuregcm.s3.PostPolicyGenerator; @@ -447,18 +448,21 @@ public class WhisperServerService extends Application keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000); Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue); @@ -652,9 +656,7 @@ public class WhisperServerService extends Application currencies, final ScheduledExecutorService executor, final Clock clock) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/BaseRateLimiters.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/BaseRateLimiters.java index 4f87761ae..0a0ef700a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/BaseRateLimiters.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/BaseRateLimiters.java @@ -22,7 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; public abstract class BaseRateLimiters { @@ -39,7 +39,7 @@ public abstract class BaseRateLimiters { final Map configs, final DynamicConfigurationManager dynamicConfigurationManager, final ClusterLuaScript validateScript, - final FaultTolerantRedisCluster cacheCluster, + final FaultTolerantRedisClusterClient cacheCluster, final Clock clock) { this.configs = configs; this.rateLimiterByDescriptor = Arrays.stream(values) @@ -69,7 +69,7 @@ public abstract class BaseRateLimiters { } } - protected static ClusterLuaScript defaultScript(final FaultTolerantRedisCluster cacheCluster) { + protected static ClusterLuaScript defaultScript(final FaultTolerantRedisClusterClient cacheCluster) { try { return ClusterLuaScript.fromResource( cacheCluster, "lua/validate_rate_limit.lua", ScriptOutputType.INTEGER); @@ -83,7 +83,7 @@ public abstract class BaseRateLimiters { final Map configs, final DynamicConfigurationManager dynamicConfigurationManager, final ClusterLuaScript validateScript, - final FaultTolerantRedisCluster cacheCluster, + final FaultTolerantRedisClusterClient cacheCluster, final Clock clock) { if (descriptor.isDynamic()) { final Supplier configResolver = () -> { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimator.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimator.java index 2f8f55dc7..62be0fb23 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimator.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimator.java @@ -12,7 +12,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.util.Util; /** @@ -21,11 +21,11 @@ import org.whispersystems.textsecuregcm.util.Util; public class CardinalityEstimator { private volatile double uniqueElementCount; - private final FaultTolerantRedisCluster redisCluster; + private final FaultTolerantRedisClusterClient redisCluster; private final String hllName; private final Duration period; - public CardinalityEstimator(final FaultTolerantRedisCluster redisCluster, final String name, final Duration period) { + public CardinalityEstimator(final FaultTolerantRedisClusterClient redisCluster, final String name, final Duration period) { this.redisCluster = redisCluster; this.hllName = "cardinality_estimator::" + name; this.period = period; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/DynamicRateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/DynamicRateLimiter.java index 2b38483e2..c8f7e42e6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/DynamicRateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/DynamicRateLimiter.java @@ -15,7 +15,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; public class DynamicRateLimiter implements RateLimiter { @@ -26,7 +26,7 @@ public class DynamicRateLimiter implements RateLimiter { private final ClusterLuaScript validateScript; - private final FaultTolerantRedisCluster cluster; + private final FaultTolerantRedisClusterClient cluster; private final Clock clock; @@ -38,7 +38,7 @@ public class DynamicRateLimiter implements RateLimiter { final DynamicConfigurationManager dynamicConfigurationManager, final Supplier configResolver, final ClusterLuaScript validateScript, - final FaultTolerantRedisCluster cluster, + final FaultTolerantRedisClusterClient cluster, final Clock clock) { this.name = requireNonNull(name); this.dynamicConfigurationManager = dynamicConfigurationManager; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/MessageDeliveryLoopMonitor.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/MessageDeliveryLoopMonitor.java index 4cc1fb4fd..bc4ecf21f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/MessageDeliveryLoopMonitor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/MessageDeliveryLoopMonitor.java @@ -11,7 +11,7 @@ import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; public class MessageDeliveryLoopMonitor { @@ -22,7 +22,7 @@ public class MessageDeliveryLoopMonitor { private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryLoopMonitor.class); - public MessageDeliveryLoopMonitor(final FaultTolerantRedisCluster rateLimitCluster) { + public MessageDeliveryLoopMonitor(final FaultTolerantRedisClusterClient rateLimitCluster) { try { getDeliveryAttemptsScript = ClusterLuaScript.fromResource(rateLimitCluster, "lua/get_delivery_attempt_count.lua", ScriptOutputType.INTEGER); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java index 6ba5e2de5..5ae44d05f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java @@ -11,7 +11,7 @@ import java.time.Duration; import java.util.Map; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; public class RateLimiters extends BaseRateLimiters { @@ -81,7 +81,7 @@ public class RateLimiters extends BaseRateLimiters { public static RateLimiters createAndValidate( final Map configs, final DynamicConfigurationManager dynamicConfigurationManager, - final FaultTolerantRedisCluster cacheCluster) { + final FaultTolerantRedisClusterClient cacheCluster) { final RateLimiters rateLimiters = new RateLimiters( configs, dynamicConfigurationManager, defaultScript(cacheCluster), cacheCluster, Clock.systemUTC()); rateLimiters.validateValuesAndConfigs(); @@ -93,7 +93,7 @@ public class RateLimiters extends BaseRateLimiters { final Map configs, final DynamicConfigurationManager dynamicConfigurationManager, final ClusterLuaScript validateScript, - final FaultTolerantRedisCluster cacheCluster, + final FaultTolerantRedisClusterClient cacheCluster, final Clock clock) { super(For.values(), configs, dynamicConfigurationManager, validateScript, cacheCluster, clock); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/StaticRateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/StaticRateLimiter.java index e661c4a33..78cb8e229 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/StaticRateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/StaticRateLimiter.java @@ -20,7 +20,7 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfigurati import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.Util; @@ -36,7 +36,7 @@ public class StaticRateLimiter implements RateLimiter { private final ClusterLuaScript validateScript; - private final FaultTolerantRedisCluster cacheCluster; + private final FaultTolerantRedisClusterClient cacheCluster; private final Clock clock; @@ -45,7 +45,7 @@ public class StaticRateLimiter implements RateLimiter { final String name, final RateLimiterConfig config, final ClusterLuaScript validateScript, - final FaultTolerantRedisCluster cacheCluster, + final FaultTolerantRedisClusterClient cacheCluster, final Clock clock, final DynamicConfigurationManager dynamicConfigurationManager) { this.name = requireNonNull(name); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java b/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java index bfef14dee..67b51c752 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java @@ -6,13 +6,13 @@ package org.whispersystems.textsecuregcm.providers; import com.codahale.metrics.health.HealthCheck; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; public class RedisClusterHealthCheck extends HealthCheck { - private final FaultTolerantRedisCluster redisCluster; + private final FaultTolerantRedisClusterClient redisCluster; - public RedisClusterHealthCheck(final FaultTolerantRedisCluster redisCluster) { + public RedisClusterHealthCheck(final FaultTolerantRedisClusterClient redisCluster) { this.redisCluster = redisCluster; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 7045b57f6..86939abb3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -35,8 +35,8 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.storage.Device; /** @@ -52,8 +52,8 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter pubSubConnection; + private final FaultTolerantRedisClusterClient presenceCluster; + private final FaultTolerantPubSubClusterConnection pubSubConnection; private final ClusterLuaScript clearPresenceScript; private final ClusterLuaScript renewPresenceScript; @@ -80,7 +80,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter getPubSubConnection() { + FaultTolerantPubSubClusterConnection getPubSubConnection() { return pubSubConnection; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ProvisioningManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ProvisioningManager.java index bddd25985..f1a83a6ae 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ProvisioningManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ProvisioningManager.java @@ -10,12 +10,7 @@ import static com.codahale.metrics.MetricRegistry.name; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import io.dropwizard.lifecycle.Managed; -import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.lettuce.core.RedisClient; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.pubsub.RedisPubSubAdapter; -import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; import java.nio.charset.StandardCharsets; @@ -24,18 +19,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.storage.PubSubProtos; -import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; public class ProvisioningManager extends RedisPubSubAdapter implements Managed { - private final RedisClient redisClient; - private final StatefulRedisPubSubConnection subscriptionConnection; - private final StatefulRedisConnection publicationConnection; - - private final CircuitBreaker circuitBreaker; + private final FaultTolerantRedisClient pubSubClient; + private final FaultTolerantPubSubConnection pubSubConnection; private final Map> listenersByProvisioningAddress = new ConcurrentHashMap<>(); @@ -50,46 +42,31 @@ public class ProvisioningManager extends RedisPubSubAdapter impl private static final Logger logger = LoggerFactory.getLogger(ProvisioningManager.class); - public ProvisioningManager(final RedisClient redisClient, - final CircuitBreakerConfiguration circuitBreakerConfiguration) { - - this.redisClient = redisClient; - - this.subscriptionConnection = redisClient.connectPubSub(new ByteArrayCodec()); - this.publicationConnection = redisClient.connect(new ByteArrayCodec()); - - this.circuitBreaker = CircuitBreaker.of("pubsub-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); - - CircuitBreakerUtil.registerMetrics(circuitBreaker, ProvisioningManager.class, Tags.empty()); + public ProvisioningManager(final FaultTolerantRedisClient pubSubClient) { + this.pubSubClient = pubSubClient; + this.pubSubConnection = pubSubClient.createBinaryPubSubConnection(); Metrics.gaugeMapSize(ACTIVE_LISTENERS_GAUGE_NAME, Tags.empty(), listenersByProvisioningAddress); } @Override public void start() throws Exception { - subscriptionConnection.addListener(this); + pubSubConnection.usePubSubConnection(connection -> connection.addListener(this)); } @Override public void stop() throws Exception { - subscriptionConnection.removeListener(this); - - subscriptionConnection.close(); - publicationConnection.close(); - - redisClient.shutdown(); + pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this)); } public void addListener(final String address, final Consumer listener) { listenersByProvisioningAddress.put(address, listener); - - circuitBreaker.executeRunnable( - () -> subscriptionConnection.sync().subscribe(address.getBytes(StandardCharsets.UTF_8))); + pubSubConnection.usePubSubConnection(connection -> connection.sync().subscribe(address.getBytes(StandardCharsets.UTF_8))); } public void removeListener(final String address) { - RedisOperation.unchecked(() -> circuitBreaker.executeRunnable( - () -> subscriptionConnection.sync().unsubscribe(address.getBytes(StandardCharsets.UTF_8)))); + RedisOperation.unchecked(() -> + pubSubConnection.usePubSubConnection(connection -> connection.sync().unsubscribe(address.getBytes(StandardCharsets.UTF_8)))); listenersByProvisioningAddress.remove(address); } @@ -100,9 +77,8 @@ public class ProvisioningManager extends RedisPubSubAdapter impl .setContent(ByteString.copyFrom(body)) .build(); - final boolean receiverPresent = circuitBreaker.executeSupplier( - () -> publicationConnection.sync() - .publish(address.getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0); + final boolean receiverPresent = pubSubClient.withBinaryConnection(connection -> + connection.sync().publish(address.getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0); Metrics.counter(SEND_PROVISIONING_MESSAGE_COUNTER_NAME, "online", String.valueOf(receiverPresent)).increment(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java index 7c2fb8f37..5596cba81 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java @@ -31,7 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -62,7 +62,7 @@ public class PushNotificationScheduler implements Managed { private final APNSender apnSender; private final FcmSender fcmSender; private final AccountsManager accountsManager; - private final FaultTolerantRedisCluster pushSchedulingCluster; + private final FaultTolerantRedisClusterClient pushSchedulingCluster; private final Clock clock; private final ClusterLuaScript scheduleBackgroundApnsNotificationScript; @@ -141,7 +141,7 @@ public class PushNotificationScheduler implements Managed { } } - public PushNotificationScheduler(final FaultTolerantRedisCluster pushSchedulingCluster, + public PushNotificationScheduler(final FaultTolerantRedisClusterClient pushSchedulingCluster, final APNSender apnSender, final FcmSender fcmSender, final AccountsManager accountsManager, @@ -158,7 +158,7 @@ public class PushNotificationScheduler implements Managed { } @VisibleForTesting - PushNotificationScheduler(final FaultTolerantRedisCluster pushSchedulingCluster, + PushNotificationScheduler(final FaultTolerantRedisClusterClient pushSchedulingCluster, final APNSender apnSender, final FcmSender fcmSender, final AccountsManager accountsManager, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/AbstractFaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/AbstractFaultTolerantPubSubConnection.java new file mode 100644 index 000000000..f7fc0ad72 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/AbstractFaultTolerantPubSubConnection.java @@ -0,0 +1,65 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.redis; + +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import io.github.resilience4j.retry.Retry; +import io.lettuce.core.RedisException; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import java.util.function.Consumer; +import java.util.function.Function; + +abstract class AbstractFaultTolerantPubSubConnection> { + + private final String name; + private final C pubSubConnection; + + private final Retry retry; + + private final Timer executeTimer; + + protected AbstractFaultTolerantPubSubConnection(final String name, + final C pubSubConnection, + final Retry retry) { + + this.name = name; + this.pubSubConnection = pubSubConnection; + this.retry = retry; + + this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub"); + } + + protected String getName() { + return name; + } + + public void usePubSubConnection(final Consumer consumer) { + try { + retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))); + } catch (final Throwable t) { + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } + } + } + + public T withPubSubConnection(final Function function) { + try { + return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))); + } catch (final Throwable t) { + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScript.java index 3915f53bd..2b21174e3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScript.java @@ -25,7 +25,7 @@ import reactor.core.publisher.Mono; public class ClusterLuaScript { - private final FaultTolerantRedisCluster redisCluster; + private final FaultTolerantRedisClusterClient redisCluster; private final ScriptOutputType scriptOutputType; private final String script; private final String sha; @@ -35,7 +35,7 @@ public class ClusterLuaScript { private static final Logger log = LoggerFactory.getLogger(ClusterLuaScript.class); - public static ClusterLuaScript fromResource(final FaultTolerantRedisCluster redisCluster, + public static ClusterLuaScript fromResource(final FaultTolerantRedisClusterClient redisCluster, final String resource, final ScriptOutputType scriptOutputType) throws IOException { @@ -51,7 +51,7 @@ public class ClusterLuaScript { } @VisibleForTesting - ClusterLuaScript(final FaultTolerantRedisCluster redisCluster, + ClusterLuaScript(final FaultTolerantRedisClusterClient redisCluster, final String script, final ScriptOutputType scriptOutputType) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnection.java new file mode 100644 index 000000000..2d649f4f4 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnection.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.redis; + +import io.github.resilience4j.retry.Retry; +import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.scheduler.Scheduler; + +public class FaultTolerantPubSubClusterConnection extends AbstractFaultTolerantPubSubConnection> { + + private final Logger logger = LoggerFactory.getLogger(FaultTolerantPubSubClusterConnection.class); + + private final Retry resubscribeRetry; + private final Scheduler topologyChangedEventScheduler; + + protected FaultTolerantPubSubClusterConnection(final String name, + final StatefulRedisClusterPubSubConnection pubSubConnection, + final Retry retry, + final Retry resubscribeRetry, + final Scheduler topologyChangedEventScheduler) { + + super(name, pubSubConnection, retry); + + pubSubConnection.setNodeMessagePropagation(true); + + this.resubscribeRetry = resubscribeRetry; + this.topologyChangedEventScheduler = topologyChangedEventScheduler; + } + + public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) { + + usePubSubConnection(connection -> connection.getResources().eventBus().get() + .filter(event -> event instanceof ClusterTopologyChangedEvent) + .subscribeOn(topologyChangedEventScheduler) + .subscribe(event -> { + logger.info("Got topology change event for {}, resubscribing all keyspace notifications", getName()); + + resubscribeRetry.executeRunnable(() -> { + try { + eventHandler.run(); + } catch (final RuntimeException e) { + logger.warn("Resubscribe for {} failed", getName(), e); + throw e; + } + }); + })); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index 8392ddcc5..cf5d11894 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -5,90 +5,15 @@ package org.whispersystems.textsecuregcm.redis; -import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; - import io.github.resilience4j.retry.Retry; -import io.lettuce.core.RedisException; -import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; -import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Timer; -import java.util.function.Consumer; -import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.scheduler.Scheduler; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; -public class FaultTolerantPubSubConnection { +public class FaultTolerantPubSubConnection extends AbstractFaultTolerantPubSubConnection> { - private static final Logger logger = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class); + protected FaultTolerantPubSubConnection(final String name, + final StatefulRedisPubSubConnection pubSubConnection, + final Retry retry) { - - private final String name; - private final StatefulRedisClusterPubSubConnection pubSubConnection; - - private final Retry retry; - private final Retry resubscribeRetry; - private final Scheduler topologyChangedEventScheduler; - - private final Timer executeTimer; - - public FaultTolerantPubSubConnection(final String name, - final StatefulRedisClusterPubSubConnection pubSubConnection, - final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) { - this.name = name; - this.pubSubConnection = pubSubConnection; - this.retry = retry; - this.resubscribeRetry = resubscribeRetry; - this.topologyChangedEventScheduler = topologyChangedEventScheduler; - - this.pubSubConnection.setNodeMessagePropagation(true); - - this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub"); + super(name, pubSubConnection, retry); } - - public void usePubSubConnection(final Consumer> consumer) { - try { - retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - public T withPubSubConnection(final Function, T> function) { - try { - return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - - public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) { - - usePubSubConnection(connection -> connection.getResources().eventBus().get() - .filter(event -> event instanceof ClusterTopologyChangedEvent) - .subscribeOn(topologyChangedEventScheduler) - .subscribe(event -> { - logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name); - - resubscribeRetry.executeRunnable(() -> { - try { - eventHandler.run(); - } catch (final RuntimeException e) { - logger.warn("Resubscribe for {} failed", name, e); - throw e; - } - }); - })); - } - } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java new file mode 100644 index 000000000..6d91e5f59 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java @@ -0,0 +1,159 @@ +package org.whispersystems.textsecuregcm.redis; + +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.retry.Retry; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisCommandTimeoutException; +import io.lettuce.core.RedisException; +import io.lettuce.core.RedisURI; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; +import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.resource.ClientResources; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; +import reactor.core.scheduler.Schedulers; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +public class FaultTolerantRedisClient { + + private final String name; + + private final RedisClient redisClient; + + private final StatefulRedisConnection stringConnection; + private final StatefulRedisConnection binaryConnection; + + private final List> pubSubConnections = new ArrayList<>(); + + private final CircuitBreaker circuitBreaker; + private final Retry retry; + + public FaultTolerantRedisClient(final String name, + final RedisConfiguration redisConfiguration, + final ClientResources.Builder clientResourcesBuilder) { + + this(name, clientResourcesBuilder, + RedisUriUtil.createRedisUriWithTimeout(redisConfiguration.getUri(), redisConfiguration.getTimeout()), + redisConfiguration.getTimeout(), + redisConfiguration.getCircuitBreakerConfiguration(), + redisConfiguration.getRetryConfiguration()); + } + + FaultTolerantRedisClient(String name, + final ClientResources.Builder clientResourcesBuilder, + final RedisURI redisUri, + final Duration commandTimeout, + final CircuitBreakerConfiguration circuitBreakerConfiguration, + final RetryConfiguration retryConfiguration) { + + this.name = name; + + final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = new LettuceShardCircuitBreaker(name, + circuitBreakerConfiguration.toCircuitBreakerConfig(), Schedulers.newSingle("topology-changed-" + name, true)); + this.redisClient = RedisClient.create(clientResourcesBuilder.build(), redisUri); + this.redisClient.setOptions(ClusterClientOptions.builder() + .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) + .validateClusterNodeMembership(false) + .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() + .enableAllAdaptiveRefreshTriggers() + .build()) + // for asynchronous commands + .timeoutOptions(TimeoutOptions.builder() + .fixedTimeout(commandTimeout) + .build()) + .publishOnScheduler(true) + .build()); + + lettuceShardCircuitBreaker.setEventBus(redisClient.getResources().eventBus()); + + this.stringConnection = redisClient.connect(); + this.binaryConnection = redisClient.connect(ByteArrayCodec.INSTANCE); + + this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() + .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); + + CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClusterClient.class); + } + + public void shutdown() { + stringConnection.close(); + + for (final StatefulRedisPubSubConnection pubSubConnection : pubSubConnections) { + pubSubConnection.close(); + } + + redisClient.shutdown(); + } + + public String getName() { + return name; + } + + public void useConnection(final Consumer> consumer) { + useConnection(stringConnection, consumer); + } + + public T withConnection(final Function, T> function) { + return withConnection(stringConnection, function); + } + + public void useBinaryConnection(final Consumer> consumer) { + useConnection(binaryConnection, consumer); + } + + public T withBinaryConnection(final Function, T> function) { + return withConnection(binaryConnection, function); + } + + public void useConnection(final StatefulRedisConnection connection, + final Consumer> consumer) { + try { + circuitBreaker.executeRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection))); + } catch (final Throwable t) { + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } + } + } + + public T withConnection(final StatefulRedisConnection connection, + final Function, T> function) { + try { + return circuitBreaker.executeCallable(() -> retry.executeCallable(() -> function.apply(connection))); + } catch (final Throwable t) { + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } + } + } + + public FaultTolerantPubSubConnection createPubSubConnection() { + final StatefulRedisPubSubConnection pubSubConnection = redisClient.connectPubSub(); + pubSubConnections.add(pubSubConnection); + + return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry); + } + + public FaultTolerantPubSubConnection createBinaryPubSubConnection() { + final StatefulRedisPubSubConnection pubSubConnection = redisClient.connectPubSub(ByteArrayCodec.INSTANCE); + pubSubConnections.add(pubSubConnection); + + return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java similarity index 90% rename from service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java rename to service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java index fe64dd2de..aad296652 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java @@ -41,7 +41,7 @@ import reactor.core.scheduler.Schedulers; * * @see LettuceShardCircuitBreaker */ -public class FaultTolerantRedisCluster { +public class FaultTolerantRedisClusterClient { private final String name; @@ -56,8 +56,8 @@ public class FaultTolerantRedisCluster { private final Retry topologyChangedEventRetry; - public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, - final ClientResources.Builder clientResourcesBuilder) { + public FaultTolerantRedisClusterClient(final String name, final RedisClusterConfiguration clusterConfiguration, + final ClientResources.Builder clientResourcesBuilder) { this(name, clientResourcesBuilder, Collections.singleton(RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(), @@ -68,9 +68,9 @@ public class FaultTolerantRedisCluster { } - FaultTolerantRedisCluster(String name, final ClientResources.Builder clientResourcesBuilder, - Iterable redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig, - RetryConfiguration retryConfiguration) { + FaultTolerantRedisClusterClient(String name, final ClientResources.Builder clientResourcesBuilder, + Iterable redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig, + RetryConfiguration retryConfiguration) { this.name = name; @@ -112,7 +112,7 @@ public class FaultTolerantRedisCluster { this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig); - CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class); + CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClusterClient.class); } public void shutdown() { @@ -184,11 +184,11 @@ public class FaultTolerantRedisCluster { .transformDeferred(RetryOperator.of(retry)); } - public FaultTolerantPubSubConnection createPubSubConnection() { + public FaultTolerantPubSubClusterConnection createPubSubConnection() { final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); pubSubConnections.add(pubSubConnection); - return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry, + return new FaultTolerantPubSubClusterConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry, Schedulers.newSingle(name + "-redisPubSubEvents", true)); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index d22057aaa..8f9610a65 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -67,7 +67,7 @@ import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; @@ -107,7 +107,7 @@ public class AccountsManager { private final Accounts accounts; private final PhoneNumberIdentifiers phoneNumberIdentifiers; - private final FaultTolerantRedisCluster cacheCluster; + private final FaultTolerantRedisClusterClient cacheCluster; private final AccountLockManager accountLockManager; private final KeysManager keysManager; private final MessagesManager messagesManager; @@ -157,7 +157,7 @@ public class AccountsManager { public AccountsManager(final Accounts accounts, final PhoneNumberIdentifiers phoneNumberIdentifiers, - final FaultTolerantRedisCluster cacheCluster, + final FaultTolerantRedisClusterClient cacheCluster, final AccountLockManager accountLockManager, final KeysManager keysManager, final MessagesManager messagesManager, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 6dc3cf903..e320cf31e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -50,8 +50,8 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.experiment.Experiment; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; -import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; import org.whispersystems.textsecuregcm.util.Util; @@ -119,8 +119,8 @@ import reactor.core.scheduler.Schedulers; */ public class MessagesCache extends RedisClusterPubSubAdapter implements Managed { - private final FaultTolerantRedisCluster redisCluster; - private final FaultTolerantPubSubConnection pubSubConnection; + private final FaultTolerantRedisClusterClient redisCluster; + private final FaultTolerantPubSubClusterConnection pubSubConnection; private final Clock clock; private final ExecutorService notificationExecutorService; @@ -183,9 +183,9 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); - public MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService, - final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock, - final DynamicConfigurationManager dynamicConfigurationManager) + public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final ExecutorService notificationExecutorService, + final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock, + final DynamicConfigurationManager dynamicConfigurationManager) throws IOException { this( redisCluster, @@ -205,15 +205,15 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } @VisibleForTesting - MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService, - final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock, - final DynamicConfigurationManager dynamicConfigurationManager, - final MessagesCacheInsertScript insertScript, - final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript, - final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript, - final MessagesCacheRemoveQueueScript removeQueueScript, - final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript, - final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript) + MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final ExecutorService notificationExecutorService, + final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock, + final DynamicConfigurationManager dynamicConfigurationManager, + final MessagesCacheInsertScript insertScript, + final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript, + final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript, + final MessagesCacheRemoveQueueScript removeQueueScript, + final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript, + final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript) throws IOException { this.redisCluster = redisCluster; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheGetItemsScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheGetItemsScript.java index 58e5a824a..dfd137755 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheGetItemsScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheGetItemsScript.java @@ -11,7 +11,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.UUID; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import reactor.core.publisher.Mono; /** @@ -22,7 +22,7 @@ class MessagesCacheGetItemsScript { private final ClusterLuaScript getItemsScript; - MessagesCacheGetItemsScript(FaultTolerantRedisCluster redisCluster) throws IOException { + MessagesCacheGetItemsScript(FaultTolerantRedisClusterClient redisCluster) throws IOException { this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.OBJECT); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheGetQueuesToPersistScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheGetQueuesToPersistScript.java index 8f3703f3e..18c5c8f6e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheGetQueuesToPersistScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheGetQueuesToPersistScript.java @@ -11,7 +11,7 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.List; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; /** * Returns a list of queues that may be persisted. They will be sorted from oldest to more recent, limited by the @@ -23,7 +23,7 @@ class MessagesCacheGetQueuesToPersistScript { private final ClusterLuaScript getQueuesToPersistScript; - MessagesCacheGetQueuesToPersistScript(final FaultTolerantRedisCluster redisCluster) throws IOException { + MessagesCacheGetQueuesToPersistScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException { this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScript.java index 257e90b86..87e3ba664 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScript.java @@ -14,7 +14,7 @@ import java.util.List; import java.util.UUID; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; /** * Inserts an envelope into the message queue for a destination device. @@ -23,7 +23,7 @@ class MessagesCacheInsertScript { private final ClusterLuaScript insertScript; - MessagesCacheInsertScript(FaultTolerantRedisCluster redisCluster) throws IOException { + MessagesCacheInsertScript(FaultTolerantRedisClusterClient redisCluster) throws IOException { this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript.java index 2d7274552..ce05af209 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript.java @@ -11,7 +11,7 @@ import java.util.ArrayList; import java.util.List; import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; /** * Inserts the shared multi-recipient message payload into the cache. The list of recipients and views will be set as @@ -25,7 +25,7 @@ class MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript { static final String ERROR_KEY_EXISTS = "ERR key exists"; - MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(FaultTolerantRedisCluster redisCluster) + MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(FaultTolerantRedisClusterClient redisCluster) throws IOException { this.script = ClusterLuaScript.fromResource(redisCluster, "lua/insert_shared_multirecipient_message_data.lua", ScriptOutputType.INTEGER); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveByGuidScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveByGuidScript.java index c8847a57e..a2afa82d2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveByGuidScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveByGuidScript.java @@ -12,7 +12,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; /** * Removes a list of message GUIDs from the queue of a destination device. @@ -21,7 +21,7 @@ class MessagesCacheRemoveByGuidScript { private final ClusterLuaScript removeByGuidScript; - MessagesCacheRemoveByGuidScript(final FaultTolerantRedisCluster redisCluster) throws IOException { + MessagesCacheRemoveByGuidScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException { this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.OBJECT); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveQueueScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveQueueScript.java index 2188f0933..36069c6b8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveQueueScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveQueueScript.java @@ -12,7 +12,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import reactor.core.publisher.Mono; /** @@ -29,7 +29,7 @@ class MessagesCacheRemoveQueueScript { private final ClusterLuaScript removeQueueScript; - MessagesCacheRemoveQueueScript(FaultTolerantRedisCluster redisCluster) throws IOException { + MessagesCacheRemoveQueueScript(FaultTolerantRedisClusterClient redisCluster) throws IOException { this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.MULTI); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveRecipientViewFromMrmDataScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveRecipientViewFromMrmDataScript.java index 7307916a9..e3d2ccc64 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveRecipientViewFromMrmDataScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheRemoveRecipientViewFromMrmDataScript.java @@ -12,7 +12,7 @@ import java.util.Collection; import java.util.List; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import reactor.core.publisher.Mono; /** @@ -25,7 +25,7 @@ class MessagesCacheRemoveRecipientViewFromMrmDataScript { private final ClusterLuaScript removeRecipientViewFromMrmDataScript; - MessagesCacheRemoveRecipientViewFromMrmDataScript(final FaultTolerantRedisCluster redisCluster) throws IOException { + MessagesCacheRemoveRecipientViewFromMrmDataScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException { this.removeRecipientViewFromMrmDataScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_recipient_view_from_mrm_data.lua", ScriptOutputType.INTEGER); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java index f3464d237..a7b3b9c68 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java @@ -14,7 +14,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; import javax.annotation.Nullable; @@ -26,12 +26,12 @@ public class ProfilesManager { private static final String CACHE_PREFIX = "profiles::"; private final Profiles profiles; - private final FaultTolerantRedisCluster cacheCluster; + private final FaultTolerantRedisClusterClient cacheCluster; private final ObjectMapper mapper; public ProfilesManager(final Profiles profiles, - final FaultTolerantRedisCluster cacheCluster) { + final FaultTolerantRedisClusterClient cacheCluster) { this.profiles = profiles; this.cacheCluster = cacheCluster; this.mapper = SystemMapper.jsonMapper(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ReportMessageManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ReportMessageManager.java index 26ebe939b..0123a2617 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ReportMessageManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ReportMessageManager.java @@ -21,13 +21,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.util.UUIDUtil; public class ReportMessageManager { private final ReportMessageDynamoDb reportMessageDynamoDb; - private final FaultTolerantRedisCluster rateLimitCluster; + private final FaultTolerantRedisClusterClient rateLimitCluster; private final Duration counterTtl; @@ -40,7 +40,7 @@ public class ReportMessageManager { private static final Logger logger = LoggerFactory.getLogger(ReportMessageManager.class); public ReportMessageManager(final ReportMessageDynamoDb reportMessageDynamoDb, - final FaultTolerantRedisCluster rateLimitCluster, + final FaultTolerantRedisClusterClient rateLimitCluster, final Duration counterTtl) { this.reportMessageDynamoDb = reportMessageDynamoDb; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index e639771a3..72b292e4f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -34,13 +34,12 @@ import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controll import org.whispersystems.textsecuregcm.experiment.PushNotificationExperimentSamples; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.metrics.MicrometerAwsSdkMetricPublisher; -import org.whispersystems.textsecuregcm.metrics.NoopAwsSdkMetricPublisher; import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.FcmSender; import org.whispersystems.textsecuregcm.push.PushNotificationManager; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; import org.whispersystems.textsecuregcm.storage.AccountLockManager; @@ -82,8 +81,8 @@ record CommandDependencies( FcmSender fcmSender, PushNotificationManager pushNotificationManager, PushNotificationExperimentSamples pushNotificationExperimentSamples, - FaultTolerantRedisCluster cacheCluster, - FaultTolerantRedisCluster pushSchedulerCluster, + FaultTolerantRedisClusterClient cacheCluster, + FaultTolerantRedisClusterClient pushSchedulerCluster, ClientResources.Builder redisClusterClientResourcesBuilder, BackupManager backupManager, DynamicConfigurationManager dynamicConfigurationManager, @@ -109,9 +108,9 @@ record CommandDependencies( final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder(); - FaultTolerantRedisCluster cacheCluster = configuration.getCacheClusterConfiguration() + FaultTolerantRedisClusterClient cacheCluster = configuration.getCacheClusterConfiguration() .build("main_cache", redisClientResourcesBuilder); - FaultTolerantRedisCluster pushSchedulerCluster = configuration.getPushSchedulerCluster() + FaultTolerantRedisClusterClient pushSchedulerCluster = configuration.getPushSchedulerCluster() .build("push_scheduler", redisClientResourcesBuilder); ScheduledExecutorService recurringJobExecutor = environment.lifecycle() @@ -197,11 +196,11 @@ record CommandDependencies( configuration.getDynamoDbTables().getMessages().getTableName(), configuration.getDynamoDbTables().getMessages().getExpiration(), messageDeletionExecutor); - FaultTolerantRedisCluster messagesCluster = configuration.getMessageCacheConfiguration() + FaultTolerantRedisClusterClient messagesCluster = configuration.getMessageCacheConfiguration() .getRedisClusterConfiguration().build("messages", redisClientResourcesBuilder); - FaultTolerantRedisCluster clientPresenceCluster = configuration.getClientPresenceClusterConfiguration() + FaultTolerantRedisClusterClient clientPresenceCluster = configuration.getClientPresenceClusterConfiguration() .build("client_presence", redisClientResourcesBuilder); - FaultTolerantRedisCluster rateLimitersCluster = configuration.getRateLimitersCluster().build("rate_limiters", + FaultTolerantRedisClusterClient rateLimitersCluster = configuration.getRateLimitersCluster().build("rate_limiters", redisClientResourcesBuilder); SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client( secureValueRecoveryCredentialsGenerator, secureValueRecoveryServiceExecutor, diff --git a/service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable b/service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable index bc79249e9..83a0af492 100644 --- a/service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable +++ b/service/src/main/resources/META-INF/services/io.dropwizard.jackson.Discoverable @@ -4,8 +4,8 @@ org.whispersystems.textsecuregcm.configuration.DynamicConfigurationManagerFactor org.whispersystems.textsecuregcm.configuration.DynamoDbClientFactory org.whispersystems.textsecuregcm.configuration.HCaptchaClientFactory org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClusterFactory +org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory org.whispersystems.textsecuregcm.configuration.PaymentsServiceClientsFactory org.whispersystems.textsecuregcm.configuration.PubSubPublisherFactory org.whispersystems.textsecuregcm.configuration.RegistrationServiceClientFactory org.whispersystems.textsecuregcm.configuration.S3ObjectMonitorFactory -org.whispersystems.textsecuregcm.configuration.SingletonRedisClientFactory diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClientFactory.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClientFactory.java new file mode 100644 index 000000000..eff6297f9 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClientFactory.java @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.lettuce.core.resource.ClientResources; +import java.util.concurrent.atomic.AtomicBoolean; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; +import org.whispersystems.textsecuregcm.redis.RedisServerExtension; + +@JsonTypeName("local") +public class LocalFaultTolerantRedisClientFactory implements FaultTolerantRedisClientFactory { + + private static final RedisServerExtension REDIS_SERVER_EXTENSION = RedisServerExtension.builder().build(); + + private final AtomicBoolean shutdownHookConfigured = new AtomicBoolean(); + + private LocalFaultTolerantRedisClientFactory() { + try { + REDIS_SERVER_EXTENSION.beforeAll(null); + REDIS_SERVER_EXTENSION.beforeEach(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public FaultTolerantRedisClient build(final String name, final ClientResources clientResources) { + + if (shutdownHookConfigured.compareAndSet(false, true)) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + REDIS_SERVER_EXTENSION.afterEach(null); + REDIS_SERVER_EXTENSION.afterAll(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + } + + final RedisConfiguration config = new RedisConfiguration(); + config.setUri(RedisServerExtension.getRedisURI().toString()); + + return new FaultTolerantRedisClient(name, config, clientResources.mutate()); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClusterFactory.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClusterFactory.java index 7c78314d1..a49935fd1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClusterFactory.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClusterFactory.java @@ -8,7 +8,7 @@ package org.whispersystems.textsecuregcm.configuration; import com.fasterxml.jackson.annotation.JsonTypeName; import io.lettuce.core.resource.ClientResources; import java.util.concurrent.atomic.AtomicBoolean; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; @JsonTypeName("local") @@ -31,7 +31,7 @@ public class LocalFaultTolerantRedisClusterFactory implements FaultTolerantRedis } @Override - public FaultTolerantRedisCluster build(final String name, final ClientResources.Builder clientResourcesBuilder) { + public FaultTolerantRedisClusterClient build(final String name, final ClientResources.Builder clientResourcesBuilder) { if (shutdownHookConfigured.compareAndSet(false, true)) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -47,7 +47,7 @@ public class LocalFaultTolerantRedisClusterFactory implements FaultTolerantRedis final RedisClusterConfiguration config = new RedisClusterConfiguration(); config.setConfigurationUri(RedisClusterExtension.getRedisURIs().getFirst().toString()); - return new FaultTolerantRedisCluster(name, config, clientResourcesBuilder); + return new FaultTolerantRedisClusterClient(name, config, clientResourcesBuilder); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalSingletonRedisClientFactory.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalSingletonRedisClientFactory.java deleted file mode 100644 index c17e2706a..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalSingletonRedisClientFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.configuration; - -import com.fasterxml.jackson.annotation.JsonTypeName; -import io.dropwizard.lifecycle.Managed; -import io.lettuce.core.RedisClient; -import io.lettuce.core.resource.ClientResources; -import java.util.concurrent.atomic.AtomicBoolean; -import org.whispersystems.textsecuregcm.redis.RedisSingletonExtension; - -@JsonTypeName("local") -public class LocalSingletonRedisClientFactory implements SingletonRedisClientFactory, Managed { - - private static final RedisSingletonExtension redisSingletonExtension = RedisSingletonExtension.builder().build(); - - private final AtomicBoolean shutdownHookConfigured = new AtomicBoolean(); - - private LocalSingletonRedisClientFactory() { - try { - redisSingletonExtension.beforeAll(null); - redisSingletonExtension.beforeEach(null); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public RedisClient build(final ClientResources clientResources) { - - if (shutdownHookConfigured.compareAndSet(false, true)) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - this.stop(); - } catch (Exception e) { - throw new RuntimeException(e); - } - })); - } - - return RedisClient.create(clientResources, redisSingletonExtension.getRedisUri()); - } - - @Override - public void stop() throws Exception { - redisSingletonExtension.afterEach(null); - redisSingletonExtension.afterAll(null); - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java index c394c4557..220bc8775 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java @@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; class CurrencyConversionManagerTest { @@ -240,7 +240,7 @@ class CurrencyConversionManagerTest { void convertToUsd() { final CurrencyConversionManager currencyConversionManager = new CurrencyConversionManager(mock(FixerClient.class), mock(CoinMarketCapClient.class), - mock(FaultTolerantRedisCluster.class), + mock(FaultTolerantRedisClusterClient.class), Collections.emptyList(), EXECUTOR, Clock.systemUTC()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimatorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimatorTest.java index 643d2a9fa..795593594 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimatorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimatorTest.java @@ -9,7 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import java.time.Duration; @@ -20,7 +20,7 @@ public class CardinalityEstimatorTest { @Test public void testAdd() throws Exception { - final FaultTolerantRedisCluster redisCluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); + final FaultTolerantRedisClusterClient redisCluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); final CardinalityEstimator estimator = new CardinalityEstimator(redisCluster, "test", Duration.ofSeconds(1)); estimator.add("1"); @@ -40,7 +40,7 @@ public class CardinalityEstimatorTest { @Test @Timeout(5) public void testEventuallyExpires() throws InterruptedException { - final FaultTolerantRedisCluster redisCluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); + final FaultTolerantRedisClusterClient redisCluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); final CardinalityEstimator estimator = new CardinalityEstimator(redisCluster, "test", Duration.ofMillis(100)); estimator.add("1"); long count; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/limits/RateLimitersLuaScriptTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/limits/RateLimitersLuaScriptTest.java index 6d3908470..c91a04c72 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/limits/RateLimitersLuaScriptTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/limits/RateLimitersLuaScriptTest.java @@ -25,7 +25,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicRateLimitPolicy; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.util.MockUtils; @@ -55,7 +55,7 @@ public class RateLimitersLuaScriptTest { @Test public void testWithEmbeddedRedis() throws Exception { final RateLimiters.For descriptor = RateLimiters.For.REGISTRATION; - final FaultTolerantRedisCluster redisCluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); + final FaultTolerantRedisClusterClient redisCluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); final RateLimiters limiters = new RateLimiters( Map.of(descriptor.id(), new RateLimiterConfig(60, Duration.ofSeconds(1))), dynamicConfig, @@ -72,7 +72,7 @@ public class RateLimitersLuaScriptTest { @Test public void testTtl() throws Exception { final RateLimiters.For descriptor = RateLimiters.For.REGISTRATION; - final FaultTolerantRedisCluster redisCluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); + final FaultTolerantRedisClusterClient redisCluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); final RateLimiters limiters = new RateLimiters( Map.of(descriptor.id(), new RateLimiterConfig(1000, Duration.ofSeconds(1))), dynamicConfig, @@ -123,7 +123,7 @@ public class RateLimitersLuaScriptTest { public void testFailOpen() throws Exception { when(configuration.getRateLimitPolicy()).thenReturn(new DynamicRateLimitPolicy(true)); final RateLimiters.For descriptor = RateLimiters.For.REGISTRATION; - final FaultTolerantRedisCluster redisCluster = mock(FaultTolerantRedisCluster.class); + final FaultTolerantRedisClusterClient redisCluster = mock(FaultTolerantRedisClusterClient.class); final RateLimiters limiters = new RateLimiters( Map.of(descriptor.id(), new RateLimiterConfig(1000, Duration.ofSeconds(1))), dynamicConfig, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/limits/RateLimitersTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/limits/RateLimitersTest.java index 52b740ff1..dd819d8c1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/limits/RateLimitersTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/limits/RateLimitersTest.java @@ -22,7 +22,7 @@ import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicRateLimitPolicy; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.util.MockUtils; import org.whispersystems.textsecuregcm.util.MutableClock; @@ -37,7 +37,7 @@ public class RateLimitersTest { private final ClusterLuaScript validateScript = mock(ClusterLuaScript.class); - private final FaultTolerantRedisCluster redisCluster = mock(FaultTolerantRedisCluster.class); + private final FaultTolerantRedisClusterClient redisCluster = mock(FaultTolerantRedisClusterClient.class); private final MutableClock clock = MockUtils.mutableClock(0); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ProvisioningManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ProvisioningManagerTest.java index 2633b8d8c..53faa33f7 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ProvisioningManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ProvisioningManagerTest.java @@ -15,8 +15,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.redis.RedisSingletonExtension; +import org.whispersystems.textsecuregcm.redis.RedisServerExtension; import org.whispersystems.textsecuregcm.storage.PubSubProtos; import org.whispersystems.textsecuregcm.util.TestRandomUtil; @@ -25,13 +24,13 @@ class ProvisioningManagerTest { private ProvisioningManager provisioningManager; @RegisterExtension - static final RedisSingletonExtension REDIS_EXTENSION = RedisSingletonExtension.builder().build(); + static final RedisServerExtension REDIS_EXTENSION = RedisServerExtension.builder().build(); private static final long PUBSUB_TIMEOUT_MILLIS = 1_000; @BeforeEach void setUp() throws Exception { - provisioningManager = new ProvisioningManager(REDIS_EXTENSION.getRedisClient(), new CircuitBreakerConfiguration()); + provisioningManager = new ProvisioningManager(REDIS_EXTENSION.getRedisClient()); provisioningManager.start(); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java index b4e1e3b76..15245f9e3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java @@ -31,7 +31,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.identity.IdentityType; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; @@ -247,7 +247,7 @@ class PushNotificationSchedulerTest { void testDedicatedProcessDynamicConfiguration(final int dedicatedThreadCount, final boolean expectActivity) throws Exception { - final FaultTolerantRedisCluster redisCluster = mock(FaultTolerantRedisCluster.class); + final FaultTolerantRedisClusterClient redisCluster = mock(FaultTolerantRedisClusterClient.class); when(redisCluster.withCluster(any())).thenReturn(0L); final AccountsManager accountsManager = mock(AccountsManager.class); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java index 992df8921..764286277 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java @@ -43,7 +43,7 @@ class ClusterLuaScriptTest { @Test void testExecute() { final RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); - final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.builder().stringCommands(commands).build(); + final FaultTolerantRedisClusterClient mockCluster = RedisClusterHelper.builder().stringCommands(commands).build(); final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])"; final ScriptOutputType scriptOutputType = ScriptOutputType.VALUE; @@ -62,7 +62,7 @@ class ClusterLuaScriptTest { @Test void testExecuteScriptNotLoaded() { final RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); - final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.builder().stringCommands(commands).build(); + final FaultTolerantRedisClusterClient mockCluster = RedisClusterHelper.builder().stringCommands(commands).build(); final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])"; final ScriptOutputType scriptOutputType = ScriptOutputType.VALUE; @@ -82,7 +82,7 @@ class ClusterLuaScriptTest { void testExecuteBinaryScriptNotLoaded() { final RedisAdvancedClusterCommands stringCommands = mock(RedisAdvancedClusterCommands.class); final RedisAdvancedClusterCommands binaryCommands = mock(RedisAdvancedClusterCommands.class); - final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.builder() + final FaultTolerantRedisClusterClient mockCluster = RedisClusterHelper.builder() .stringCommands(stringCommands) .binaryCommands(binaryCommands) .build(); @@ -106,7 +106,7 @@ class ClusterLuaScriptTest { void testExecuteBinaryAsyncScriptNotLoaded() throws Exception { final RedisAdvancedClusterAsyncCommands binaryAsyncCommands = mock(RedisAdvancedClusterAsyncCommands.class); - final FaultTolerantRedisCluster mockCluster = + final FaultTolerantRedisClusterClient mockCluster = RedisClusterHelper.builder().binaryAsyncCommands(binaryAsyncCommands).build(); final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])"; @@ -136,7 +136,7 @@ class ClusterLuaScriptTest { void testExecuteBinaryReactiveScriptNotLoaded() { final RedisAdvancedClusterReactiveCommands binaryReactiveCommands = mock(RedisAdvancedClusterReactiveCommands.class); - final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.builder() + final FaultTolerantRedisClusterClient mockCluster = RedisClusterHelper.builder() .binaryReactiveCommands(binaryReactiveCommands).build(); final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])"; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnectionTest.java similarity index 96% rename from service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnectionTest.java index 497ff1cbf..49dfbf5fc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnectionTest.java @@ -40,11 +40,11 @@ import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; import reactor.test.publisher.TestPublisher; -class FaultTolerantPubSubConnectionTest { +class FaultTolerantPubSubClusterConnectionTest { private StatefulRedisClusterPubSubConnection pubSubConnection; private RedisClusterPubSubCommands pubSubCommands; - private FaultTolerantPubSubConnection faultTolerantPubSubConnection; + private FaultTolerantPubSubClusterConnection faultTolerantPubSubConnection; @SuppressWarnings("unchecked") @@ -68,7 +68,7 @@ class FaultTolerantPubSubConnectionTest { .build(); final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration); - faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, + faultTolerantPubSubConnection = new FaultTolerantPubSubClusterConnection<>("test", pubSubConnection, retry, resubscribeRetry, Schedulers.newSingle("test")); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java new file mode 100644 index 000000000..e70aa18d8 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java @@ -0,0 +1,102 @@ +package org.whispersystems.textsecuregcm.redis; + +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import io.lettuce.core.RedisCommandTimeoutException; +import io.lettuce.core.RedisException; +import io.lettuce.core.resource.ClientResources; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +// ThreadMode.SEPARATE_THREAD protects against hangs in the remote Redis calls, as this mode allows the test code to be +// preempted by the timeout check +@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) +class FaultTolerantRedisClientTest { + + private static final Duration TIMEOUT = Duration.ofMillis(50); + + private static final RetryConfiguration RETRY_CONFIGURATION = new RetryConfiguration(); + + static { + RETRY_CONFIGURATION.setMaxAttempts(1); + RETRY_CONFIGURATION.setWaitDuration(50); + } + + @RegisterExtension + static final RedisServerExtension REDIS_SERVER_EXTENSION = RedisServerExtension.builder().build(); + + private FaultTolerantRedisClient faultTolerantRedisClient; + + private static FaultTolerantRedisClient buildRedisClient( + @Nullable final CircuitBreakerConfiguration circuitBreakerConfiguration, + final ClientResources.Builder clientResourcesBuilder) { + + return new FaultTolerantRedisClient("test", clientResourcesBuilder, + RedisServerExtension.getRedisURI(), TIMEOUT, + Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new), + RETRY_CONFIGURATION); + } + + @AfterEach + void tearDown() { + faultTolerantRedisClient.shutdown(); + } + + @Test + void testTimeout() { + faultTolerantRedisClient = buildRedisClient(null, ClientResources.builder()); + + final ExecutionException asyncException = assertThrows(ExecutionException.class, + () -> faultTolerantRedisClient.withConnection(connection -> connection.async().blpop(2 * TIMEOUT.toMillis() / 1000d, "key")) + .get()); + + assertInstanceOf(RedisCommandTimeoutException.class, asyncException.getCause()); + + assertThrows(RedisCommandTimeoutException.class, + () -> faultTolerantRedisClient.withConnection(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, "key"))); + } + + @Test + void testTimeoutCircuitBreaker() throws Exception { + // because we’re using a single key, and blpop involves *Redis* also blocking, the breaker wait duration must be + // longer than the sum of the remote timeouts + final Duration breakerWaitDuration = TIMEOUT.multipliedBy(5); + + final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); + circuitBreakerConfig.setFailureRateThreshold(1); + circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(1); + circuitBreakerConfig.setSlidingWindowSize(1); + circuitBreakerConfig.setWaitDurationInOpenState(breakerWaitDuration); + + faultTolerantRedisClient = buildRedisClient(circuitBreakerConfig, ClientResources.builder()); + + final String key = "key"; + + // the first call should time out and open the breaker + assertThrows(RedisCommandTimeoutException.class, + () -> faultTolerantRedisClient.withConnection(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key))); + + // the second call gets blocked by the breaker + final RedisException e = assertThrows(RedisException.class, + () -> faultTolerantRedisClient.withConnection(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key))); + assertInstanceOf(CallNotPermittedException.class, e.getCause()); + + // wait for breaker to be half-open + Thread.sleep(breakerWaitDuration.toMillis() * 2); + + assertEquals(0, (Long) faultTolerantRedisClient.withConnection(connection -> connection.sync().llen(key))); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java similarity index 98% rename from service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java index 27b2764a0..381bcf22e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java @@ -68,7 +68,7 @@ import org.whispersystems.textsecuregcm.util.RedisClusterUtil; // ThreadMode.SEPARATE_THREAD protects against hangs in the remote Redis calls, as this mode allows the test code to be // preempted by the timeout check @Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) -class FaultTolerantRedisClusterTest { +class FaultTolerantRedisClusterClientTest { private static final Duration TIMEOUT = Duration.ofMillis(50); @@ -85,13 +85,13 @@ class FaultTolerantRedisClusterTest { .timeout(TIMEOUT) .build(); - private FaultTolerantRedisCluster cluster; + private FaultTolerantRedisClusterClient cluster; - private static FaultTolerantRedisCluster buildCluster( + private static FaultTolerantRedisClusterClient buildCluster( @Nullable final CircuitBreakerConfiguration circuitBreakerConfiguration, final ClientResources.Builder clientResourcesBuilder) { - return new FaultTolerantRedisCluster("test", clientResourcesBuilder, + return new FaultTolerantRedisClusterClient("test", clientResourcesBuilder, RedisClusterExtension.getRedisURIs(), TIMEOUT, Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new), RETRY_CONFIGURATION); @@ -235,7 +235,7 @@ class FaultTolerantRedisClusterTest { final int availableSlot = availableNode.getSlots().getFirst(); final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot)); - final FaultTolerantPubSubConnection pubSubConnection = cluster.createPubSubConnection(); + final FaultTolerantPubSubClusterConnection pubSubConnection = cluster.createPubSubConnection(); // Keyspace notifications are delivered on a different thread, so we use a CountDownLatch to wait for the // expected number of notifications to arrive diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java index 905751a6e..ba4a49cb8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java @@ -42,7 +42,7 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb private final Duration timeout; private final RetryConfiguration retryConfiguration; - private FaultTolerantRedisCluster redisCluster; + private FaultTolerantRedisClusterClient redisCluster; private ClientResources redisClientResources; public RedisClusterExtension(final Duration timeout, final RetryConfiguration retryConfiguration) { @@ -87,7 +87,7 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb redisClientResources = ClientResources.builder().build(); final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500)); - redisCluster = new FaultTolerantRedisCluster("test-cluster", + redisCluster = new FaultTolerantRedisClusterClient("test-cluster", redisClientResources.mutate(), getRedisURIs(), timeout, @@ -130,7 +130,7 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb .toList(); } - public FaultTolerantRedisCluster getRedisCluster() { + public FaultTolerantRedisClusterClient getRedisCluster() { return redisCluster; } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java new file mode 100644 index 000000000..a4c84a00f --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java @@ -0,0 +1,99 @@ +/* + * Copyright 2022 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.redis; + +import static org.junit.jupiter.api.Assumptions.assumeFalse; + +import io.lettuce.core.RedisURI; +import java.io.IOException; +import java.net.ServerSocket; +import java.time.Duration; +import io.lettuce.core.resource.ClientResources; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import redis.embedded.RedisServer; + +public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback { + + private static RedisServer redisServer; + private FaultTolerantRedisClient faultTolerantRedisClient; + private ClientResources redisClientResources; + + public static class RedisServerExtensionBuilder { + + private RedisServerExtensionBuilder() { + } + + public RedisServerExtension build() { + return new RedisServerExtension(); + } + } + + public static RedisServerExtensionBuilder builder() { + return new RedisServerExtensionBuilder(); + } + + @Override + public void beforeAll(final ExtensionContext context) throws Exception { + assumeFalse(System.getProperty("os.name").equalsIgnoreCase("windows")); + + redisServer = RedisServer.builder() + .setting("appendonly no") + .setting("save \"\"") + .setting("dir " + System.getProperty("java.io.tmpdir")) + .port(getAvailablePort()) + .build(); + + redisServer.start(); + } + + public static RedisURI getRedisURI() { + return RedisURI.create("redis://127.0.0.1:%d".formatted(redisServer.ports().getFirst())); + } + + @Override + public void beforeEach(final ExtensionContext context) { + redisClientResources = ClientResources.builder().build(); + final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); + circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500)); + faultTolerantRedisClient = new FaultTolerantRedisClient("test-redis-client", + redisClientResources.mutate(), + getRedisURI(), + Duration.ofSeconds(2), + circuitBreakerConfig, + new RetryConfiguration()); + + faultTolerantRedisClient.useConnection(connection -> connection.sync().flushall()); + } + + @Override + public void afterEach(final ExtensionContext context) throws InterruptedException { + redisClientResources.shutdown().await(); + } + + @Override + public void afterAll(final ExtensionContext context) { + if (redisServer != null) { + redisServer.stop(); + } + } + + public FaultTolerantRedisClient getRedisClient() { + return faultTolerantRedisClient; + } + + private static int getAvailablePort() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(false); + return socket.getLocalPort(); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisSingletonExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisSingletonExtension.java deleted file mode 100644 index eb8d4ed3d..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisSingletonExtension.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2013-2022 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import static org.junit.jupiter.api.Assumptions.assumeFalse; - -import io.lettuce.core.RedisClient; -import io.lettuce.core.RedisURI; -import io.lettuce.core.api.StatefulRedisConnection; -import java.io.IOException; -import java.net.ServerSocket; -import org.junit.jupiter.api.extension.AfterAllCallback; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeAllCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; -import redis.embedded.RedisServer; - -public class RedisSingletonExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback { - - private static RedisServer redisServer; - private RedisClient redisClient; - private RedisURI redisUri; - - public static class RedisSingletonExtensionBuilder { - - private RedisSingletonExtensionBuilder() { - } - - public RedisSingletonExtension build() { - return new RedisSingletonExtension(); - } - } - - public static RedisSingletonExtensionBuilder builder() { - return new RedisSingletonExtensionBuilder(); - } - - @Override - public void beforeAll(final ExtensionContext context) throws Exception { - assumeFalse(System.getProperty("os.name").equalsIgnoreCase("windows")); - - redisServer = RedisServer.builder() - .setting("appendonly no") - .setting("save \"\"") - .setting("dir " + System.getProperty("java.io.tmpdir")) - .port(getAvailablePort()) - .build(); - - redisServer.start(); - } - - @Override - public void beforeEach(final ExtensionContext context) { - redisUri = RedisURI.create("redis://127.0.0.1:%d".formatted(redisServer.ports().get(0))); - redisClient = RedisClient.create(redisUri); - - try (final StatefulRedisConnection connection = redisClient.connect()) { - connection.sync().flushall(); - } - } - - @Override - public void afterEach(final ExtensionContext context) { - redisClient.shutdown(); - } - - @Override - public void afterAll(final ExtensionContext context) { - if (redisServer != null) { - redisServer.stop(); - } - } - - public RedisClient getRedisClient() { - return redisClient; - } - - public RedisURI getRedisUri() { - return redisUri; - } - - private static int getAvailablePort() throws IOException { - try (ServerSocket socket = new ServerSocket(0)) { - socket.setReuseAddress(false); - return socket.getLocalPort(); - } - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java index 5d58df844..33d69464c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java @@ -77,7 +77,7 @@ import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.PniServiceIdentifier; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryException; @@ -230,7 +230,7 @@ class AccountsManagerTest { CLOCK = TestClock.now(); - final FaultTolerantRedisCluster redisCluster = RedisClusterHelper.builder() + final FaultTolerantRedisClusterClient redisCluster = RedisClusterHelper.builder() .stringCommands(commands) .stringAsyncCommands(asyncCommands) .build(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 1a8098626..3eb5beab4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -72,7 +72,7 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesCon import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import reactor.core.publisher.Flux; @@ -690,7 +690,7 @@ class MessagesCacheTest { void setup() throws Exception { reactiveCommands = mock(RedisAdvancedClusterReactiveCommands.class); asyncCommands = mock(RedisAdvancedClusterAsyncCommands.class); - final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.builder() + final FaultTolerantRedisClusterClient mockCluster = RedisClusterHelper.builder() .binaryReactiveCommands(reactiveCommands) .binaryAsyncCommands(asyncCommands) .build(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesManagerTest.java index e2a6d977d..e3aa860a2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesManagerTest.java @@ -30,7 +30,7 @@ import org.junit.jupiter.api.Timeout; import org.signal.libsignal.protocol.ServiceId; import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.profiles.ProfileKey; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.tests.util.MockRedisFuture; import org.whispersystems.textsecuregcm.tests.util.ProfileTestHelper; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; @@ -50,7 +50,7 @@ public class ProfilesManagerTest { //noinspection unchecked commands = mock(RedisAdvancedClusterCommands.class); asyncCommands = mock(RedisAdvancedClusterAsyncCommands.class); - final FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.builder() + final FaultTolerantRedisClusterClient cacheCluster = RedisClusterHelper.builder() .stringCommands(commands) .stringAsyncCommands(asyncCommands) .build(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java index c11dfbb7e..b6b7ac306 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java @@ -16,7 +16,7 @@ import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import java.util.function.Consumer; import java.util.function.Function; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; public class RedisClusterHelper { @@ -25,13 +25,13 @@ public class RedisClusterHelper { } @SuppressWarnings("unchecked") - private static FaultTolerantRedisCluster buildMockRedisCluster( + private static FaultTolerantRedisClusterClient buildMockRedisCluster( final RedisAdvancedClusterCommands stringCommands, final RedisAdvancedClusterAsyncCommands stringAsyncCommands, final RedisAdvancedClusterCommands binaryCommands, final RedisAdvancedClusterAsyncCommands binaryAsyncCommands, final RedisAdvancedClusterReactiveCommands binaryReactiveCommands) { - final FaultTolerantRedisCluster cluster = mock(FaultTolerantRedisCluster.class); + final FaultTolerantRedisClusterClient cluster = mock(FaultTolerantRedisClusterClient.class); final StatefulRedisClusterConnection stringConnection = mock(StatefulRedisClusterConnection.class); final StatefulRedisClusterConnection binaryConnection = mock(StatefulRedisClusterConnection.class); @@ -107,7 +107,7 @@ public class RedisClusterHelper { return this; } - public FaultTolerantRedisCluster build() { + public FaultTolerantRedisClusterClient build() { return RedisClusterHelper.buildMockRedisCluster(stringCommands, stringAsyncCommands, binaryCommands, binaryAsyncCommands, binaryReactiveCommands); } diff --git a/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory b/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory new file mode 100644 index 000000000..e2c8792e8 --- /dev/null +++ b/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory @@ -0,0 +1 @@ +org.whispersystems.textsecuregcm.configuration.LocalFaultTolerantRedisClientFactory diff --git a/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.SingletonRedisClientFactory b/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.SingletonRedisClientFactory deleted file mode 100644 index f851b9371..000000000 --- a/service/src/test/resources/META-INF/services/org.whispersystems.textsecuregcm.configuration.SingletonRedisClientFactory +++ /dev/null @@ -1 +0,0 @@ -org.whispersystems.textsecuregcm.configuration.LocalSingletonRedisClientFactory diff --git a/service/src/test/resources/config/test.yml b/service/src/test/resources/config/test.yml index 5c157529d..a867c0583 100644 --- a/service/src/test/resources/config/test.yml +++ b/service/src/test/resources/config/test.yml @@ -158,9 +158,8 @@ cacheCluster: # Redis server configuration for cache cluster clientPresenceCluster: # Redis server configuration for client presence cluster type: local -provisioning: - pubsub: # Redis server configuration for pubsub cluster - type: local +pubsub: # Redis server configuration for pubsub cluster + type: local pushSchedulerCluster: # Redis server configuration for push scheduler cluster type: local