From b734d58ab7589a15345c70f0e4b480b065397b19 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Fri, 12 Apr 2024 12:32:16 -0500 Subject: [PATCH] Coalesce all Redis clusters to per-shard circuit breakers --- .../textsecuregcm/WhisperServerService.java | 38 +- .../ClusterFaultTolerantPubSubConnection.java | 106 ---- .../ClusterFaultTolerantRedisCluster.java | 196 ------- .../redis/FaultTolerantPubSubConnection.java | 83 ++- .../redis/FaultTolerantRedisCluster.java | 179 +++++- .../ShardFaultTolerantPubSubConnection.java | 97 ---- .../redis/ShardFaultTolerantRedisCluster.java | 197 ------- .../workers/AssignUsernameCommand.java | 17 +- .../workers/CommandDependencies.java | 17 +- ...nPushNotificationSenderServiceCommand.java | 3 +- .../FaultTolerantPubSubConnectionTest.java | 31 +- .../redis/FaultTolerantRedisClusterTest.java | 535 +++++++++++++++--- .../redis/RedisClusterExtension.java | 2 +- ...hardFaultTolerantPubSubConnectionTest.java | 197 ------- .../ShardFaultTolerantRedisClusterTest.java | 495 ---------------- 15 files changed, 720 insertions(+), 1473 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterFaultTolerantPubSubConnection.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterFaultTolerantRedisCluster.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantPubSubConnection.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantRedisCluster.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantPubSubConnectionTest.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantRedisClusterTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index c6b3554ad..8224621f3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -20,6 +20,7 @@ import io.dropwizard.core.server.DefaultServerFactory; import io.dropwizard.core.setup.Bootstrap; import io.dropwizard.core.setup.Environment; import io.dropwizard.jetty.HttpsConnectorFactory; +import io.dropwizard.lifecycle.Managed; import io.grpc.ServerBuilder; import io.lettuce.core.metrics.MicrometerCommandLatencyRecorder; import io.lettuce.core.metrics.MicrometerOptions; @@ -176,10 +177,8 @@ import org.whispersystems.textsecuregcm.push.ProvisioningManager; import org.whispersystems.textsecuregcm.push.PushLatencyManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; -import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.registration.RegistrationServiceClient; import org.whispersystems.textsecuregcm.s3.PolicySigner; import org.whispersystems.textsecuregcm.s3.PostPolicyGenerator; @@ -414,25 +413,24 @@ public class WhisperServerService extends Application keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000); Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), @@ -598,7 +596,7 @@ public class WhisperServerService extends Application implements FaultTolerantPubSubConnection { - - private static final Logger logger = LoggerFactory.getLogger(ClusterFaultTolerantPubSubConnection.class); - - - private final String name; - private final StatefulRedisClusterPubSubConnection pubSubConnection; - - private final CircuitBreaker circuitBreaker; - private final Retry retry; - private final Retry resubscribeRetry; - private final Scheduler topologyChangedEventScheduler; - - private final Timer executeTimer; - - public ClusterFaultTolerantPubSubConnection(final String name, - final StatefulRedisClusterPubSubConnection pubSubConnection, final CircuitBreaker circuitBreaker, - final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) { - this.name = name; - this.pubSubConnection = pubSubConnection; - this.circuitBreaker = circuitBreaker; - this.retry = retry; - this.resubscribeRetry = resubscribeRetry; - this.topologyChangedEventScheduler = topologyChangedEventScheduler; - - this.pubSubConnection.setNodeMessagePropagation(true); - - this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub"); - - CircuitBreakerUtil.registerMetrics(circuitBreaker, ClusterFaultTolerantPubSubConnection.class, Tags.empty()); - } - - @Override - public void usePubSubConnection(final Consumer> consumer) { - try { - circuitBreaker.executeCheckedRunnable( - () -> retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection)))); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - @Override - public T withPubSubConnection(final Function, T> function) { - try { - return circuitBreaker.executeCheckedSupplier( - () -> retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection)))); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - - @Override - public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) { - - usePubSubConnection(connection -> connection.getResources().eventBus().get() - .filter(event -> event instanceof ClusterTopologyChangedEvent) - .subscribeOn(topologyChangedEventScheduler) - .subscribe(event -> { - logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name); - - resubscribeRetry.executeRunnable(() -> { - try { - eventHandler.run(); - } catch (final RuntimeException e) { - logger.warn("Resubscribe for {} failed", name, e); - throw e; - } - }); - })); - } - -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterFaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterFaultTolerantRedisCluster.java deleted file mode 100644 index 64b53f2d3..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterFaultTolerantRedisCluster.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import com.google.common.annotations.VisibleForTesting; -import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.core.IntervalFunction; -import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; -import io.github.resilience4j.reactor.retry.RetryOperator; -import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; -import io.lettuce.core.ClientOptions.DisconnectedBehavior; -import io.lettuce.core.RedisCommandTimeoutException; -import io.lettuce.core.RedisException; -import io.lettuce.core.TimeoutOptions; -import io.lettuce.core.cluster.ClusterClientOptions; -import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; -import io.lettuce.core.cluster.RedisClusterClient; -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; -import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import io.lettuce.core.codec.ByteArrayCodec; -import io.lettuce.core.resource.ClientResources; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Function; -import io.micrometer.core.instrument.Tags; -import org.reactivestreams.Publisher; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; - -/** - * A fault-tolerant access manager for a Redis cluster. A single circuit breaker protects all cluster - * calls. - */ -public class ClusterFaultTolerantRedisCluster implements FaultTolerantRedisCluster { - - private final String name; - - private final RedisClusterClient clusterClient; - - private final StatefulRedisClusterConnection stringConnection; - private final StatefulRedisClusterConnection binaryConnection; - - private final List> pubSubConnections = new ArrayList<>(); - - private final CircuitBreaker circuitBreaker; - private final Retry retry; - private final Retry topologyChangedEventRetry; - - public ClusterFaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, - final ClientResources clientResources) { - this(name, - RedisClusterClient.create(clientResources, - RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(), - clusterConfiguration.getTimeout())), - clusterConfiguration.getTimeout(), - clusterConfiguration.getCircuitBreakerConfiguration(), - clusterConfiguration.getRetryConfiguration()); - } - - @VisibleForTesting - ClusterFaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, - final Duration commandTimeout, - final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { - this.name = name; - - this.clusterClient = clusterClient; - this.clusterClient.setOptions(ClusterClientOptions.builder() - .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS) - .validateClusterNodeMembership(false) - .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() - .enableAllAdaptiveRefreshTriggers() - .build()) - // for asynchronous commands - .timeoutOptions(TimeoutOptions.builder() - .fixedTimeout(commandTimeout) - .build()) - .publishOnScheduler(true) - .build()); - - this.stringConnection = clusterClient.connect(); - this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); - - this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() - .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); - final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom() - .maxAttempts(Integer.MAX_VALUE) - .intervalFunction( - IntervalFunction.ofExponentialRandomBackoff(Duration.ofSeconds(1), 1.5, Duration.ofSeconds(30))) - .build(); - - this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig); - - CircuitBreakerUtil.registerMetrics(circuitBreaker, FaultTolerantRedisCluster.class, Tags.empty()); - CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class); - } - - @Override - public void shutdown() { - stringConnection.close(); - binaryConnection.close(); - - for (final StatefulRedisClusterPubSubConnection pubSubConnection : pubSubConnections) { - pubSubConnection.close(); - } - - clusterClient.shutdown(); - } - - @Override - public String getName() { - return name; - } - - @Override - public void useCluster(final Consumer> consumer) { - useConnection(stringConnection, consumer); - } - - @Override - public T withCluster(final Function, T> function) { - return withConnection(stringConnection, function); - } - - @Override - public void useBinaryCluster(final Consumer> consumer) { - useConnection(binaryConnection, consumer); - } - - @Override - public T withBinaryCluster(final Function, T> function) { - return withConnection(binaryConnection, function); - } - - @Override - public Publisher withBinaryClusterReactive( - final Function, Publisher> function) { - return withConnectionReactive(binaryConnection, function); - } - - @Override - public void useConnection(final StatefulRedisClusterConnection connection, - final Consumer> consumer) { - try { - circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection))); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - @Override - public T withConnection(final StatefulRedisClusterConnection connection, - final Function, T> function) { - try { - return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> function.apply(connection))); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - @Override - public Publisher withConnectionReactive(final StatefulRedisClusterConnection connection, - final Function, Publisher> function) { - - return Flux.from(function.apply(connection)) - .transformDeferred(RetryOperator.of(retry)) - .transformDeferred(CircuitBreakerOperator.of(circuitBreaker)); - } - - public FaultTolerantPubSubConnection createPubSubConnection() { - final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); - pubSubConnections.add(pubSubConnection); - - return new ClusterFaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker, retry, - topologyChangedEventRetry, - Schedulers.newSingle(name + "-redisPubSubEvents", true)); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index 6ffef4b81..8392ddcc5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -5,15 +5,90 @@ package org.whispersystems.textsecuregcm.redis; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import io.github.resilience4j.retry.Retry; +import io.lettuce.core.RedisException; +import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; import java.util.function.Consumer; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.scheduler.Scheduler; -public interface FaultTolerantPubSubConnection { +public class FaultTolerantPubSubConnection { - void usePubSubConnection(Consumer> consumer); + private static final Logger logger = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class); - T withPubSubConnection(Function, T> function); - void subscribeToClusterTopologyChangedEvents(Runnable eventHandler); + private final String name; + private final StatefulRedisClusterPubSubConnection pubSubConnection; + + private final Retry retry; + private final Retry resubscribeRetry; + private final Scheduler topologyChangedEventScheduler; + + private final Timer executeTimer; + + public FaultTolerantPubSubConnection(final String name, + final StatefulRedisClusterPubSubConnection pubSubConnection, + final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) { + this.name = name; + this.pubSubConnection = pubSubConnection; + this.retry = retry; + this.resubscribeRetry = resubscribeRetry; + this.topologyChangedEventScheduler = topologyChangedEventScheduler; + + this.pubSubConnection.setNodeMessagePropagation(true); + + this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub"); + } + + public void usePubSubConnection(final Consumer> consumer) { + try { + retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))); + } catch (final Throwable t) { + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } + } + } + + public T withPubSubConnection(final Function, T> function) { + try { + return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))); + } catch (final Throwable t) { + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } + } + } + + + public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) { + + usePubSubConnection(connection -> connection.getResources().eventBus().get() + .filter(event -> event instanceof ClusterTopologyChangedEvent) + .subscribeOn(topologyChangedEventScheduler) + .subscribe(event -> { + logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name); + + resubscribeRetry.executeRunnable(() -> { + try { + eventHandler.run(); + } catch (final RuntimeException e) { + logger.warn("Resubscribe for {} failed", name, e); + throw e; + } + }); + })); + } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index ad6203f7f..9f72400f6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -5,36 +5,183 @@ package org.whispersystems.textsecuregcm.redis; +import io.github.resilience4j.core.IntervalFunction; +import io.github.resilience4j.reactor.retry.RetryOperator; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisCommandTimeoutException; +import io.lettuce.core.RedisException; +import io.lettuce.core.RedisURI; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; +import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.resource.ClientResources; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.function.Consumer; import java.util.function.Function; import org.reactivestreams.Publisher; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; -public interface FaultTolerantRedisCluster { +/** + * A fault-tolerant access manager for a Redis cluster. Each shard in the cluster has a dedicated circuit breaker. + * + * @see LettuceShardCircuitBreaker + */ +public class FaultTolerantRedisCluster { - void shutdown(); + private final String name; - String getName(); + private final RedisClusterClient clusterClient; - void useCluster(Consumer> consumer); + private final StatefulRedisClusterConnection stringConnection; + private final StatefulRedisClusterConnection binaryConnection; - T withCluster(Function, T> function); + private final List> pubSubConnections = new ArrayList<>(); - void useBinaryCluster(Consumer> consumer); + private final Retry retry; + private final Retry topologyChangedEventRetry; - T withBinaryCluster(Function, T> function); - Publisher withBinaryClusterReactive( - Function, Publisher> function); + public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, + final ClientResources.Builder clientResourcesBuilder) { - void useConnection(StatefulRedisClusterConnection connection, - Consumer> consumer); + this(name, clientResourcesBuilder, + Collections.singleton(RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(), + clusterConfiguration.getTimeout())), + clusterConfiguration.getTimeout(), + clusterConfiguration.getCircuitBreakerConfiguration(), + clusterConfiguration.getRetryConfiguration()); - T withConnection(StatefulRedisClusterConnection connection, - Function, T> function); + } - Publisher withConnectionReactive(StatefulRedisClusterConnection connection, - Function, Publisher> function); + FaultTolerantRedisCluster(String name, final ClientResources.Builder clientResourcesBuilder, + Iterable redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig, + RetryConfiguration retryConfiguration) { + + this.name = name; + + this.clusterClient = RedisClusterClient.create( + clientResourcesBuilder.nettyCustomizer( + new LettuceShardCircuitBreaker(name, circuitBreakerConfig.toCircuitBreakerConfig())). + build(), + redisUris); + this.clusterClient.setOptions(ClusterClientOptions.builder() + .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) + .validateClusterNodeMembership(false) + .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() + .enableAllAdaptiveRefreshTriggers() + .build()) + // for asynchronous commands + .timeoutOptions(TimeoutOptions.builder() + .fixedTimeout(commandTimeout) + .build()) + .publishOnScheduler(true) + .build()); + + this.stringConnection = clusterClient.connect(); + this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); + + this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() + .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); + final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom() + .maxAttempts(Integer.MAX_VALUE) + .intervalFunction( + IntervalFunction.ofExponentialRandomBackoff(Duration.ofSeconds(1), 1.5, Duration.ofSeconds(30))) + .build(); + + this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig); + + CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class); + } + + public void shutdown() { + stringConnection.close(); + binaryConnection.close(); + + for (final StatefulRedisClusterPubSubConnection pubSubConnection : pubSubConnections) { + pubSubConnection.close(); + } + + clusterClient.shutdown(); + } + + public String getName() { + return name; + } + + public void useCluster(final Consumer> consumer) { + useConnection(stringConnection, consumer); + } + + public T withCluster(final Function, T> function) { + return withConnection(stringConnection, function); + } + + public void useBinaryCluster(final Consumer> consumer) { + useConnection(binaryConnection, consumer); + } + + public T withBinaryCluster(final Function, T> function) { + return withConnection(binaryConnection, function); + } + + public Publisher withBinaryClusterReactive( + final Function, Publisher> function) { + return withConnectionReactive(binaryConnection, function); + } + + public void useConnection(final StatefulRedisClusterConnection connection, + final Consumer> consumer) { + try { + retry.executeRunnable(() -> consumer.accept(connection)); + } catch (final Throwable t) { + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } + } + } + + public T withConnection(final StatefulRedisClusterConnection connection, + final Function, T> function) { + try { + return retry.executeCallable(() -> function.apply(connection)); + } catch (final Throwable t) { + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } + } + } + + public Publisher withConnectionReactive(final StatefulRedisClusterConnection connection, + final Function, Publisher> function) { + + return Flux.from(function.apply(connection)) + .transformDeferred(RetryOperator.of(retry)); + } + + public FaultTolerantPubSubConnection createPubSubConnection() { + final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); + pubSubConnections.add(pubSubConnection); + + return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry, + Schedulers.newSingle(name + "-redisPubSubEvents", true)); + } - FaultTolerantPubSubConnection createPubSubConnection(); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantPubSubConnection.java deleted file mode 100644 index bbd389c0c..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantPubSubConnection.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; - -import io.github.resilience4j.retry.Retry; -import io.lettuce.core.RedisException; -import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; -import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Timer; -import java.util.function.Consumer; -import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.scheduler.Scheduler; - -public class ShardFaultTolerantPubSubConnection implements FaultTolerantPubSubConnection { - - private static final Logger logger = LoggerFactory.getLogger(ShardFaultTolerantPubSubConnection.class); - - - private final String name; - private final StatefulRedisClusterPubSubConnection pubSubConnection; - - private final Retry retry; - private final Retry resubscribeRetry; - private final Scheduler topologyChangedEventScheduler; - - private final Timer executeTimer; - - public ShardFaultTolerantPubSubConnection(final String name, - final StatefulRedisClusterPubSubConnection pubSubConnection, - final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) { - this.name = name; - this.pubSubConnection = pubSubConnection; - this.retry = retry; - this.resubscribeRetry = resubscribeRetry; - this.topologyChangedEventScheduler = topologyChangedEventScheduler; - - this.pubSubConnection.setNodeMessagePropagation(true); - - this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub"); - } - - @Override - public void usePubSubConnection(final Consumer> consumer) { - try { - retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - @Override - public T withPubSubConnection(final Function, T> function) { - try { - return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - - @Override - public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) { - - usePubSubConnection(connection -> connection.getResources().eventBus().get() - .filter(event -> event instanceof ClusterTopologyChangedEvent) - .subscribeOn(topologyChangedEventScheduler) - .subscribe(event -> { - logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name); - - resubscribeRetry.executeRunnable(() -> { - try { - eventHandler.run(); - } catch (final RuntimeException e) { - logger.warn("Resubscribe for {} failed", name, e); - throw e; - } - }); - })); - } - -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantRedisCluster.java deleted file mode 100644 index 854980f29..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantRedisCluster.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import io.github.resilience4j.core.IntervalFunction; -import io.github.resilience4j.reactor.retry.RetryOperator; -import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; -import io.lettuce.core.ClientOptions; -import io.lettuce.core.RedisCommandTimeoutException; -import io.lettuce.core.RedisException; -import io.lettuce.core.RedisURI; -import io.lettuce.core.TimeoutOptions; -import io.lettuce.core.cluster.ClusterClientOptions; -import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; -import io.lettuce.core.cluster.RedisClusterClient; -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; -import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import io.lettuce.core.codec.ByteArrayCodec; -import io.lettuce.core.resource.ClientResources; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Function; -import org.reactivestreams.Publisher; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; - -/** - * A fault-tolerant access manager for a Redis cluster. Each shard in the cluster has a dedicated circuit breaker. - * - * @see LettuceShardCircuitBreaker - */ -public class ShardFaultTolerantRedisCluster implements FaultTolerantRedisCluster { - - private final String name; - - private final RedisClusterClient clusterClient; - - private final StatefulRedisClusterConnection stringConnection; - private final StatefulRedisClusterConnection binaryConnection; - - private final List> pubSubConnections = new ArrayList<>(); - - private final Retry retry; - private final Retry topologyChangedEventRetry; - - - public ShardFaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, - final ClientResources.Builder clientResourcesBuilder) { - - this(name, clientResourcesBuilder, - Collections.singleton(RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(), - clusterConfiguration.getTimeout())), - clusterConfiguration.getTimeout(), - clusterConfiguration.getCircuitBreakerConfiguration(), - clusterConfiguration.getRetryConfiguration()); - - } - - ShardFaultTolerantRedisCluster(String name, final ClientResources.Builder clientResourcesBuilder, - Iterable redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig, - RetryConfiguration retryConfiguration) { - - this.name = name; - - this.clusterClient = RedisClusterClient.create( - clientResourcesBuilder.nettyCustomizer( - new LettuceShardCircuitBreaker(name, circuitBreakerConfig.toCircuitBreakerConfig())). - build(), - redisUris); - this.clusterClient.setOptions(ClusterClientOptions.builder() - .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) - .validateClusterNodeMembership(false) - .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() - .enableAllAdaptiveRefreshTriggers() - .build()) - // for asynchronous commands - .timeoutOptions(TimeoutOptions.builder() - .fixedTimeout(commandTimeout) - .build()) - .publishOnScheduler(true) - .build()); - - this.stringConnection = clusterClient.connect(); - this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); - - this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() - .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); - final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom() - .maxAttempts(Integer.MAX_VALUE) - .intervalFunction( - IntervalFunction.ofExponentialRandomBackoff(Duration.ofSeconds(1), 1.5, Duration.ofSeconds(30))) - .build(); - - this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig); - - CircuitBreakerUtil.registerMetrics(retry, ShardFaultTolerantRedisCluster.class); - } - - @Override - public void shutdown() { - stringConnection.close(); - binaryConnection.close(); - - for (final StatefulRedisClusterPubSubConnection pubSubConnection : pubSubConnections) { - pubSubConnection.close(); - } - - clusterClient.shutdown(); - } - - public String getName() { - return name; - } - - @Override - public void useCluster(final Consumer> consumer) { - useConnection(stringConnection, consumer); - } - - @Override - public T withCluster(final Function, T> function) { - return withConnection(stringConnection, function); - } - - @Override - public void useBinaryCluster(final Consumer> consumer) { - useConnection(binaryConnection, consumer); - } - - @Override - public T withBinaryCluster(final Function, T> function) { - return withConnection(binaryConnection, function); - } - - @Override - public Publisher withBinaryClusterReactive( - final Function, Publisher> function) { - return withConnectionReactive(binaryConnection, function); - } - - @Override - public void useConnection(final StatefulRedisClusterConnection connection, - final Consumer> consumer) { - try { - retry.executeRunnable(() -> consumer.accept(connection)); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - @Override - public T withConnection(final StatefulRedisClusterConnection connection, - final Function, T> function) { - try { - return retry.executeCallable(() -> function.apply(connection)); - } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } - } - } - - @Override - public Publisher withConnectionReactive(final StatefulRedisClusterConnection connection, - final Function, Publisher> function) { - - return Flux.from(function.apply(connection)) - .transformDeferred(RetryOperator.of(retry)); - } - - @Override - public FaultTolerantPubSubConnection createPubSubConnection() { - final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); - pubSubConnections.add(pubSubConnection); - - return new ShardFaultTolerantPubSubConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry, - Schedulers.newSingle(name + "-redisPubSubEvents", true)); - } - -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index f7e95b9b3..0b0f8ae08 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -29,9 +29,7 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfigurati import org.whispersystems.textsecuregcm.controllers.SecureStorageController; import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controller; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; -import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; import org.whispersystems.textsecuregcm.storage.Account; @@ -108,9 +106,8 @@ public class AssignUsernameCommand extends EnvironmentCommand { @@ -64,7 +63,7 @@ public class ScheduledApnPushNotificationSenderServiceCommand extends ServerComm }); } - final FaultTolerantRedisCluster pushSchedulerCluster = new ShardFaultTolerantRedisCluster("push_scheduler", + final FaultTolerantRedisCluster pushSchedulerCluster = new FaultTolerantRedisCluster("push_scheduler", configuration.getPushSchedulerCluster(), deps.redisClusterClientResourcesBuilder()); final ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java index 4c46cbb43..497ff1cbf 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java @@ -17,8 +17,6 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.github.resilience4j.circuitbreaker.CallNotPermittedException; -import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.core.IntervalFunction; import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; @@ -30,7 +28,6 @@ import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import io.lettuce.core.event.Event; import io.lettuce.core.event.EventBus; import io.lettuce.core.resource.ClientResources; -import java.time.Duration; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -38,7 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -60,17 +56,10 @@ class FaultTolerantPubSubConnectionTest { when(pubSubConnection.sync()).thenReturn(pubSubCommands); - final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); - breakerConfiguration.setFailureRateThreshold(100); - breakerConfiguration.setSlidingWindowSize(1); - breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1); - breakerConfiguration.setWaitDurationInOpenState(Duration.ofSeconds(Integer.MAX_VALUE)); - final RetryConfiguration retryConfiguration = new RetryConfiguration(); retryConfiguration.setMaxAttempts(3); retryConfiguration.setWaitDuration(10); - final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig()); final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom() @@ -79,28 +68,10 @@ class FaultTolerantPubSubConnectionTest { .build(); final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration); - faultTolerantPubSubConnection = new ClusterFaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker, + faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, retry, resubscribeRetry, Schedulers.newSingle("test")); } - @Test - void testBreaker() { - when(pubSubCommands.get(anyString())) - .thenReturn("value") - .thenThrow(new RuntimeException("Badness has ensued.")); - - assertEquals("value", - faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); - - assertThrows(RedisException.class, - () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); - - final RedisException redisException = assertThrows(RedisException.class, - () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); - - assertTrue(redisException.getCause() instanceof CallNotPermittedException); - } - @Test void testRetry() { when(pubSubCommands.get(anyString())) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 166c9d0e6..268435bb3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2024 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ @@ -8,136 +8,487 @@ package org.whispersystems.textsecuregcm.redis; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.junit.jupiter.api.Assertions.assertTrue; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; -import io.lettuce.core.cluster.RedisClusterClient; -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; -import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; -import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser; +import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.models.partitions.RedisClusterNode; +import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.lettuce.core.event.EventBus; +import io.lettuce.core.event.EventPublisherOptions; +import io.lettuce.core.metrics.CommandLatencyCollectorOptions; +import io.lettuce.core.metrics.CommandLatencyRecorder; import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.Delay; +import io.lettuce.core.resource.DnsResolver; +import io.lettuce.core.resource.EventLoopGroupProvider; +import io.lettuce.core.resource.NettyCustomizer; +import io.lettuce.core.resource.SocketAddressResolver; +import io.lettuce.core.resource.ThreadFactoryProvider; +import io.lettuce.core.tracing.Tracing; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.resolver.AddressResolverGroup; +import io.netty.util.Timer; +import io.netty.util.concurrent.EventExecutorGroup; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -import reactor.core.publisher.Flux; +import org.whispersystems.textsecuregcm.util.Pair; +import org.whispersystems.textsecuregcm.util.RedisClusterUtil; +// ThreadMode.SEPARATE_THREAD protects against hangs in the remote Redis calls, as this mode allows the test code to be +// preempted by the timeout check +@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) class FaultTolerantRedisClusterTest { - private RedisAdvancedClusterCommands clusterCommands; - private FaultTolerantRedisCluster faultTolerantCluster; + private static final Duration TIMEOUT = Duration.ofMillis(50); - @SuppressWarnings("unchecked") - @BeforeEach - public void setUp() { - final RedisClusterClient clusterClient = mock(RedisClusterClient.class); - final StatefulRedisClusterConnection clusterConnection = mock(StatefulRedisClusterConnection.class); - final StatefulRedisClusterPubSubConnection pubSubConnection = mock( - StatefulRedisClusterPubSubConnection.class); - final ClientResources clientResources = mock(ClientResources.class); - final EventBus eventBus = mock(EventBus.class); + private static final RetryConfiguration RETRY_CONFIGURATION = new RetryConfiguration(); - clusterCommands = mock(RedisAdvancedClusterCommands.class); + static { + RETRY_CONFIGURATION.setMaxAttempts(1); + RETRY_CONFIGURATION.setWaitDuration(50); + } - when(clusterClient.connect()).thenReturn(clusterConnection); - when(clusterClient.connectPubSub()).thenReturn(pubSubConnection); - when(clusterClient.getResources()).thenReturn(clientResources); - when(clusterConnection.sync()).thenReturn(clusterCommands); - when(clientResources.eventBus()).thenReturn(eventBus); - when(eventBus.get()).thenReturn(mock(Flux.class)); + @RegisterExtension + static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder() + .retryConfiguration(RETRY_CONFIGURATION) + .timeout(TIMEOUT) + .build(); - final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); - breakerConfiguration.setFailureRateThreshold(100); - breakerConfiguration.setSlidingWindowSize(1); - breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1); - breakerConfiguration.setWaitDurationInOpenState(Duration.ofSeconds(Integer.MAX_VALUE)); + private FaultTolerantRedisCluster cluster; - final RetryConfiguration retryConfiguration = new RetryConfiguration(); - retryConfiguration.setMaxAttempts(3); - retryConfiguration.setWaitDuration(0); + private static FaultTolerantRedisCluster buildCluster( + @Nullable final CircuitBreakerConfiguration circuitBreakerConfiguration, + final ClientResources.Builder clientResourcesBuilder) { - faultTolerantCluster = new ClusterFaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), - breakerConfiguration, retryConfiguration); + return new FaultTolerantRedisCluster("test", clientResourcesBuilder, + RedisClusterExtension.getRedisURIs(), TIMEOUT, + Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new), + RETRY_CONFIGURATION); + } + + @AfterEach + void tearDown() { + cluster.shutdown(); } @Test - void testBreaker() { - when(clusterCommands.get(anyString())) - .thenReturn("value") - .thenThrow(new RuntimeException("Badness has ensued.")); + void testTimeout() { + cluster = buildCluster(null, ClientResources.builder()); - assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); + final ExecutionException asyncException = assertThrows(ExecutionException.class, + () -> cluster.withCluster(connection -> connection.async().blpop(2 * TIMEOUT.toMillis() / 1000d, "key")) + .get()); - assertThrows(RedisException.class, - () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); - - final RedisException redisException = assertThrows(RedisException.class, - () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); - - assertInstanceOf(CallNotPermittedException.class, redisException.getCause()); - } - - @Test - void testRetry() { - when(clusterCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); - - assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); - - when(clusterCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); + assertInstanceOf(RedisCommandTimeoutException.class, asyncException.getCause()); assertThrows(RedisCommandTimeoutException.class, - () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); - + () -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, "key"))); } - @Nested - class WithRealCluster { + @Test + void testTimeoutCircuitBreaker() throws Exception { + // because we’re using a single key, and blpop involves *Redis* also blocking, the breaker wait duration must be + // longer than the sum of the remote timeouts + final Duration breakerWaitDuration = TIMEOUT.multipliedBy(5); - private static final Duration TIMEOUT = Duration.ofMillis(50); + final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); + circuitBreakerConfig.setFailureRateThreshold(1); + circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(1); + circuitBreakerConfig.setSlidingWindowSize(1); + circuitBreakerConfig.setWaitDurationInOpenState(breakerWaitDuration); - private static final RetryConfiguration retryConfiguration = new RetryConfiguration(); + cluster = buildCluster(circuitBreakerConfig, ClientResources.builder()); - static { - retryConfiguration.setMaxAttempts(1); - retryConfiguration.setWaitDuration(50); + final String key = "key"; + + // the first call should time out and open the breaker + assertThrows(RedisCommandTimeoutException.class, + () -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key))); + + // the second call gets blocked by the breaker + final RedisException e = assertThrows(RedisException.class, + () -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key))); + assertInstanceOf(CallNotPermittedException.class, e.getCause()); + + // wait for breaker to be half-open + Thread.sleep(breakerWaitDuration.toMillis() * 2); + + assertEquals(0, (Long) cluster.withCluster(connection -> connection.sync().llen(key))); + } + + @Test + void testShardUnavailable() { + final TestBreakerManager testBreakerManager = new TestBreakerManager(); + final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); + circuitBreakerConfig.setFailureRateThreshold(1); + circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(2); + circuitBreakerConfig.setSlidingWindowSize(5); + + final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder() + .nettyCustomizer(testBreakerManager); + + cluster = buildCluster(circuitBreakerConfig, builder); + + // this test will open the breaker on one shard and check that other shards are still available, + // so we get two nodes and a slot+key on each to test + final Pair nodePair = + cluster.withCluster(connection -> { + Partitions partitions = ClusterPartitionParser.parse(connection.sync().clusterNodes()); + + assertTrue(partitions.size() >= 2); + + return new Pair<>(partitions.getPartition(0), partitions.getPartition(1)); + }); + + final RedisClusterNode unavailableNode = nodePair.first(); + final int unavailableSlot = unavailableNode.getSlots().getFirst(); + final String unavailableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(unavailableSlot)); + + final int availableSlot = nodePair.second().getSlots().getFirst(); + final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot)); + + cluster.useCluster(connection -> { + connection.sync().set(unavailableKey, "unavailable"); + connection.sync().set(availableKey, "available"); + + assertEquals("unavailable", connection.sync().get(unavailableKey)); + assertEquals("available", connection.sync().get(availableKey)); + }); + + // shard is now unavailable + testBreakerManager.openBreaker(unavailableNode.getUri()); + final RedisException e = assertThrows(RedisException.class, () -> + cluster.useCluster(connection -> connection.sync().get(unavailableKey))); + assertInstanceOf(CallNotPermittedException.class, e.getCause()); + + // other shard is still available + assertEquals("available", cluster.withCluster(connection -> connection.sync().get(availableKey))); + + // shard is available again + testBreakerManager.closeBreaker(unavailableNode.getUri()); + assertEquals("unavailable", cluster.withCluster(connection -> connection.sync().get(unavailableKey))); + } + + @Test + void testShardUnavailablePubSub() throws Exception { + final TestBreakerManager testBreakerManager = new TestBreakerManager(); + final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); + circuitBreakerConfig.setFailureRateThreshold(1); + circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(2); + circuitBreakerConfig.setSlidingWindowSize(5); + + final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder() + .nettyCustomizer(testBreakerManager); + + cluster = buildCluster(circuitBreakerConfig, builder); + + cluster.useCluster( + connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz")); + + // this test will open the breaker on one shard and check that other shards are still available, + // so we get two nodes and a slot+key on each to test + final Pair nodePair = + cluster.withCluster(connection -> { + Partitions partitions = ClusterPartitionParser.parse(connection.sync().clusterNodes()); + + assertTrue(partitions.size() >= 2); + + return new Pair<>(partitions.getPartition(0), partitions.getPartition(1)); + }); + + final RedisClusterNode unavailableNode = nodePair.first(); + final int unavailableSlot = unavailableNode.getSlots().getFirst(); + final String unavailableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(unavailableSlot)); + + final RedisClusterNode availableNode = nodePair.second(); + final int availableSlot = availableNode.getSlots().getFirst(); + final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot)); + + final FaultTolerantPubSubConnection pubSubConnection = cluster.createPubSubConnection(); + + // Keyspace notifications are delivered on a different thread, so we use a CountDownLatch to wait for the + // expected number of notifications to arrive + final AtomicReference countDownLatchRef = new AtomicReference<>(); + + final Map channelMessageCounts = new ConcurrentHashMap<>(); + final String keyspacePrefix = "__keyspace@0__:"; + final RedisClusterPubSubAdapter listener = new RedisClusterPubSubAdapter<>() { + @Override + public void message(final RedisClusterNode node, final String channel, final String message) { + channelMessageCounts.computeIfAbsent(StringUtils.substringAfter(channel, keyspacePrefix), + k -> new AtomicInteger(0)) + .incrementAndGet(); + + countDownLatchRef.get().countDown(); + } + }; + + countDownLatchRef.set(new CountDownLatch(2)); + pubSubConnection.usePubSubConnection(c -> { + c.addListener(listener); + c.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(availableSlot)) + .commands() + .subscribe(keyspacePrefix + availableKey); + c.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(unavailableSlot)) + .commands() + .subscribe(keyspacePrefix + unavailableKey); + }); + + cluster.useCluster(connection -> { + connection.sync().set(availableKey, "ping1"); + connection.sync().set(unavailableKey, "ping1"); + }); + + countDownLatchRef.get().await(); + + assertEquals(1, channelMessageCounts.get(availableKey).get()); + assertEquals(1, channelMessageCounts.get(unavailableKey).get()); + + // shard is now unavailable + testBreakerManager.openBreaker(unavailableNode.getUri()); + + final RedisException e = assertThrows(RedisException.class, () -> + cluster.useCluster(connection -> connection.sync().set(unavailableKey, "ping2"))); + assertInstanceOf(CallNotPermittedException.class, e.getCause()); + assertEquals(1, channelMessageCounts.get(unavailableKey).get()); + assertEquals(1, channelMessageCounts.get(availableKey).get()); + + countDownLatchRef.set(new CountDownLatch(1)); + pubSubConnection.usePubSubConnection(connection -> connection.sync().set(availableKey, "ping2")); + + countDownLatchRef.get().await(); + + assertEquals(1, channelMessageCounts.get(unavailableKey).get()); + assertEquals(2, channelMessageCounts.get(availableKey).get()); + + // shard is available again + testBreakerManager.closeBreaker(unavailableNode.getUri()); + + countDownLatchRef.set(new CountDownLatch(2)); + + cluster.useCluster(connection -> { + connection.sync().set(availableKey, "ping3"); + connection.sync().set(unavailableKey, "ping3"); + }); + + countDownLatchRef.get().await(); + + assertEquals(2, channelMessageCounts.get(unavailableKey).get()); + assertEquals(3, channelMessageCounts.get(availableKey).get()); + } + + @ChannelHandler.Sharable + private static class TestBreakerManager extends ChannelDuplexHandler implements NettyCustomizer { + + private final Map> urisToChannelBreakers = new ConcurrentHashMap<>(); + private final AtomicInteger counter = new AtomicInteger(); + + @Override + public void afterChannelInitialized(Channel channel) { + channel.pipeline().addFirst("TestBreakerManager#" + counter.getAndIncrement(), this); } - @RegisterExtension - static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder() - .retryConfiguration(retryConfiguration) - .timeout(TIMEOUT) - .build(); + @Override + public void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, + final SocketAddress localAddress, final ChannelPromise promise) throws Exception { - @Test - void testTimeout() { - final FaultTolerantRedisCluster cluster = REDIS_CLUSTER_EXTENSION.getRedisCluster(); + super.connect(ctx, remoteAddress, localAddress, promise); - assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { - final ExecutionException asyncException = assertThrows(ExecutionException.class, - () -> cluster.withCluster(connection -> connection.async().blpop(TIMEOUT.toMillis() * 2, "key")).get()); - assertInstanceOf(RedisCommandTimeoutException.class, asyncException.getCause()); + final LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler channelCircuitBreakerHandler = + ctx.channel().pipeline().get(LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler.class); - assertThrows(RedisCommandTimeoutException.class, - () -> cluster.withCluster(connection -> connection.sync().blpop(TIMEOUT.toMillis() * 2, "key"))); - }); + urisToChannelBreakers.computeIfAbsent(getRedisURI(ctx.channel()), ignored -> new HashSet<>()) + .add(channelCircuitBreakerHandler); + } + private static RedisURI getRedisURI(Channel channel) { + final InetSocketAddress inetAddress = (InetSocketAddress) channel.remoteAddress(); + return RedisURI.create(inetAddress.getHostString(), inetAddress.getPort()); + } + + void openBreaker(final RedisURI redisURI) { + urisToChannelBreakers.get(redisURI).forEach(handler -> handler.breaker.transitionToOpenState()); + } + + void closeBreaker(final RedisURI redisURI) { + urisToChannelBreakers.get(redisURI).forEach(handler -> handler.breaker.transitionToClosedState()); + } + } + + static class CompositeNettyCustomizer implements NettyCustomizer { + + private final List nettyCustomizers = new ArrayList<>(); + + @Override + public void afterBootstrapInitialized(final Bootstrap bootstrap) { + nettyCustomizers.forEach(nc -> nc.afterBootstrapInitialized(bootstrap)); + } + + @Override + public void afterChannelInitialized(final Channel channel) { + nettyCustomizers.forEach(nc -> nc.afterChannelInitialized(channel)); + } + + void add(NettyCustomizer customizer) { + nettyCustomizers.add(customizer); + } + } + + static class CompositeNettyCustomizerClientResourcesBuilder implements ClientResources.Builder { + + private final CompositeNettyCustomizer compositeNettyCustomizer; + private final ClientResources.Builder delegate; + + static CompositeNettyCustomizerClientResourcesBuilder builder() { + return new CompositeNettyCustomizerClientResourcesBuilder(); + } + + private CompositeNettyCustomizerClientResourcesBuilder() { + this.compositeNettyCustomizer = new CompositeNettyCustomizer(); + this.delegate = ClientResources.builder().nettyCustomizer(compositeNettyCustomizer); + } + + + @Override + public ClientResources.Builder addressResolverGroup(final AddressResolverGroup addressResolverGroup) { + delegate.addressResolverGroup(addressResolverGroup); + return this; + } + + @Override + public ClientResources.Builder commandLatencyRecorder(final CommandLatencyRecorder latencyRecorder) { + delegate.commandLatencyRecorder(latencyRecorder); + return this; + } + + @Override + @Deprecated + public ClientResources.Builder commandLatencyCollectorOptions( + final CommandLatencyCollectorOptions commandLatencyCollectorOptions) { + delegate.commandLatencyCollectorOptions(commandLatencyCollectorOptions); + return this; + } + + @Override + public ClientResources.Builder commandLatencyPublisherOptions( + final EventPublisherOptions commandLatencyPublisherOptions) { + delegate.commandLatencyPublisherOptions(commandLatencyPublisherOptions); + return this; + } + + @Override + public ClientResources.Builder computationThreadPoolSize(final int computationThreadPoolSize) { + delegate.computationThreadPoolSize(computationThreadPoolSize); + return this; + } + + @Override + @Deprecated + public ClientResources.Builder dnsResolver(final DnsResolver dnsResolver) { + delegate.dnsResolver(dnsResolver); + return this; + } + + @Override + public ClientResources.Builder eventBus(final EventBus eventBus) { + delegate.eventBus(eventBus); + return this; + } + + @Override + public ClientResources.Builder eventExecutorGroup(final EventExecutorGroup eventExecutorGroup) { + delegate.eventExecutorGroup(eventExecutorGroup); + return this; + } + + @Override + public ClientResources.Builder eventLoopGroupProvider(final EventLoopGroupProvider eventLoopGroupProvider) { + delegate.eventLoopGroupProvider(eventLoopGroupProvider); + return this; + } + + @Override + public ClientResources.Builder ioThreadPoolSize(final int ioThreadPoolSize) { + delegate.ioThreadPoolSize(ioThreadPoolSize); + return this; + } + + @Override + public ClientResources.Builder nettyCustomizer(final NettyCustomizer nettyCustomizer) { + compositeNettyCustomizer.add(nettyCustomizer); + return this; + } + + @Override + public ClientResources.Builder reconnectDelay(final Delay reconnectDelay) { + delegate.reconnectDelay(reconnectDelay); + return this; + } + + @Override + public ClientResources.Builder reconnectDelay(final Supplier reconnectDelay) { + delegate.reconnectDelay(reconnectDelay); + return this; + } + + @Override + public ClientResources.Builder socketAddressResolver(final SocketAddressResolver socketAddressResolver) { + delegate.socketAddressResolver(socketAddressResolver); + return this; + } + + @Override + public ClientResources.Builder threadFactoryProvider(final ThreadFactoryProvider threadFactoryProvider) { + delegate.threadFactoryProvider(threadFactoryProvider); + return this; + } + + @Override + public ClientResources.Builder timer(final Timer timer) { + delegate.timer(timer); + return this; + } + + @Override + public ClientResources.Builder tracing(final Tracing tracing) { + delegate.tracing(tracing); + return this; + } + + @Override + public ClientResources build() { + return delegate.build(); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java index 61795fe4e..905751a6e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java @@ -87,7 +87,7 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb redisClientResources = ClientResources.builder().build(); final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500)); - redisCluster = new ShardFaultTolerantRedisCluster("test-cluster", + redisCluster = new FaultTolerantRedisCluster("test-cluster", redisClientResources.mutate(), getRedisURIs(), timeout, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantPubSubConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantPubSubConnectionTest.java deleted file mode 100644 index f06b808cd..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantPubSubConnectionTest.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.clearInvocations; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.github.resilience4j.core.IntervalFunction; -import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; -import io.lettuce.core.RedisCommandTimeoutException; -import io.lettuce.core.RedisException; -import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; -import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; -import io.lettuce.core.event.Event; -import io.lettuce.core.event.EventBus; -import io.lettuce.core.resource.ClientResources; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; -import reactor.test.publisher.TestPublisher; - -class ShardFaultTolerantPubSubConnectionTest { - - private StatefulRedisClusterPubSubConnection pubSubConnection; - private RedisClusterPubSubCommands pubSubCommands; - private ShardFaultTolerantPubSubConnection faultTolerantPubSubConnection; - - - @SuppressWarnings("unchecked") - @BeforeEach - public void setUp() { - pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); - - pubSubCommands = mock(RedisClusterPubSubCommands.class); - - when(pubSubConnection.sync()).thenReturn(pubSubCommands); - - final RetryConfiguration retryConfiguration = new RetryConfiguration(); - retryConfiguration.setMaxAttempts(3); - retryConfiguration.setWaitDuration(10); - - final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); - - final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom() - .maxAttempts(Integer.MAX_VALUE) - .intervalFunction(IntervalFunction.ofExponentialBackoff(5)) - .build(); - final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration); - - faultTolerantPubSubConnection = new ShardFaultTolerantPubSubConnection<>("test", pubSubConnection, - retry, resubscribeRetry, Schedulers.newSingle("test")); - } - - @Test - void testRetry() { - when(pubSubCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); - - assertEquals("value", - faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); - - when(pubSubCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); - - assertThrows(RedisCommandTimeoutException.class, - () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); - } - - @Nested - class ClusterTopologyChangedEventTest { - - private TestPublisher eventPublisher; - - private Runnable resubscribe; - - private AtomicInteger resubscribeCounter; - private CountDownLatch resubscribeFailure; - private CountDownLatch resubscribeSuccess; - - @BeforeEach - @SuppressWarnings("unchecked") - void setup() { - // ignore inherited stubbing - reset(pubSubConnection); - - eventPublisher = TestPublisher.createCold(); - - final ClientResources clientResources = mock(ClientResources.class); - when(pubSubConnection.getResources()) - .thenReturn(clientResources); - final EventBus eventBus = mock(EventBus.class); - when(clientResources.eventBus()) - .thenReturn(eventBus); - - final Flux eventFlux = Flux.from(eventPublisher); - when(eventBus.get()).thenReturn(eventFlux); - - resubscribeCounter = new AtomicInteger(); - - resubscribe = () -> { - try { - resubscribeCounter.incrementAndGet(); - pubSubConnection.sync().nodes((ignored) -> true); - resubscribeSuccess.countDown(); - } catch (final RuntimeException e) { - resubscribeFailure.countDown(); - throw e; - } - }; - - resubscribeSuccess = new CountDownLatch(1); - resubscribeFailure = new CountDownLatch(1); - } - - @SuppressWarnings("unchecked") - @Test - void testSubscribeToClusterTopologyChangedEvents() throws Exception { - - when(pubSubConnection.sync()) - .thenThrow(new RedisException("Cluster unavailable")); - - eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); - - faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe); - - assertTrue(resubscribeFailure.await(1, TimeUnit.SECONDS)); - - // simulate cluster recovery - no more exceptions, run the retry - reset(pubSubConnection); - clearInvocations(pubSubCommands); - when(pubSubConnection.sync()) - .thenReturn(pubSubCommands); - - assertTrue(resubscribeSuccess.await(1, TimeUnit.SECONDS)); - - assertTrue(resubscribeCounter.get() >= 2, String.format("resubscribe called %d times", resubscribeCounter.get())); - verify(pubSubCommands).nodes(any()); - } - - @Test - @SuppressWarnings("unchecked") - void testMultipleEventsWithPendingRetries() throws Exception { - // more complicated scenario: multiple events while retries are pending - - // cluster is down - when(pubSubConnection.sync()) - .thenThrow(new RedisException("Cluster unavailable")); - - // publish multiple topology changed events - eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); - eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); - eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); - eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); - - faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe); - - assertTrue(resubscribeFailure.await(1, TimeUnit.SECONDS)); - - // simulate cluster recovery - no more exceptions, run the retry - reset(pubSubConnection); - clearInvocations(pubSubCommands); - when(pubSubConnection.sync()) - .thenReturn(pubSubCommands); - - assertTrue(resubscribeSuccess.await(1, TimeUnit.SECONDS)); - - verify(pubSubCommands, atLeastOnce()).nodes(any()); - } - } - -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantRedisClusterTest.java deleted file mode 100644 index be5714889..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ShardFaultTolerantRedisClusterTest.java +++ /dev/null @@ -1,495 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.github.resilience4j.circuitbreaker.CallNotPermittedException; -import io.lettuce.core.RedisCommandTimeoutException; -import io.lettuce.core.RedisException; -import io.lettuce.core.RedisURI; -import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser; -import io.lettuce.core.cluster.models.partitions.Partitions; -import io.lettuce.core.cluster.models.partitions.RedisClusterNode; -import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; -import io.lettuce.core.event.EventBus; -import io.lettuce.core.event.EventPublisherOptions; -import io.lettuce.core.metrics.CommandLatencyCollectorOptions; -import io.lettuce.core.metrics.CommandLatencyRecorder; -import io.lettuce.core.resource.ClientResources; -import io.lettuce.core.resource.Delay; -import io.lettuce.core.resource.DnsResolver; -import io.lettuce.core.resource.EventLoopGroupProvider; -import io.lettuce.core.resource.NettyCustomizer; -import io.lettuce.core.resource.SocketAddressResolver; -import io.lettuce.core.resource.ThreadFactoryProvider; -import io.lettuce.core.tracing.Tracing; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.resolver.AddressResolverGroup; -import io.netty.util.Timer; -import io.netty.util.concurrent.EventExecutorGroup; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; -import org.apache.commons.lang3.StringUtils; -import org.jetbrains.annotations.Nullable; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -import org.whispersystems.textsecuregcm.util.Pair; -import org.whispersystems.textsecuregcm.util.RedisClusterUtil; - -// ThreadMode.SEPARATE_THREAD protects against hangs in the remote Redis calls, as this mode allows the test code to be -// preempted by the timeout check -@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) -class ShardFaultTolerantRedisClusterTest { - - private static final Duration TIMEOUT = Duration.ofMillis(50); - - private static final RetryConfiguration RETRY_CONFIGURATION = new RetryConfiguration(); - - static { - RETRY_CONFIGURATION.setMaxAttempts(1); - RETRY_CONFIGURATION.setWaitDuration(50); - } - - @RegisterExtension - static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder() - .retryConfiguration(RETRY_CONFIGURATION) - .timeout(TIMEOUT) - .build(); - - private ShardFaultTolerantRedisCluster cluster; - - private static ShardFaultTolerantRedisCluster buildCluster( - @Nullable final CircuitBreakerConfiguration circuitBreakerConfiguration, - final ClientResources.Builder clientResourcesBuilder) { - - return new ShardFaultTolerantRedisCluster("test", clientResourcesBuilder, - RedisClusterExtension.getRedisURIs(), TIMEOUT, - Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new), - RETRY_CONFIGURATION); - } - - @AfterEach - void tearDown() { - cluster.shutdown(); - } - - @Test - void testTimeout() { - cluster = buildCluster(null, ClientResources.builder()); - - final ExecutionException asyncException = assertThrows(ExecutionException.class, - () -> cluster.withCluster(connection -> connection.async().blpop(2 * TIMEOUT.toMillis() / 1000d, "key")) - .get()); - - assertInstanceOf(RedisCommandTimeoutException.class, asyncException.getCause()); - - assertThrows(RedisCommandTimeoutException.class, - () -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, "key"))); - } - - @Test - void testTimeoutCircuitBreaker() throws Exception { - // because we’re using a single key, and blpop involves *Redis* also blocking, the breaker wait duration must be - // longer than the sum of the remote timeouts - final Duration breakerWaitDuration = TIMEOUT.multipliedBy(5); - - final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); - circuitBreakerConfig.setFailureRateThreshold(1); - circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(1); - circuitBreakerConfig.setSlidingWindowSize(1); - circuitBreakerConfig.setWaitDurationInOpenState(breakerWaitDuration); - - cluster = buildCluster(circuitBreakerConfig, ClientResources.builder()); - - final String key = "key"; - - // the first call should time out and open the breaker - assertThrows(RedisCommandTimeoutException.class, - () -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key))); - - // the second call gets blocked by the breaker - final RedisException e = assertThrows(RedisException.class, - () -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key))); - assertInstanceOf(CallNotPermittedException.class, e.getCause()); - - // wait for breaker to be half-open - Thread.sleep(breakerWaitDuration.toMillis() * 2); - - assertEquals(0, (Long) cluster.withCluster(connection -> connection.sync().llen(key))); - } - - @Test - void testShardUnavailable() { - final TestBreakerManager testBreakerManager = new TestBreakerManager(); - final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); - circuitBreakerConfig.setFailureRateThreshold(1); - circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(2); - circuitBreakerConfig.setSlidingWindowSize(5); - - final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder() - .nettyCustomizer(testBreakerManager); - - cluster = buildCluster(circuitBreakerConfig, builder); - - // this test will open the breaker on one shard and check that other shards are still available, - // so we get two nodes and a slot+key on each to test - final Pair nodePair = - cluster.withCluster(connection -> { - Partitions partitions = ClusterPartitionParser.parse(connection.sync().clusterNodes()); - - assertTrue(partitions.size() >= 2); - - return new Pair<>(partitions.getPartition(0), partitions.getPartition(1)); - }); - - final RedisClusterNode unavailableNode = nodePair.first(); - final int unavailableSlot = unavailableNode.getSlots().getFirst(); - final String unavailableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(unavailableSlot)); - - final int availableSlot = nodePair.second().getSlots().getFirst(); - final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot)); - - cluster.useCluster(connection -> { - connection.sync().set(unavailableKey, "unavailable"); - connection.sync().set(availableKey, "available"); - - assertEquals("unavailable", connection.sync().get(unavailableKey)); - assertEquals("available", connection.sync().get(availableKey)); - }); - - // shard is now unavailable - testBreakerManager.openBreaker(unavailableNode.getUri()); - final RedisException e = assertThrows(RedisException.class, () -> - cluster.useCluster(connection -> connection.sync().get(unavailableKey))); - assertInstanceOf(CallNotPermittedException.class, e.getCause()); - - // other shard is still available - assertEquals("available", cluster.withCluster(connection -> connection.sync().get(availableKey))); - - // shard is available again - testBreakerManager.closeBreaker(unavailableNode.getUri()); - assertEquals("unavailable", cluster.withCluster(connection -> connection.sync().get(unavailableKey))); - } - - @Test - void testShardUnavailablePubSub() throws Exception { - final TestBreakerManager testBreakerManager = new TestBreakerManager(); - final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); - circuitBreakerConfig.setFailureRateThreshold(1); - circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(2); - circuitBreakerConfig.setSlidingWindowSize(5); - - final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder() - .nettyCustomizer(testBreakerManager); - - cluster = buildCluster(circuitBreakerConfig, builder); - - cluster.useCluster( - connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz")); - - // this test will open the breaker on one shard and check that other shards are still available, - // so we get two nodes and a slot+key on each to test - final Pair nodePair = - cluster.withCluster(connection -> { - Partitions partitions = ClusterPartitionParser.parse(connection.sync().clusterNodes()); - - assertTrue(partitions.size() >= 2); - - return new Pair<>(partitions.getPartition(0), partitions.getPartition(1)); - }); - - final RedisClusterNode unavailableNode = nodePair.first(); - final int unavailableSlot = unavailableNode.getSlots().getFirst(); - final String unavailableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(unavailableSlot)); - - final RedisClusterNode availableNode = nodePair.second(); - final int availableSlot = availableNode.getSlots().getFirst(); - final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot)); - - final FaultTolerantPubSubConnection pubSubConnection = cluster.createPubSubConnection(); - - // Keyspace notifications are delivered on a different thread, so we use a CountDownLatch to wait for the - // expected number of notifications to arrive - final AtomicReference countDownLatchRef = new AtomicReference<>(); - - final Map channelMessageCounts = new ConcurrentHashMap<>(); - final String keyspacePrefix = "__keyspace@0__:"; - final RedisClusterPubSubAdapter listener = new RedisClusterPubSubAdapter<>() { - @Override - public void message(final RedisClusterNode node, final String channel, final String message) { - channelMessageCounts.computeIfAbsent(StringUtils.substringAfter(channel, keyspacePrefix), - k -> new AtomicInteger(0)) - .incrementAndGet(); - - countDownLatchRef.get().countDown(); - } - }; - - countDownLatchRef.set(new CountDownLatch(2)); - pubSubConnection.usePubSubConnection(c -> { - c.addListener(listener); - c.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(availableSlot)) - .commands() - .subscribe(keyspacePrefix + availableKey); - c.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(unavailableSlot)) - .commands() - .subscribe(keyspacePrefix + unavailableKey); - }); - - cluster.useCluster(connection -> { - connection.sync().set(availableKey, "ping1"); - connection.sync().set(unavailableKey, "ping1"); - }); - - countDownLatchRef.get().await(); - - assertEquals(1, channelMessageCounts.get(availableKey).get()); - assertEquals(1, channelMessageCounts.get(unavailableKey).get()); - - // shard is now unavailable - testBreakerManager.openBreaker(unavailableNode.getUri()); - - final RedisException e = assertThrows(RedisException.class, () -> - cluster.useCluster(connection -> connection.sync().set(unavailableKey, "ping2"))); - assertInstanceOf(CallNotPermittedException.class, e.getCause()); - assertEquals(1, channelMessageCounts.get(unavailableKey).get()); - assertEquals(1, channelMessageCounts.get(availableKey).get()); - - countDownLatchRef.set(new CountDownLatch(1)); - pubSubConnection.usePubSubConnection(connection -> connection.sync().set(availableKey, "ping2")); - - countDownLatchRef.get().await(); - - assertEquals(1, channelMessageCounts.get(unavailableKey).get()); - assertEquals(2, channelMessageCounts.get(availableKey).get()); - - // shard is available again - testBreakerManager.closeBreaker(unavailableNode.getUri()); - - countDownLatchRef.set(new CountDownLatch(2)); - - cluster.useCluster(connection -> { - connection.sync().set(availableKey, "ping3"); - connection.sync().set(unavailableKey, "ping3"); - }); - - countDownLatchRef.get().await(); - - assertEquals(2, channelMessageCounts.get(unavailableKey).get()); - assertEquals(3, channelMessageCounts.get(availableKey).get()); - } - - @ChannelHandler.Sharable - private static class TestBreakerManager extends ChannelDuplexHandler implements NettyCustomizer { - - private final Map> urisToChannelBreakers = new ConcurrentHashMap<>(); - private final AtomicInteger counter = new AtomicInteger(); - - @Override - public void afterChannelInitialized(Channel channel) { - channel.pipeline().addFirst("TestBreakerManager#" + counter.getAndIncrement(), this); - } - - @Override - public void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, - final SocketAddress localAddress, final ChannelPromise promise) throws Exception { - - super.connect(ctx, remoteAddress, localAddress, promise); - - final LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler channelCircuitBreakerHandler = - ctx.channel().pipeline().get(LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler.class); - - urisToChannelBreakers.computeIfAbsent(getRedisURI(ctx.channel()), ignored -> new HashSet<>()) - .add(channelCircuitBreakerHandler); - } - - private static RedisURI getRedisURI(Channel channel) { - final InetSocketAddress inetAddress = (InetSocketAddress) channel.remoteAddress(); - return RedisURI.create(inetAddress.getHostString(), inetAddress.getPort()); - } - - void openBreaker(final RedisURI redisURI) { - urisToChannelBreakers.get(redisURI).forEach(handler -> handler.breaker.transitionToOpenState()); - } - - void closeBreaker(final RedisURI redisURI) { - urisToChannelBreakers.get(redisURI).forEach(handler -> handler.breaker.transitionToClosedState()); - } - } - - static class CompositeNettyCustomizer implements NettyCustomizer { - - private final List nettyCustomizers = new ArrayList<>(); - - @Override - public void afterBootstrapInitialized(final Bootstrap bootstrap) { - nettyCustomizers.forEach(nc -> nc.afterBootstrapInitialized(bootstrap)); - } - - @Override - public void afterChannelInitialized(final Channel channel) { - nettyCustomizers.forEach(nc -> nc.afterChannelInitialized(channel)); - } - - void add(NettyCustomizer customizer) { - nettyCustomizers.add(customizer); - } - } - - static class CompositeNettyCustomizerClientResourcesBuilder implements ClientResources.Builder { - - private final CompositeNettyCustomizer compositeNettyCustomizer; - private final ClientResources.Builder delegate; - - static CompositeNettyCustomizerClientResourcesBuilder builder() { - return new CompositeNettyCustomizerClientResourcesBuilder(); - } - - private CompositeNettyCustomizerClientResourcesBuilder() { - this.compositeNettyCustomizer = new CompositeNettyCustomizer(); - this.delegate = ClientResources.builder().nettyCustomizer(compositeNettyCustomizer); - } - - - @Override - public ClientResources.Builder addressResolverGroup(final AddressResolverGroup addressResolverGroup) { - delegate.addressResolverGroup(addressResolverGroup); - return this; - } - - @Override - public ClientResources.Builder commandLatencyRecorder(final CommandLatencyRecorder latencyRecorder) { - delegate.commandLatencyRecorder(latencyRecorder); - return this; - } - - @Override - @Deprecated - public ClientResources.Builder commandLatencyCollectorOptions( - final CommandLatencyCollectorOptions commandLatencyCollectorOptions) { - delegate.commandLatencyCollectorOptions(commandLatencyCollectorOptions); - return this; - } - - @Override - public ClientResources.Builder commandLatencyPublisherOptions( - final EventPublisherOptions commandLatencyPublisherOptions) { - delegate.commandLatencyPublisherOptions(commandLatencyPublisherOptions); - return this; - } - - @Override - public ClientResources.Builder computationThreadPoolSize(final int computationThreadPoolSize) { - delegate.computationThreadPoolSize(computationThreadPoolSize); - return this; - } - - @Override - @Deprecated - public ClientResources.Builder dnsResolver(final DnsResolver dnsResolver) { - delegate.dnsResolver(dnsResolver); - return this; - } - - @Override - public ClientResources.Builder eventBus(final EventBus eventBus) { - delegate.eventBus(eventBus); - return this; - } - - @Override - public ClientResources.Builder eventExecutorGroup(final EventExecutorGroup eventExecutorGroup) { - delegate.eventExecutorGroup(eventExecutorGroup); - return this; - } - - @Override - public ClientResources.Builder eventLoopGroupProvider(final EventLoopGroupProvider eventLoopGroupProvider) { - delegate.eventLoopGroupProvider(eventLoopGroupProvider); - return this; - } - - @Override - public ClientResources.Builder ioThreadPoolSize(final int ioThreadPoolSize) { - delegate.ioThreadPoolSize(ioThreadPoolSize); - return this; - } - - @Override - public ClientResources.Builder nettyCustomizer(final NettyCustomizer nettyCustomizer) { - compositeNettyCustomizer.add(nettyCustomizer); - return this; - } - - @Override - public ClientResources.Builder reconnectDelay(final Delay reconnectDelay) { - delegate.reconnectDelay(reconnectDelay); - return this; - } - - @Override - public ClientResources.Builder reconnectDelay(final Supplier reconnectDelay) { - delegate.reconnectDelay(reconnectDelay); - return this; - } - - @Override - public ClientResources.Builder socketAddressResolver(final SocketAddressResolver socketAddressResolver) { - delegate.socketAddressResolver(socketAddressResolver); - return this; - } - - @Override - public ClientResources.Builder threadFactoryProvider(final ThreadFactoryProvider threadFactoryProvider) { - delegate.threadFactoryProvider(threadFactoryProvider); - return this; - } - - @Override - public ClientResources.Builder timer(final Timer timer) { - delegate.timer(timer); - return this; - } - - @Override - public ClientResources.Builder tracing(final Tracing tracing) { - delegate.tracing(tracing); - return this; - } - - @Override - public ClientResources build() { - return delegate.build(); - } - } - -}