From b13cb098ce4b0db4b4d86cc6f0e77651404de279 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 2 Nov 2022 10:31:37 -0500 Subject: [PATCH] lettuce: set `publishOnScheduler` to `true` --- .../redis/FaultTolerantRedisCluster.java | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 46d9abf50..fd60d5fc6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -53,35 +53,39 @@ public class FaultTolerantRedisCluster { private final Retry retry; public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) { - this(name, - RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()), - clusterConfiguration.getTimeout(), - clusterConfiguration.getCircuitBreakerConfiguration(), - clusterConfiguration.getRetryConfiguration()); + this(name, + RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()), + clusterConfiguration.getTimeout(), + clusterConfiguration.getCircuitBreakerConfiguration(), + clusterConfiguration.getRetryConfiguration()); } @VisibleForTesting FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { - this.name = name; + this.name = name; - this.clusterClient = clusterClient; - this.clusterClient.setDefaultTimeout(commandTimeout); - this.clusterClient.setOptions(ClusterClientOptions.builder() - .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS) - .validateClusterNodeMembership(false) - .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() - .enableAllAdaptiveRefreshTriggers() - .build()) - .build()); + this.clusterClient = clusterClient; + this.clusterClient.setDefaultTimeout(commandTimeout); + this.clusterClient.setOptions(ClusterClientOptions.builder() + .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS) + .validateClusterNodeMembership(false) + .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() + .enableAllAdaptiveRefreshTriggers() + .build()) + .publishOnScheduler(true) + .build()); - this.stringConnection = clusterClient.connect(); - this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); + this.stringConnection = clusterClient.connect(); + this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); - this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder().retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); + this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() + .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); - CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), circuitBreaker, FaultTolerantRedisCluster.class); - CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), retry, FaultTolerantRedisCluster.class); + CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), circuitBreaker, + FaultTolerantRedisCluster.class); + CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), retry, + FaultTolerantRedisCluster.class); } void shutdown() {