Coalesce all Redis clusters to per-shard circuit breakers

Chris Eager 2024-04-12 12:32:16 -05:00 committed by Chris Eager
parent 2046b02bd8
commit b734d58ab7
15 changed files with 720 additions and 1473 deletions
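Before this commit, ClusterFaultTolerantRedisCluster guarded every command with one shared Resilience4j CircuitBreaker, so a single misbehaving shard could open the breaker for the entire cluster. The replacement FaultTolerantRedisCluster instead registers LettuceShardCircuitBreaker, a Lettuce NettyCustomizer, on the client resources builder, giving each shard its own breaker. The sketch below shows the general shape of that technique; everything beyond the Lettuce, Netty, and Resilience4j APIs (the class name ShardBreakerCustomizer, the breakersByNode map) is illustrative, not the commit's actual LettuceShardCircuitBreaker implementation.

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.lettuce.core.resource.NettyCustomizer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: one CircuitBreaker per remote node, consulted on every write,
// so a tripped breaker isolates a single shard instead of the whole cluster.
final class ShardBreakerCustomizer implements NettyCustomizer {

  private final String clusterName;
  private final CircuitBreakerConfig config;
  private final ConcurrentHashMap<String, CircuitBreaker> breakersByNode = new ConcurrentHashMap<>();

  ShardBreakerCustomizer(final String clusterName, final CircuitBreakerConfig config) {
    this.clusterName = clusterName;
    this.config = config;
  }

  @Override
  public void afterChannelInitialized(final Channel channel) {
    channel.pipeline().addFirst("shardBreaker", new ChannelDuplexHandler() {
      @Override
      public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
          throws Exception {
        // The remote address identifies the shard; all channels to one node share a breaker.
        // (The real handler resolves the address once the channel is active, and records
        // onSuccess/onError as command responses complete so the breaker can actually open.)
        final CircuitBreaker breaker = breakersByNode.computeIfAbsent(
            clusterName + "/" + ctx.channel().remoteAddress(),
            breakerName -> CircuitBreaker.of(breakerName, config));
        breaker.acquirePermission(); // throws CallNotPermittedException while this shard's breaker is open
        super.write(ctx, msg, promise);
      }
    });
  }
}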

WhisperServerService.java

@@ -20,6 +20,7 @@ import io.dropwizard.core.server.DefaultServerFactory;
import io.dropwizard.core.setup.Bootstrap;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jetty.HttpsConnectorFactory;
+import io.dropwizard.lifecycle.Managed;
import io.grpc.ServerBuilder;
import io.lettuce.core.metrics.MicrometerCommandLatencyRecorder;
import io.lettuce.core.metrics.MicrometerOptions;
@@ -176,10 +177,8 @@ import org.whispersystems.textsecuregcm.push.ProvisioningManager;
import org.whispersystems.textsecuregcm.push.PushLatencyManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
-import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
-import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.registration.RegistrationServiceClient;
import org.whispersystems.textsecuregcm.s3.PolicySigner;
import org.whispersystems.textsecuregcm.s3.PostPolicyGenerator;
@@ -414,25 +413,24 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final VerificationSessions verificationSessions = new VerificationSessions(dynamoDbAsyncClient,
config.getDynamoDbTables().getVerificationSessions().getTableName(), clock);
-final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder()
-.commandLatencyRecorder(
-new MicrometerCommandLatencyRecorder(Metrics.globalRegistry, MicrometerOptions.builder().build()));
-final ClientResources redisClientResources = redisClientResourcesBuilder.build();
-ConnectionEventLogger.logConnectionEvents(redisClientResources);
+final ClientResources sharedClientResources = ClientResources.builder()
+.commandLatencyRecorder(
+new MicrometerCommandLatencyRecorder(Metrics.globalRegistry, MicrometerOptions.builder().build()))
+.build();
+ConnectionEventLogger.logConnectionEvents(sharedClientResources);
-FaultTolerantRedisCluster cacheCluster = new ShardFaultTolerantRedisCluster("main_cache",
-config.getCacheClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster messagesCluster = new ClusterFaultTolerantRedisCluster("messages_cluster",
-config.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResources);
-FaultTolerantRedisCluster clientPresenceCluster = new ShardFaultTolerantRedisCluster("client_presence",
-config.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster metricsCluster = new ShardFaultTolerantRedisCluster("metrics",
-config.getMetricsClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster pushSchedulerCluster = new ShardFaultTolerantRedisCluster("push_scheduler",
-config.getPushSchedulerCluster(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster rateLimitersCluster = new ShardFaultTolerantRedisCluster("rate_limiters",
-config.getRateLimitersCluster(), redisClientResourcesBuilder);
+FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache",
+config.getCacheClusterConfiguration(), sharedClientResources.mutate());
+FaultTolerantRedisCluster messagesCluster = new FaultTolerantRedisCluster("messages",
+config.getMessageCacheConfiguration().getRedisClusterConfiguration(), sharedClientResources.mutate());
+FaultTolerantRedisCluster clientPresenceCluster = new FaultTolerantRedisCluster("client_presence",
+config.getClientPresenceClusterConfiguration(), sharedClientResources.mutate());
+FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics",
+config.getMetricsClusterConfiguration(), sharedClientResources.mutate());
+FaultTolerantRedisCluster pushSchedulerCluster = new FaultTolerantRedisCluster("push_scheduler",
+config.getPushSchedulerCluster(), sharedClientResources.mutate());
+FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters",
+config.getRateLimitersCluster(), sharedClientResources.mutate());
final BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000);
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(),
@@ -598,7 +596,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
RateLimiters rateLimiters = RateLimiters.createAndValidate(config.getLimitsConfiguration(),
dynamicConfigurationManager, rateLimitersCluster);
ProvisioningManager provisioningManager = new ProvisioningManager(config.getPubsubCacheConfiguration().getUri(),
-redisClientResources, config.getPubsubCacheConfiguration().getTimeout(),
+sharedClientResources, config.getPubsubCacheConfiguration().getTimeout(),
config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
IssuedReceiptsManager issuedReceiptsManager = new IssuedReceiptsManager(
config.getDynamoDbTables().getIssuedReceipts().getTableName(),

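A note on the sharedClientResources.mutate() calls above: ClientResources#mutate() returns a new builder seeded with the shared resources (event loops, the Micrometer latency recorder), and the FaultTolerantRedisCluster constructor attaches its per-shard breaker customizer to whatever builder it receives. Handing each cluster its own mutated builder therefore shares the heavyweight I/O resources while keeping breaker state separate per cluster. A minimal sketch of the pattern, with hypothetical stand-ins for the configuration accessors:

// Sketch only: cacheClusterConfiguration and rateLimitersClusterConfiguration stand in
// for the real WhisperServerConfiguration getters used above.
final ClientResources sharedClientResources = ClientResources.builder()
    .commandLatencyRecorder(
        new MicrometerCommandLatencyRecorder(Metrics.globalRegistry, MicrometerOptions.builder().build()))
    .build();

// Each cluster gets a fresh builder derived from the shared resources, so the
// LettuceShardCircuitBreaker it installs is private to that cluster.
final FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster(
    "main_cache", cacheClusterConfiguration, sharedClientResources.mutate());
final FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster(
    "rate_limiters", rateLimitersClusterConfiguration, sharedClientResources.mutate());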
ClusterFaultTolerantPubSubConnection.java (deleted)

@@ -1,106 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.scheduler.Scheduler;
public class ClusterFaultTolerantPubSubConnection<K, V> implements FaultTolerantPubSubConnection<K, V> {
private static final Logger logger = LoggerFactory.getLogger(ClusterFaultTolerantPubSubConnection.class);
private final String name;
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;
private final CircuitBreaker circuitBreaker;
private final Retry retry;
private final Retry resubscribeRetry;
private final Scheduler topologyChangedEventScheduler;
private final Timer executeTimer;
public ClusterFaultTolerantPubSubConnection(final String name,
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker,
final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) {
this.name = name;
this.pubSubConnection = pubSubConnection;
this.circuitBreaker = circuitBreaker;
this.retry = retry;
this.resubscribeRetry = resubscribeRetry;
this.topologyChangedEventScheduler = topologyChangedEventScheduler;
this.pubSubConnection.setNodeMessagePropagation(true);
this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub");
CircuitBreakerUtil.registerMetrics(circuitBreaker, ClusterFaultTolerantPubSubConnection.class, Tags.empty());
}
@Override
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
try {
circuitBreaker.executeCheckedRunnable(
() -> retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
@Override
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> function) {
try {
return circuitBreaker.executeCheckedSupplier(
() -> retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
@Override
public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) {
usePubSubConnection(connection -> connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribeOn(topologyChangedEventScheduler)
.subscribe(event -> {
logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name);
resubscribeRetry.executeRunnable(() -> {
try {
eventHandler.run();
} catch (final RuntimeException e) {
logger.warn("Resubscribe for {} failed", name, e);
throw e;
}
});
}));
}
}

ClusterFaultTolerantRedisCluster.java (deleted)

@@ -1,196 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import com.google.common.annotations.VisibleForTesting;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import io.github.resilience4j.reactor.retry.RetryOperator;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.ClientOptions.DisconnectedBehavior;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.resource.ClientResources;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import io.micrometer.core.instrument.Tags;
import org.reactivestreams.Publisher;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* A fault-tolerant access manager for a Redis cluster. A single circuit breaker protects all cluster
* calls.
*/
public class ClusterFaultTolerantRedisCluster implements FaultTolerantRedisCluster {
private final String name;
private final RedisClusterClient clusterClient;
private final StatefulRedisClusterConnection<String, String> stringConnection;
private final StatefulRedisClusterConnection<byte[], byte[]> binaryConnection;
private final List<StatefulRedisClusterPubSubConnection<?, ?>> pubSubConnections = new ArrayList<>();
private final CircuitBreaker circuitBreaker;
private final Retry retry;
private final Retry topologyChangedEventRetry;
public ClusterFaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration,
final ClientResources clientResources) {
this(name,
RedisClusterClient.create(clientResources,
RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(),
clusterConfiguration.getTimeout())),
clusterConfiguration.getTimeout(),
clusterConfiguration.getCircuitBreakerConfiguration(),
clusterConfiguration.getRetryConfiguration());
}
@VisibleForTesting
ClusterFaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient,
final Duration commandTimeout,
final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) {
this.name = name;
this.clusterClient = clusterClient;
this.clusterClient.setOptions(ClusterClientOptions.builder()
.disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS)
.validateClusterNodeMembership(false)
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers()
.build())
// for asynchronous commands
.timeoutOptions(TimeoutOptions.builder()
.fixedTimeout(commandTimeout)
.build())
.publishOnScheduler(true)
.build());
this.stringConnection = clusterClient.connect();
this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);
this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder()
.retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE)
.intervalFunction(
IntervalFunction.ofExponentialRandomBackoff(Duration.ofSeconds(1), 1.5, Duration.ofSeconds(30)))
.build();
this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig);
CircuitBreakerUtil.registerMetrics(circuitBreaker, FaultTolerantRedisCluster.class, Tags.empty());
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class);
}
@Override
public void shutdown() {
stringConnection.close();
binaryConnection.close();
for (final StatefulRedisClusterPubSubConnection<?, ?> pubSubConnection : pubSubConnections) {
pubSubConnection.close();
}
clusterClient.shutdown();
}
@Override
public String getName() {
return name;
}
@Override
public void useCluster(final Consumer<StatefulRedisClusterConnection<String, String>> consumer) {
useConnection(stringConnection, consumer);
}
@Override
public <T> T withCluster(final Function<StatefulRedisClusterConnection<String, String>, T> function) {
return withConnection(stringConnection, function);
}
@Override
public void useBinaryCluster(final Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer) {
useConnection(binaryConnection, consumer);
}
@Override
public <T> T withBinaryCluster(final Function<StatefulRedisClusterConnection<byte[], byte[]>, T> function) {
return withConnection(binaryConnection, function);
}
@Override
public <T> Publisher<T> withBinaryClusterReactive(
final Function<StatefulRedisClusterConnection<byte[], byte[]>, Publisher<T>> function) {
return withConnectionReactive(binaryConnection, function);
}
@Override
public <K, V> void useConnection(final StatefulRedisClusterConnection<K, V> connection,
final Consumer<StatefulRedisClusterConnection<K, V>> consumer) {
try {
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
@Override
public <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, T> function) {
try {
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> function.apply(connection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
@Override
public <T, K, V> Publisher<T> withConnectionReactive(final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, Publisher<T>> function) {
return Flux.from(function.apply(connection))
.transformDeferred(RetryOperator.of(retry))
.transformDeferred(CircuitBreakerOperator.of(circuitBreaker));
}
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
pubSubConnections.add(pubSubConnection);
return new ClusterFaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker, retry,
topologyChangedEventRetry,
Schedulers.newSingle(name + "-redisPubSubEvents", true));
}
}
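For contrast with the per-shard design that replaces it, this deleted class's fault tolerance reduces to one shared breaker wrapping one retry around every command, whichever shard serves it; a failure pattern concentrated on a single node therefore opens the breaker for all nodes. A condensed, self-contained rendering of that composition (mirroring withConnection above; the class name is illustrative):

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException;
import java.util.function.Supplier;

// Condensed form of the deleted wrapping: a single CircuitBreaker outermost, a Retry
// inside it, applied uniformly to every cluster command.
final class SingleBreakerTemplate {

  private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("cluster-breaker");
  private final Retry retry = Retry.ofDefaults("cluster-retry");

  <T> T execute(final Supplier<T> command) {
    try {
      // Breaker outermost: a command that exhausts its retries counts as one breaker
      // failure; once the breaker opens, commands to *all* shards are rejected.
      return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(command::get));
    } catch (final Throwable t) {
      throw (t instanceof RedisException e) ? e : new RedisException(t);
    }
  }
}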

FaultTolerantPubSubConnection.java

@@ -5,15 +5,90 @@
package org.whispersystems.textsecuregcm.redis;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Scheduler;
-public interface FaultTolerantPubSubConnection<K, V> {
-void usePubSubConnection(Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer);
-<T> T withPubSubConnection(Function<StatefulRedisClusterPubSubConnection<K, V>, T> function);
-void subscribeToClusterTopologyChangedEvents(Runnable eventHandler);
+public class FaultTolerantPubSubConnection<K, V> {
+private static final Logger logger = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class);
private final String name;
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;
private final Retry retry;
private final Retry resubscribeRetry;
private final Scheduler topologyChangedEventScheduler;
private final Timer executeTimer;
public FaultTolerantPubSubConnection(final String name,
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection,
final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) {
this.name = name;
this.pubSubConnection = pubSubConnection;
this.retry = retry;
this.resubscribeRetry = resubscribeRetry;
this.topologyChangedEventScheduler = topologyChangedEventScheduler;
this.pubSubConnection.setNodeMessagePropagation(true);
this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub");
}
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
try {
retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> function) {
try {
return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) {
usePubSubConnection(connection -> connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribeOn(topologyChangedEventScheduler)
.subscribe(event -> {
logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name);
resubscribeRetry.executeRunnable(() -> {
try {
eventHandler.run();
} catch (final RuntimeException e) {
logger.warn("Resubscribe for {} failed", name, e);
throw e;
}
});
}));
}
}
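The resubscribe machinery above is carried over unchanged from the old implementations: on a ClusterTopologyChangedEvent, the supplied handler is re-run under an unbounded exponential-backoff retry so keyspace-notification subscriptions survive resharding. A hypothetical caller, assuming cluster is a FaultTolerantRedisCluster and the channel name is illustrative:

// Hypothetical usage: keep a keyspace-notification subscription alive across topology
// changes. The channel name and the "cluster" variable are assumptions for illustration.
final FaultTolerantPubSubConnection<String, String> pubSub = cluster.createPubSubConnection();

final Runnable subscribeAll = () -> pubSub.usePubSubConnection(
    connection -> connection.sync().subscribe("__keyspace@0__:example-key"));

subscribeAll.run(); // initial subscription
pubSub.subscribeToClusterTopologyChangedEvents(subscribeAll); // re-run after resharding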

FaultTolerantRedisCluster.java

@@ -5,36 +5,183 @@
package org.whispersystems.textsecuregcm.redis;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.reactor.retry.RetryOperator;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.resource.ClientResources;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
-public interface FaultTolerantRedisCluster {
-void shutdown();
-String getName();
-void useCluster(Consumer<StatefulRedisClusterConnection<String, String>> consumer);
-<T> T withCluster(Function<StatefulRedisClusterConnection<String, String>, T> function);
-void useBinaryCluster(Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer);
-<T> T withBinaryCluster(Function<StatefulRedisClusterConnection<byte[], byte[]>, T> function);
-<T> Publisher<T> withBinaryClusterReactive(
-Function<StatefulRedisClusterConnection<byte[], byte[]>, Publisher<T>> function);
-<K, V> void useConnection(StatefulRedisClusterConnection<K, V> connection,
-Consumer<StatefulRedisClusterConnection<K, V>> consumer);
-<T, K, V> T withConnection(StatefulRedisClusterConnection<K, V> connection,
-Function<StatefulRedisClusterConnection<K, V>, T> function);
-<T, K, V> Publisher<T> withConnectionReactive(StatefulRedisClusterConnection<K, V> connection,
-Function<StatefulRedisClusterConnection<K, V>, Publisher<T>> function);
+/**
+ * A fault-tolerant access manager for a Redis cluster. Each shard in the cluster has a dedicated circuit breaker.
+ *
+ * @see LettuceShardCircuitBreaker
+ */
+public class FaultTolerantRedisCluster {
+private final String name;
+private final RedisClusterClient clusterClient;
+private final StatefulRedisClusterConnection<String, String> stringConnection;
+private final StatefulRedisClusterConnection<byte[], byte[]> binaryConnection;
+private final List<StatefulRedisClusterPubSubConnection<?, ?>> pubSubConnections = new ArrayList<>();
+private final Retry retry;
+private final Retry topologyChangedEventRetry;
+public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration,
+final ClientResources.Builder clientResourcesBuilder) {
+this(name, clientResourcesBuilder,
+Collections.singleton(RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(),
+clusterConfiguration.getTimeout())),
+clusterConfiguration.getTimeout(),
+clusterConfiguration.getCircuitBreakerConfiguration(),
+clusterConfiguration.getRetryConfiguration());
+}
FaultTolerantRedisCluster(String name, final ClientResources.Builder clientResourcesBuilder,
Iterable<RedisURI> redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig,
RetryConfiguration retryConfiguration) {
this.name = name;
this.clusterClient = RedisClusterClient.create(
clientResourcesBuilder.nettyCustomizer(
new LettuceShardCircuitBreaker(name, circuitBreakerConfig.toCircuitBreakerConfig())).
build(),
redisUris);
this.clusterClient.setOptions(ClusterClientOptions.builder()
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.validateClusterNodeMembership(false)
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers()
.build())
// for asynchronous commands
.timeoutOptions(TimeoutOptions.builder()
.fixedTimeout(commandTimeout)
.build())
.publishOnScheduler(true)
.build());
this.stringConnection = clusterClient.connect();
this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);
this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder()
.retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE)
.intervalFunction(
IntervalFunction.ofExponentialRandomBackoff(Duration.ofSeconds(1), 1.5, Duration.ofSeconds(30)))
.build();
this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig);
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class);
}
public void shutdown() {
stringConnection.close();
binaryConnection.close();
for (final StatefulRedisClusterPubSubConnection<?, ?> pubSubConnection : pubSubConnections) {
pubSubConnection.close();
}
clusterClient.shutdown();
}
public String getName() {
return name;
}
public void useCluster(final Consumer<StatefulRedisClusterConnection<String, String>> consumer) {
useConnection(stringConnection, consumer);
}
public <T> T withCluster(final Function<StatefulRedisClusterConnection<String, String>, T> function) {
return withConnection(stringConnection, function);
}
public void useBinaryCluster(final Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer) {
useConnection(binaryConnection, consumer);
}
public <T> T withBinaryCluster(final Function<StatefulRedisClusterConnection<byte[], byte[]>, T> function) {
return withConnection(binaryConnection, function);
}
public <T> Publisher<T> withBinaryClusterReactive(
final Function<StatefulRedisClusterConnection<byte[], byte[]>, Publisher<T>> function) {
return withConnectionReactive(binaryConnection, function);
}
public <K, V> void useConnection(final StatefulRedisClusterConnection<K, V> connection,
final Consumer<StatefulRedisClusterConnection<K, V>> consumer) {
try {
retry.executeRunnable(() -> consumer.accept(connection));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, T> function) {
try {
return retry.executeCallable(() -> function.apply(connection));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public <T, K, V> Publisher<T> withConnectionReactive(final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, Publisher<T>> function) {
return Flux.from(function.apply(connection))
.transformDeferred(RetryOperator.of(retry));
}
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
pubSubConnections.add(pubSubConnection);
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry,
Schedulers.newSingle(name + "-redisPubSubEvents", true));
}
-FaultTolerantPubSubConnection<String, String> createPubSubConnection();
}
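Call sites are unchanged by the rename: the class keeps the use/with accessor pairs, and the visible difference is that construction now takes a ClientResources.Builder so the per-shard breaker customizer can be attached before the RedisClusterClient is created. A minimal usage sketch; loadClusterConfiguration() is a hypothetical stand-in for reading a RedisClusterConfiguration from the service configuration:

// Sketch: construct a cluster with per-shard breakers and issue a few commands.
final RedisClusterConfiguration configuration = loadClusterConfiguration(); // hypothetical helper

final FaultTolerantRedisCluster cluster =
    new FaultTolerantRedisCluster("example", configuration, ClientResources.builder());

// Command time-outs are retried per the retry policy; a shard whose breaker has opened
// fails fast with a RedisException caused by CallNotPermittedException.
final String value = cluster.withCluster(connection -> connection.sync().get("key"));
cluster.useCluster(connection -> connection.sync().setex("key", 3600, "value"));

cluster.shutdown();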

ShardFaultTolerantPubSubConnection.java (deleted)

@@ -1,97 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Scheduler;
public class ShardFaultTolerantPubSubConnection<K, V> implements FaultTolerantPubSubConnection<K, V> {
private static final Logger logger = LoggerFactory.getLogger(ShardFaultTolerantPubSubConnection.class);
private final String name;
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;
private final Retry retry;
private final Retry resubscribeRetry;
private final Scheduler topologyChangedEventScheduler;
private final Timer executeTimer;
public ShardFaultTolerantPubSubConnection(final String name,
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection,
final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) {
this.name = name;
this.pubSubConnection = pubSubConnection;
this.retry = retry;
this.resubscribeRetry = resubscribeRetry;
this.topologyChangedEventScheduler = topologyChangedEventScheduler;
this.pubSubConnection.setNodeMessagePropagation(true);
this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub");
}
@Override
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
try {
retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
@Override
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> function) {
try {
return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
@Override
public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) {
usePubSubConnection(connection -> connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribeOn(topologyChangedEventScheduler)
.subscribe(event -> {
logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name);
resubscribeRetry.executeRunnable(() -> {
try {
eventHandler.run();
} catch (final RuntimeException e) {
logger.warn("Resubscribe for {} failed", name, e);
throw e;
}
});
}));
}
}

ShardFaultTolerantRedisCluster.java (deleted)

@@ -1,197 +0,0 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.reactor.retry.RetryOperator;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.resource.ClientResources;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* A fault-tolerant access manager for a Redis cluster. Each shard in the cluster has a dedicated circuit breaker.
*
* @see LettuceShardCircuitBreaker
*/
public class ShardFaultTolerantRedisCluster implements FaultTolerantRedisCluster {
private final String name;
private final RedisClusterClient clusterClient;
private final StatefulRedisClusterConnection<String, String> stringConnection;
private final StatefulRedisClusterConnection<byte[], byte[]> binaryConnection;
private final List<StatefulRedisClusterPubSubConnection<?, ?>> pubSubConnections = new ArrayList<>();
private final Retry retry;
private final Retry topologyChangedEventRetry;
public ShardFaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
this(name, clientResourcesBuilder,
Collections.singleton(RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(),
clusterConfiguration.getTimeout())),
clusterConfiguration.getTimeout(),
clusterConfiguration.getCircuitBreakerConfiguration(),
clusterConfiguration.getRetryConfiguration());
}
ShardFaultTolerantRedisCluster(String name, final ClientResources.Builder clientResourcesBuilder,
Iterable<RedisURI> redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig,
RetryConfiguration retryConfiguration) {
this.name = name;
this.clusterClient = RedisClusterClient.create(
clientResourcesBuilder.nettyCustomizer(
new LettuceShardCircuitBreaker(name, circuitBreakerConfig.toCircuitBreakerConfig())).
build(),
redisUris);
this.clusterClient.setOptions(ClusterClientOptions.builder()
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.validateClusterNodeMembership(false)
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers()
.build())
// for asynchronous commands
.timeoutOptions(TimeoutOptions.builder()
.fixedTimeout(commandTimeout)
.build())
.publishOnScheduler(true)
.build());
this.stringConnection = clusterClient.connect();
this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);
this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder()
.retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE)
.intervalFunction(
IntervalFunction.ofExponentialRandomBackoff(Duration.ofSeconds(1), 1.5, Duration.ofSeconds(30)))
.build();
this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig);
CircuitBreakerUtil.registerMetrics(retry, ShardFaultTolerantRedisCluster.class);
}
@Override
public void shutdown() {
stringConnection.close();
binaryConnection.close();
for (final StatefulRedisClusterPubSubConnection<?, ?> pubSubConnection : pubSubConnections) {
pubSubConnection.close();
}
clusterClient.shutdown();
}
public String getName() {
return name;
}
@Override
public void useCluster(final Consumer<StatefulRedisClusterConnection<String, String>> consumer) {
useConnection(stringConnection, consumer);
}
@Override
public <T> T withCluster(final Function<StatefulRedisClusterConnection<String, String>, T> function) {
return withConnection(stringConnection, function);
}
@Override
public void useBinaryCluster(final Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer) {
useConnection(binaryConnection, consumer);
}
@Override
public <T> T withBinaryCluster(final Function<StatefulRedisClusterConnection<byte[], byte[]>, T> function) {
return withConnection(binaryConnection, function);
}
@Override
public <T> Publisher<T> withBinaryClusterReactive(
final Function<StatefulRedisClusterConnection<byte[], byte[]>, Publisher<T>> function) {
return withConnectionReactive(binaryConnection, function);
}
@Override
public <K, V> void useConnection(final StatefulRedisClusterConnection<K, V> connection,
final Consumer<StatefulRedisClusterConnection<K, V>> consumer) {
try {
retry.executeRunnable(() -> consumer.accept(connection));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
@Override
public <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, T> function) {
try {
return retry.executeCallable(() -> function.apply(connection));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
@Override
public <T, K, V> Publisher<T> withConnectionReactive(final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, Publisher<T>> function) {
return Flux.from(function.apply(connection))
.transformDeferred(RetryOperator.of(retry));
}
@Override
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
pubSubConnections.add(pubSubConnection);
return new ShardFaultTolerantPubSubConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry,
Schedulers.newSingle(name + "-redisPubSubEvents", true));
}
}

AssignUsernameCommand.java

@@ -29,9 +29,7 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfigurati
import org.whispersystems.textsecuregcm.controllers.SecureStorageController;
import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controller;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
-import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
-import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
import org.whispersystems.textsecuregcm.storage.Account;
@@ -108,9 +106,8 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
dynamicConfigurationManager.start();
final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder();
-final ClientResources redisClusterClientResources = redisClientResourcesBuilder.build();
-FaultTolerantRedisCluster cacheCluster = new ShardFaultTolerantRedisCluster("main_cache_cluster",
+FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster",
configuration.getCacheClusterConfiguration(), redisClientResourcesBuilder);
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
@@ -176,14 +173,14 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
configuration.getDynamoDbTables().getMessages().getTableName(),
configuration.getDynamoDbTables().getMessages().getExpiration(),
messageDeletionExecutor);
-FaultTolerantRedisCluster messageInsertCacheCluster = new ClusterFaultTolerantRedisCluster("message_insert_cluster",
-configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
+FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster",
+configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster messageReadDeleteCluster = new ClusterFaultTolerantRedisCluster(
+FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster(
"message_read_delete_cluster",
-configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
+configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster clientPresenceCluster = new ShardFaultTolerantRedisCluster("client_presence",
+FaultTolerantRedisCluster clientPresenceCluster = new FaultTolerantRedisCluster("client_presence",
configuration.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster rateLimitersCluster = new ShardFaultTolerantRedisCluster("rate_limiters",
+FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters",
configuration.getRateLimitersCluster(), redisClientResourcesBuilder);
SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(
secureValueRecoveryCredentialsGenerator, secureValueRecoveryExecutor, secureValueRecoveryServiceRetryExecutor,

CommandDependencies.java

@@ -31,9 +31,7 @@ import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controll
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
-import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
-import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
import org.whispersystems.textsecuregcm.storage.AccountLockManager;
@@ -93,9 +91,8 @@ record CommandDependencies(
MetricsUtil.configureRegistries(configuration, environment, dynamicConfigurationManager);
final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder();
-ClientResources redisClusterClientResources = redisClientResourcesBuilder.build();
-FaultTolerantRedisCluster cacheCluster = new ShardFaultTolerantRedisCluster("main_cache",
+FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache",
configuration.getCacheClusterConfiguration(), redisClientResourcesBuilder);
ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
@@ -166,14 +163,14 @@ record CommandDependencies(
configuration.getDynamoDbTables().getMessages().getTableName(),
configuration.getDynamoDbTables().getMessages().getExpiration(),
messageDeletionExecutor);
-FaultTolerantRedisCluster messageInsertCacheCluster = new ClusterFaultTolerantRedisCluster("message_insert_cluster",
-configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
+FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster",
+configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster messageReadDeleteCluster = new ClusterFaultTolerantRedisCluster(
+FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster(
"message_read_delete_cluster",
-configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
+configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster clientPresenceCluster = new ShardFaultTolerantRedisCluster("client_presence",
+FaultTolerantRedisCluster clientPresenceCluster = new FaultTolerantRedisCluster("client_presence",
configuration.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder);
-FaultTolerantRedisCluster rateLimitersCluster = new ShardFaultTolerantRedisCluster("rate_limiters",
+FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters",
configuration.getRateLimitersCluster(), redisClientResourcesBuilder);
SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(
secureValueRecoveryCredentialsGenerator, secureValueRecoveryServiceExecutor,

ScheduledApnPushNotificationSenderServiceCommand.java

@@ -20,7 +20,6 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
-import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
public class ScheduledApnPushNotificationSenderServiceCommand extends ServerCommand<WhisperServerConfiguration> {
@@ -64,7 +63,7 @@ public class ScheduledApnPushNotificationSenderServiceCommand extends ServerComm
});
}
-final FaultTolerantRedisCluster pushSchedulerCluster = new ShardFaultTolerantRedisCluster("push_scheduler",
+final FaultTolerantRedisCluster pushSchedulerCluster = new FaultTolerantRedisCluster("push_scheduler",
configuration.getPushSchedulerCluster(), deps.redisClusterClientResourcesBuilder());
final ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))

FaultTolerantPubSubConnectionTest.java

@@ -17,8 +17,6 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
-import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
@@ -30,7 +28,6 @@ import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
import io.lettuce.core.event.Event;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.resource.ClientResources;
-import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -38,7 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
-import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@@ -60,17 +56,10 @@ class FaultTolerantPubSubConnectionTest {
when(pubSubConnection.sync()).thenReturn(pubSubCommands);
-final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration();
-breakerConfiguration.setFailureRateThreshold(100);
-breakerConfiguration.setSlidingWindowSize(1);
-breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1);
-breakerConfiguration.setWaitDurationInOpenState(Duration.ofSeconds(Integer.MAX_VALUE));
final RetryConfiguration retryConfiguration = new RetryConfiguration();
retryConfiguration.setMaxAttempts(3);
retryConfiguration.setWaitDuration(10);
-final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig());
final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig());
final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom()
@@ -79,28 +68,10 @@
.build();
final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration);
-faultTolerantPubSubConnection = new ClusterFaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker,
+faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection,
retry, resubscribeRetry, Schedulers.newSingle("test"));
}
-@Test
-void testBreaker() {
-when(pubSubCommands.get(anyString()))
-.thenReturn("value")
-.thenThrow(new RuntimeException("Badness has ensued."));
-assertEquals("value",
-faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
-assertThrows(RedisException.class,
-() -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO")));
-final RedisException redisException = assertThrows(RedisException.class,
-() -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO")));
-assertTrue(redisException.getCause() instanceof CallNotPermittedException);
-}
@Test
void testRetry() {
when(pubSubCommands.get(anyString()))

FaultTolerantRedisClusterTest.java

@@ -1,5 +1,5 @@
/*
- * Copyright 2013-2020 Signal Messenger, LLC
+ * Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
@@ -8,136 +8,487 @@ package org.whispersystems.textsecuregcm.redis;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.EventPublisherOptions;
import io.lettuce.core.metrics.CommandLatencyCollectorOptions;
import io.lettuce.core.metrics.CommandLatencyRecorder;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.Delay;
import io.lettuce.core.resource.DnsResolver;
import io.lettuce.core.resource.EventLoopGroupProvider;
import io.lettuce.core.resource.NettyCustomizer;
import io.lettuce.core.resource.SocketAddressResolver;
import io.lettuce.core.resource.ThreadFactoryProvider;
import io.lettuce.core.tracing.Tracing;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Nested;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import reactor.core.publisher.Flux;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
// ThreadMode.SEPARATE_THREAD protects against hangs in the remote Redis calls, as this mode allows the test code to be
// preempted by the timeout check
@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
class FaultTolerantRedisClusterTest {
private RedisAdvancedClusterCommands<String, String> clusterCommands;
private FaultTolerantRedisCluster faultTolerantCluster;
private static final Duration TIMEOUT = Duration.ofMillis(50);
@SuppressWarnings("unchecked")
@BeforeEach
public void setUp() {
final RedisClusterClient clusterClient = mock(RedisClusterClient.class);
final StatefulRedisClusterConnection<String, String> clusterConnection = mock(StatefulRedisClusterConnection.class);
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = mock(
StatefulRedisClusterPubSubConnection.class);
final ClientResources clientResources = mock(ClientResources.class);
final EventBus eventBus = mock(EventBus.class);
private static final RetryConfiguration RETRY_CONFIGURATION = new RetryConfiguration();
clusterCommands = mock(RedisAdvancedClusterCommands.class);
static {
RETRY_CONFIGURATION.setMaxAttempts(1);
RETRY_CONFIGURATION.setWaitDuration(50);
}
when(clusterClient.connect()).thenReturn(clusterConnection);
when(clusterClient.connectPubSub()).thenReturn(pubSubConnection);
when(clusterClient.getResources()).thenReturn(clientResources);
when(clusterConnection.sync()).thenReturn(clusterCommands);
when(clientResources.eventBus()).thenReturn(eventBus);
when(eventBus.get()).thenReturn(mock(Flux.class));
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder()
.retryConfiguration(RETRY_CONFIGURATION)
.timeout(TIMEOUT)
.build();
final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration();
breakerConfiguration.setFailureRateThreshold(100);
breakerConfiguration.setSlidingWindowSize(1);
breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1);
breakerConfiguration.setWaitDurationInOpenState(Duration.ofSeconds(Integer.MAX_VALUE));
private FaultTolerantRedisCluster cluster;
final RetryConfiguration retryConfiguration = new RetryConfiguration();
retryConfiguration.setMaxAttempts(3);
retryConfiguration.setWaitDuration(0);
private static FaultTolerantRedisCluster buildCluster(
@Nullable final CircuitBreakerConfiguration circuitBreakerConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
faultTolerantCluster = new ClusterFaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2),
breakerConfiguration, retryConfiguration);
return new FaultTolerantRedisCluster("test", clientResourcesBuilder,
RedisClusterExtension.getRedisURIs(), TIMEOUT,
Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new),
RETRY_CONFIGURATION);
}
@AfterEach
void tearDown() {
cluster.shutdown();
}
@Test
void testBreaker() {
when(clusterCommands.get(anyString()))
.thenReturn("value")
.thenThrow(new RuntimeException("Badness has ensued."));
void testTimeout() {
cluster = buildCluster(null, ClientResources.builder());
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
final ExecutionException asyncException = assertThrows(ExecutionException.class,
() -> cluster.withCluster(connection -> connection.async().blpop(2 * TIMEOUT.toMillis() / 1000d, "key"))
.get());
assertThrows(RedisException.class,
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
final RedisException redisException = assertThrows(RedisException.class,
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
assertInstanceOf(CallNotPermittedException.class, redisException.getCause());
}
@Test
void testRetry() {
when(clusterCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
when(clusterCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");
assertInstanceOf(RedisCommandTimeoutException.class, asyncException.getCause());
assertThrows(RedisCommandTimeoutException.class,
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
() -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, "key")));
}
@Nested
class WithRealCluster {
@Test
void testTimeoutCircuitBreaker() throws Exception {
// because were using a single key, and blpop involves *Redis* also blocking, the breaker wait duration must be
// longer than the sum of the remote timeouts
final Duration breakerWaitDuration = TIMEOUT.multipliedBy(5);
private static final Duration TIMEOUT = Duration.ofMillis(50);
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setFailureRateThreshold(1);
circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(1);
circuitBreakerConfig.setSlidingWindowSize(1);
circuitBreakerConfig.setWaitDurationInOpenState(breakerWaitDuration);
private static final RetryConfiguration retryConfiguration = new RetryConfiguration();
cluster = buildCluster(circuitBreakerConfig, ClientResources.builder());
static {
retryConfiguration.setMaxAttempts(1);
retryConfiguration.setWaitDuration(50);
final String key = "key";
// the first call should time out and open the breaker
assertThrows(RedisCommandTimeoutException.class,
() -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key)));
// the second call gets blocked by the breaker
final RedisException e = assertThrows(RedisException.class,
() -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key)));
assertInstanceOf(CallNotPermittedException.class, e.getCause());
// wait for breaker to be half-open
Thread.sleep(breakerWaitDuration.toMillis() * 2);
assertEquals(0, (Long) cluster.withCluster(connection -> connection.sync().llen(key)));
}
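// The shard-unavailability tests below pin keys to specific shards with Redis hash tags: only the
// substring inside {...} is CRC16-hashed for slot assignment, and getMinimalHashTag returns a short
// tag that maps to the requested slot, so "key::{tag}" is guaranteed to live on the chosen node.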
@Test
void testShardUnavailable() {
final TestBreakerManager testBreakerManager = new TestBreakerManager();
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setFailureRateThreshold(1);
circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(2);
circuitBreakerConfig.setSlidingWindowSize(5);
final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder()
.nettyCustomizer(testBreakerManager);
cluster = buildCluster(circuitBreakerConfig, builder);
// this test will open the breaker on one shard and check that other shards are still available,
// so we get two nodes and a slot+key on each to test
final Pair<RedisClusterNode, RedisClusterNode> nodePair =
cluster.withCluster(connection -> {
Partitions partitions = ClusterPartitionParser.parse(connection.sync().clusterNodes());
assertTrue(partitions.size() >= 2);
return new Pair<>(partitions.getPartition(0), partitions.getPartition(1));
});
final RedisClusterNode unavailableNode = nodePair.first();
final int unavailableSlot = unavailableNode.getSlots().getFirst();
final String unavailableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(unavailableSlot));
final int availableSlot = nodePair.second().getSlots().getFirst();
final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot));
cluster.useCluster(connection -> {
connection.sync().set(unavailableKey, "unavailable");
connection.sync().set(availableKey, "available");
assertEquals("unavailable", connection.sync().get(unavailableKey));
assertEquals("available", connection.sync().get(availableKey));
});
// shard is now unavailable
testBreakerManager.openBreaker(unavailableNode.getUri());
final RedisException e = assertThrows(RedisException.class, () ->
cluster.useCluster(connection -> connection.sync().get(unavailableKey)));
assertInstanceOf(CallNotPermittedException.class, e.getCause());
// other shard is still available
assertEquals("available", cluster.withCluster(connection -> connection.sync().get(availableKey)));
// shard is available again
testBreakerManager.closeBreaker(unavailableNode.getUri());
assertEquals("unavailable", cluster.withCluster(connection -> connection.sync().get(unavailableKey)));
}
@Test
void testShardUnavailablePubSub() throws Exception {
final TestBreakerManager testBreakerManager = new TestBreakerManager();
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setFailureRateThreshold(1);
circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(2);
circuitBreakerConfig.setSlidingWindowSize(5);
final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder()
.nettyCustomizer(testBreakerManager);
cluster = buildCluster(circuitBreakerConfig, builder);
cluster.useCluster(
connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"));
// this test will open the breaker on one shard and check that other shards are still available,
// so we get two nodes and a slot+key on each to test
final Pair<RedisClusterNode, RedisClusterNode> nodePair =
cluster.withCluster(connection -> {
Partitions partitions = ClusterPartitionParser.parse(connection.sync().clusterNodes());
assertTrue(partitions.size() >= 2);
return new Pair<>(partitions.getPartition(0), partitions.getPartition(1));
});
final RedisClusterNode unavailableNode = nodePair.first();
final int unavailableSlot = unavailableNode.getSlots().getFirst();
final String unavailableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(unavailableSlot));
final RedisClusterNode availableNode = nodePair.second();
final int availableSlot = availableNode.getSlots().getFirst();
final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot));
final FaultTolerantPubSubConnection<String, String> pubSubConnection = cluster.createPubSubConnection();
// Keyspace notifications are delivered on a different thread, so we use a CountDownLatch to wait for the
// expected number of notifications to arrive
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
final Map<String, AtomicInteger> channelMessageCounts = new ConcurrentHashMap<>();
final String keyspacePrefix = "__keyspace@0__:";
final RedisClusterPubSubAdapter<String, String> listener = new RedisClusterPubSubAdapter<>() {
@Override
public void message(final RedisClusterNode node, final String channel, final String message) {
channelMessageCounts.computeIfAbsent(StringUtils.substringAfter(channel, keyspacePrefix),
k -> new AtomicInteger(0))
.incrementAndGet();
countDownLatchRef.get().countDown();
}
};
countDownLatchRef.set(new CountDownLatch(2));
pubSubConnection.usePubSubConnection(c -> {
c.addListener(listener);
c.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(availableSlot))
.commands()
.subscribe(keyspacePrefix + availableKey);
c.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(unavailableSlot))
.commands()
.subscribe(keyspacePrefix + unavailableKey);
});
cluster.useCluster(connection -> {
connection.sync().set(availableKey, "ping1");
connection.sync().set(unavailableKey, "ping1");
});
countDownLatchRef.get().await();
assertEquals(1, channelMessageCounts.get(availableKey).get());
assertEquals(1, channelMessageCounts.get(unavailableKey).get());
// shard is now unavailable
testBreakerManager.openBreaker(unavailableNode.getUri());
final RedisException e = assertThrows(RedisException.class, () ->
cluster.useCluster(connection -> connection.sync().set(unavailableKey, "ping2")));
assertInstanceOf(CallNotPermittedException.class, e.getCause());
assertEquals(1, channelMessageCounts.get(unavailableKey).get());
assertEquals(1, channelMessageCounts.get(availableKey).get());
countDownLatchRef.set(new CountDownLatch(1));
pubSubConnection.usePubSubConnection(connection -> connection.sync().set(availableKey, "ping2"));
countDownLatchRef.get().await();
assertEquals(1, channelMessageCounts.get(unavailableKey).get());
assertEquals(2, channelMessageCounts.get(availableKey).get());
// shard is available again
testBreakerManager.closeBreaker(unavailableNode.getUri());
countDownLatchRef.set(new CountDownLatch(2));
cluster.useCluster(connection -> {
connection.sync().set(availableKey, "ping3");
connection.sync().set(unavailableKey, "ping3");
});
countDownLatchRef.get().await();
assertEquals(2, channelMessageCounts.get(unavailableKey).get());
assertEquals(3, channelMessageCounts.get(availableKey).get());
}
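// TestBreakerManager is a NettyCustomizer that installs itself at the head of each new channel
// pipeline; on connect it captures the per-channel ChannelCircuitBreakerHandler (presumably
// installed by LettuceShardCircuitBreaker) and indexes it by remote RedisURI, so tests can force a
// single shard's breaker open or closed.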
@ChannelHandler.Sharable
private static class TestBreakerManager extends ChannelDuplexHandler implements NettyCustomizer {
private final Map<RedisURI, Set<LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler>> urisToChannelBreakers = new ConcurrentHashMap<>();
private final AtomicInteger counter = new AtomicInteger();
@Override
public void afterChannelInitialized(Channel channel) {
channel.pipeline().addFirst("TestBreakerManager#" + counter.getAndIncrement(), this);
}
@Override
public void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
super.connect(ctx, remoteAddress, localAddress, promise);
final LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler channelCircuitBreakerHandler =
ctx.channel().pipeline().get(LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler.class);
urisToChannelBreakers.computeIfAbsent(getRedisURI(ctx.channel()), ignored -> new HashSet<>())
.add(channelCircuitBreakerHandler);
}
private static RedisURI getRedisURI(Channel channel) {
final InetSocketAddress inetAddress = (InetSocketAddress) channel.remoteAddress();
return RedisURI.create(inetAddress.getHostString(), inetAddress.getPort());
}
void openBreaker(final RedisURI redisURI) {
urisToChannelBreakers.get(redisURI).forEach(handler -> handler.breaker.transitionToOpenState());
}
void closeBreaker(final RedisURI redisURI) {
urisToChannelBreakers.get(redisURI).forEach(handler -> handler.breaker.transitionToClosedState());
}
}
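// ClientResources.Builder holds a single NettyCustomizer, so setting one replaces any other. The
// composite below fans both callbacks out to a list, presumably so TestBreakerManager can coexist
// with the customizer that the per-shard circuit breaker registers.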
static class CompositeNettyCustomizer implements NettyCustomizer {
private final List<NettyCustomizer> nettyCustomizers = new ArrayList<>();
@Override
public void afterBootstrapInitialized(final Bootstrap bootstrap) {
nettyCustomizers.forEach(nc -> nc.afterBootstrapInitialized(bootstrap));
}
@Override
public void afterChannelInitialized(final Channel channel) {
nettyCustomizers.forEach(nc -> nc.afterChannelInitialized(channel));
}
void add(NettyCustomizer customizer) {
nettyCustomizers.add(customizer);
}
}
static class CompositeNettyCustomizerClientResourcesBuilder implements ClientResources.Builder {
private final CompositeNettyCustomizer compositeNettyCustomizer;
private final ClientResources.Builder delegate;
static CompositeNettyCustomizerClientResourcesBuilder builder() {
return new CompositeNettyCustomizerClientResourcesBuilder();
}
private CompositeNettyCustomizerClientResourcesBuilder() {
this.compositeNettyCustomizer = new CompositeNettyCustomizer();
this.delegate = ClientResources.builder().nettyCustomizer(compositeNettyCustomizer);
}
@Override
public ClientResources.Builder addressResolverGroup(final AddressResolverGroup<?> addressResolverGroup) {
delegate.addressResolverGroup(addressResolverGroup);
return this;
}
@Override
public ClientResources.Builder commandLatencyRecorder(final CommandLatencyRecorder latencyRecorder) {
delegate.commandLatencyRecorder(latencyRecorder);
return this;
}
@Override
@Deprecated
public ClientResources.Builder commandLatencyCollectorOptions(
final CommandLatencyCollectorOptions commandLatencyCollectorOptions) {
delegate.commandLatencyCollectorOptions(commandLatencyCollectorOptions);
return this;
}
@Override
public ClientResources.Builder commandLatencyPublisherOptions(
final EventPublisherOptions commandLatencyPublisherOptions) {
delegate.commandLatencyPublisherOptions(commandLatencyPublisherOptions);
return this;
}
@Override
public ClientResources.Builder computationThreadPoolSize(final int computationThreadPoolSize) {
delegate.computationThreadPoolSize(computationThreadPoolSize);
return this;
}
@Override
@Deprecated
public ClientResources.Builder dnsResolver(final DnsResolver dnsResolver) {
delegate.dnsResolver(dnsResolver);
return this;
}
@Override
public ClientResources.Builder eventBus(final EventBus eventBus) {
delegate.eventBus(eventBus);
return this;
}
@Override
public ClientResources.Builder eventExecutorGroup(final EventExecutorGroup eventExecutorGroup) {
delegate.eventExecutorGroup(eventExecutorGroup);
return this;
}
@Override
public ClientResources.Builder eventLoopGroupProvider(final EventLoopGroupProvider eventLoopGroupProvider) {
delegate.eventLoopGroupProvider(eventLoopGroupProvider);
return this;
}
@Override
public ClientResources.Builder ioThreadPoolSize(final int ioThreadPoolSize) {
delegate.ioThreadPoolSize(ioThreadPoolSize);
return this;
}
@Override
public ClientResources.Builder nettyCustomizer(final NettyCustomizer nettyCustomizer) {
compositeNettyCustomizer.add(nettyCustomizer);
return this;
}
@Override
public ClientResources.Builder reconnectDelay(final Delay reconnectDelay) {
delegate.reconnectDelay(reconnectDelay);
return this;
}
@Override
public ClientResources.Builder reconnectDelay(final Supplier<Delay> reconnectDelay) {
delegate.reconnectDelay(reconnectDelay);
return this;
}
@Override
public ClientResources.Builder socketAddressResolver(final SocketAddressResolver socketAddressResolver) {
delegate.socketAddressResolver(socketAddressResolver);
return this;
}
@Override
public ClientResources.Builder threadFactoryProvider(final ThreadFactoryProvider threadFactoryProvider) {
delegate.threadFactoryProvider(threadFactoryProvider);
return this;
}
@Override
public ClientResources.Builder timer(final Timer timer) {
delegate.timer(timer);
return this;
}
@Override
public ClientResources.Builder tracing(final Tracing tracing) {
delegate.tracing(tracing);
return this;
}
@Override
public ClientResources build() {
return delegate.build();
}
}
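For reference, a minimal usage sketch of the coalesced FaultTolerantRedisCluster API exercised above. The constructor shape mirrors buildCluster in this test; the cluster name, URI, keys, and the List collection type are illustrative assumptions:

    // Assumed shape, mirroring buildCluster: name, client-resources builder, URIs, command timeout,
    // breaker config, retry config.
    final FaultTolerantRedisCluster cluster = new FaultTolerantRedisCluster("example",
        ClientResources.builder(),
        List.of(RedisURI.create("redis://127.0.0.1:7000")),
        Duration.ofSeconds(1),
        new CircuitBreakerConfiguration(),
        new RetryConfiguration());

    // useCluster runs a void consumer; withCluster returns the callback's value.
    cluster.useCluster(connection -> connection.sync().set("greeting", "hello"));
    final String greeting = cluster.withCluster(connection -> connection.sync().get("greeting"));

    cluster.shutdown();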

View File

@ -87,7 +87,7 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
redisClientResources = ClientResources.builder().build();
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500));
redisCluster = new ShardFaultTolerantRedisCluster("test-cluster",
redisCluster = new FaultTolerantRedisCluster("test-cluster",
redisClientResources.mutate(),
getRedisURIs(),
timeout,

View File

@ -1,197 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
import io.lettuce.core.event.Event;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.resource.ClientResources;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.publisher.TestPublisher;
class ShardFaultTolerantPubSubConnectionTest {
private StatefulRedisClusterPubSubConnection<String, String> pubSubConnection;
private RedisClusterPubSubCommands<String, String> pubSubCommands;
private ShardFaultTolerantPubSubConnection<String, String> faultTolerantPubSubConnection;
@SuppressWarnings("unchecked")
@BeforeEach
public void setUp() {
pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class);
pubSubCommands = mock(RedisClusterPubSubCommands.class);
when(pubSubConnection.sync()).thenReturn(pubSubCommands);
final RetryConfiguration retryConfiguration = new RetryConfiguration();
retryConfiguration.setMaxAttempts(3);
retryConfiguration.setWaitDuration(10);
final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig());
final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE)
.intervalFunction(IntervalFunction.ofExponentialBackoff(5))
.build();
final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration);
faultTolerantPubSubConnection = new ShardFaultTolerantPubSubConnection<>("test", pubSubConnection,
retry, resubscribeRetry, Schedulers.newSingle("test"));
}
@Test
void testRetry() {
when(pubSubCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");
assertEquals("value",
faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
when(pubSubCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");
assertThrows(RedisCommandTimeoutException.class,
() -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
}
@Nested
class ClusterTopologyChangedEventTest {
private TestPublisher<Event> eventPublisher;
private Runnable resubscribe;
private AtomicInteger resubscribeCounter;
private CountDownLatch resubscribeFailure;
private CountDownLatch resubscribeSuccess;
@BeforeEach
@SuppressWarnings("unchecked")
void setup() {
// ignore inherited stubbing
reset(pubSubConnection);
eventPublisher = TestPublisher.createCold();
final ClientResources clientResources = mock(ClientResources.class);
when(pubSubConnection.getResources())
.thenReturn(clientResources);
final EventBus eventBus = mock(EventBus.class);
when(clientResources.eventBus())
.thenReturn(eventBus);
final Flux<Event> eventFlux = Flux.from(eventPublisher);
when(eventBus.get()).thenReturn(eventFlux);
resubscribeCounter = new AtomicInteger();
resubscribe = () -> {
try {
resubscribeCounter.incrementAndGet();
pubSubConnection.sync().nodes((ignored) -> true);
resubscribeSuccess.countDown();
} catch (final RuntimeException e) {
resubscribeFailure.countDown();
throw e;
}
};
resubscribeSuccess = new CountDownLatch(1);
resubscribeFailure = new CountDownLatch(1);
}
@SuppressWarnings("unchecked")
@Test
void testSubscribeToClusterTopologyChangedEvents() throws Exception {
when(pubSubConnection.sync())
.thenThrow(new RedisException("Cluster unavailable"));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe);
assertTrue(resubscribeFailure.await(1, TimeUnit.SECONDS));
// simulate cluster recovery - no more exceptions, run the retry
reset(pubSubConnection);
clearInvocations(pubSubCommands);
when(pubSubConnection.sync())
.thenReturn(pubSubCommands);
assertTrue(resubscribeSuccess.await(1, TimeUnit.SECONDS));
assertTrue(resubscribeCounter.get() >= 2, String.format("resubscribe called %d times", resubscribeCounter.get()));
verify(pubSubCommands).nodes(any());
}
@Test
@SuppressWarnings("unchecked")
void testMultipleEventsWithPendingRetries() throws Exception {
// more complicated scenario: multiple events while retries are pending
// cluster is down
when(pubSubConnection.sync())
.thenThrow(new RedisException("Cluster unavailable"));
// publish multiple topology changed events
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe);
assertTrue(resubscribeFailure.await(1, TimeUnit.SECONDS));
// simulate cluster recovery - no more exceptions, run the retry
reset(pubSubConnection);
clearInvocations(pubSubCommands);
when(pubSubConnection.sync())
.thenReturn(pubSubCommands);
assertTrue(resubscribeSuccess.await(1, TimeUnit.SECONDS));
verify(pubSubCommands, atLeastOnce()).nodes(any());
}
}
}
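The deleted test above also documents the resubscription contract. A minimal sketch, assuming the coalesced FaultTolerantPubSubConnection keeps the subscribeToClusterTopologyChangedEvents(Runnable) hook exercised here (the channel name is illustrative):

    final FaultTolerantPubSubConnection<String, String> pubSub = cluster.createPubSubConnection();

    // Target subscriptions at upstream nodes, as the shard-unavailability pub/sub test does.
    final Runnable subscribeAll = () -> pubSub.usePubSubConnection(connection ->
        connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM))
            .commands()
            .subscribe("events"));

    subscribeAll.run();
    // Re-issue the node-targeted subscriptions after every topology change; per the test, the
    // connection keeps retrying with exponential backoff until the cluster is reachable again.
    pubSub.subscribeToClusterTopologyChangedEvents(subscribeAll);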

View File

@ -1,495 +0,0 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.EventPublisherOptions;
import io.lettuce.core.metrics.CommandLatencyCollectorOptions;
import io.lettuce.core.metrics.CommandLatencyRecorder;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.Delay;
import io.lettuce.core.resource.DnsResolver;
import io.lettuce.core.resource.EventLoopGroupProvider;
import io.lettuce.core.resource.NettyCustomizer;
import io.lettuce.core.resource.SocketAddressResolver;
import io.lettuce.core.resource.ThreadFactoryProvider;
import io.lettuce.core.tracing.Tracing;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
// ThreadMode.SEPARATE_THREAD protects against hangs in the remote Redis calls, as this mode allows the test code to be
// preempted by the timeout check
@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
class ShardFaultTolerantRedisClusterTest {
private static final Duration TIMEOUT = Duration.ofMillis(50);
private static final RetryConfiguration RETRY_CONFIGURATION = new RetryConfiguration();
static {
RETRY_CONFIGURATION.setMaxAttempts(1);
RETRY_CONFIGURATION.setWaitDuration(50);
}
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder()
.retryConfiguration(RETRY_CONFIGURATION)
.timeout(TIMEOUT)
.build();
private ShardFaultTolerantRedisCluster cluster;
private static ShardFaultTolerantRedisCluster buildCluster(
@Nullable final CircuitBreakerConfiguration circuitBreakerConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
return new ShardFaultTolerantRedisCluster("test", clientResourcesBuilder,
RedisClusterExtension.getRedisURIs(), TIMEOUT,
Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new),
RETRY_CONFIGURATION);
}
@AfterEach
void tearDown() {
cluster.shutdown();
}
@Test
void testTimeout() {
cluster = buildCluster(null, ClientResources.builder());
final ExecutionException asyncException = assertThrows(ExecutionException.class,
() -> cluster.withCluster(connection -> connection.async().blpop(2 * TIMEOUT.toMillis() / 1000d, "key"))
.get());
assertInstanceOf(RedisCommandTimeoutException.class, asyncException.getCause());
assertThrows(RedisCommandTimeoutException.class,
() -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, "key")));
}
@Test
void testTimeoutCircuitBreaker() throws Exception {
// because we're using a single key, and blpop involves *Redis* also blocking, the breaker wait duration must be
// longer than the sum of the remote timeouts
final Duration breakerWaitDuration = TIMEOUT.multipliedBy(5);
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setFailureRateThreshold(1);
circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(1);
circuitBreakerConfig.setSlidingWindowSize(1);
circuitBreakerConfig.setWaitDurationInOpenState(breakerWaitDuration);
cluster = buildCluster(circuitBreakerConfig, ClientResources.builder());
final String key = "key";
// the first call should time out and open the breaker
assertThrows(RedisCommandTimeoutException.class,
() -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key)));
// the second call gets blocked by the breaker
final RedisException e = assertThrows(RedisException.class,
() -> cluster.withCluster(connection -> connection.sync().blpop(2 * TIMEOUT.toMillis() / 1000d, key)));
assertInstanceOf(CallNotPermittedException.class, e.getCause());
// wait for breaker to be half-open
Thread.sleep(breakerWaitDuration.toMillis() * 2);
assertEquals(0, (Long) cluster.withCluster(connection -> connection.sync().llen(key)));
}
@Test
void testShardUnavailable() {
final TestBreakerManager testBreakerManager = new TestBreakerManager();
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setFailureRateThreshold(1);
circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(2);
circuitBreakerConfig.setSlidingWindowSize(5);
final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder()
.nettyCustomizer(testBreakerManager);
cluster = buildCluster(circuitBreakerConfig, builder);
// this test will open the breaker on one shard and check that other shards are still available,
// so we get two nodes and a slot+key on each to test
final Pair<RedisClusterNode, RedisClusterNode> nodePair =
cluster.withCluster(connection -> {
Partitions partitions = ClusterPartitionParser.parse(connection.sync().clusterNodes());
assertTrue(partitions.size() >= 2);
return new Pair<>(partitions.getPartition(0), partitions.getPartition(1));
});
final RedisClusterNode unavailableNode = nodePair.first();
final int unavailableSlot = unavailableNode.getSlots().getFirst();
final String unavailableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(unavailableSlot));
final int availableSlot = nodePair.second().getSlots().getFirst();
final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot));
cluster.useCluster(connection -> {
connection.sync().set(unavailableKey, "unavailable");
connection.sync().set(availableKey, "available");
assertEquals("unavailable", connection.sync().get(unavailableKey));
assertEquals("available", connection.sync().get(availableKey));
});
// shard is now unavailable
testBreakerManager.openBreaker(unavailableNode.getUri());
final RedisException e = assertThrows(RedisException.class, () ->
cluster.useCluster(connection -> connection.sync().get(unavailableKey)));
assertInstanceOf(CallNotPermittedException.class, e.getCause());
// other shard is still available
assertEquals("available", cluster.withCluster(connection -> connection.sync().get(availableKey)));
// shard is available again
testBreakerManager.closeBreaker(unavailableNode.getUri());
assertEquals("unavailable", cluster.withCluster(connection -> connection.sync().get(unavailableKey)));
}
@Test
void testShardUnavailablePubSub() throws Exception {
final TestBreakerManager testBreakerManager = new TestBreakerManager();
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setFailureRateThreshold(1);
circuitBreakerConfig.setSlidingWindowMinimumNumberOfCalls(2);
circuitBreakerConfig.setSlidingWindowSize(5);
final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder()
.nettyCustomizer(testBreakerManager);
cluster = buildCluster(circuitBreakerConfig, builder);
cluster.useCluster(
connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"));
// this test will open the breaker on one shard and check that other shards are still available,
// so we get two nodes and a slot+key on each to test
final Pair<RedisClusterNode, RedisClusterNode> nodePair =
cluster.withCluster(connection -> {
Partitions partitions = ClusterPartitionParser.parse(connection.sync().clusterNodes());
assertTrue(partitions.size() >= 2);
return new Pair<>(partitions.getPartition(0), partitions.getPartition(1));
});
final RedisClusterNode unavailableNode = nodePair.first();
final int unavailableSlot = unavailableNode.getSlots().getFirst();
final String unavailableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(unavailableSlot));
final RedisClusterNode availableNode = nodePair.second();
final int availableSlot = availableNode.getSlots().getFirst();
final String availableKey = "key::{%s}".formatted(RedisClusterUtil.getMinimalHashTag(availableSlot));
final FaultTolerantPubSubConnection<String, String> pubSubConnection = cluster.createPubSubConnection();
// Keyspace notifications are delivered on a different thread, so we use a CountDownLatch to wait for the
// expected number of notifications to arrive
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
final Map<String, AtomicInteger> channelMessageCounts = new ConcurrentHashMap<>();
final String keyspacePrefix = "__keyspace@0__:";
final RedisClusterPubSubAdapter<String, String> listener = new RedisClusterPubSubAdapter<>() {
@Override
public void message(final RedisClusterNode node, final String channel, final String message) {
channelMessageCounts.computeIfAbsent(StringUtils.substringAfter(channel, keyspacePrefix),
k -> new AtomicInteger(0))
.incrementAndGet();
countDownLatchRef.get().countDown();
}
};
countDownLatchRef.set(new CountDownLatch(2));
pubSubConnection.usePubSubConnection(c -> {
c.addListener(listener);
c.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(availableSlot))
.commands()
.subscribe(keyspacePrefix + availableKey);
c.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(unavailableSlot))
.commands()
.subscribe(keyspacePrefix + unavailableKey);
});
cluster.useCluster(connection -> {
connection.sync().set(availableKey, "ping1");
connection.sync().set(unavailableKey, "ping1");
});
countDownLatchRef.get().await();
assertEquals(1, channelMessageCounts.get(availableKey).get());
assertEquals(1, channelMessageCounts.get(unavailableKey).get());
// shard is now unavailable
testBreakerManager.openBreaker(unavailableNode.getUri());
final RedisException e = assertThrows(RedisException.class, () ->
cluster.useCluster(connection -> connection.sync().set(unavailableKey, "ping2")));
assertInstanceOf(CallNotPermittedException.class, e.getCause());
assertEquals(1, channelMessageCounts.get(unavailableKey).get());
assertEquals(1, channelMessageCounts.get(availableKey).get());
countDownLatchRef.set(new CountDownLatch(1));
pubSubConnection.usePubSubConnection(connection -> connection.sync().set(availableKey, "ping2"));
countDownLatchRef.get().await();
assertEquals(1, channelMessageCounts.get(unavailableKey).get());
assertEquals(2, channelMessageCounts.get(availableKey).get());
// shard is available again
testBreakerManager.closeBreaker(unavailableNode.getUri());
countDownLatchRef.set(new CountDownLatch(2));
cluster.useCluster(connection -> {
connection.sync().set(availableKey, "ping3");
connection.sync().set(unavailableKey, "ping3");
});
countDownLatchRef.get().await();
assertEquals(2, channelMessageCounts.get(unavailableKey).get());
assertEquals(3, channelMessageCounts.get(availableKey).get());
}
@ChannelHandler.Sharable
private static class TestBreakerManager extends ChannelDuplexHandler implements NettyCustomizer {
private final Map<RedisURI, Set<LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler>> urisToChannelBreakers = new ConcurrentHashMap<>();
private final AtomicInteger counter = new AtomicInteger();
@Override
public void afterChannelInitialized(Channel channel) {
channel.pipeline().addFirst("TestBreakerManager#" + counter.getAndIncrement(), this);
}
@Override
public void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
super.connect(ctx, remoteAddress, localAddress, promise);
final LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler channelCircuitBreakerHandler =
ctx.channel().pipeline().get(LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler.class);
urisToChannelBreakers.computeIfAbsent(getRedisURI(ctx.channel()), ignored -> new HashSet<>())
.add(channelCircuitBreakerHandler);
}
private static RedisURI getRedisURI(Channel channel) {
final InetSocketAddress inetAddress = (InetSocketAddress) channel.remoteAddress();
return RedisURI.create(inetAddress.getHostString(), inetAddress.getPort());
}
void openBreaker(final RedisURI redisURI) {
urisToChannelBreakers.get(redisURI).forEach(handler -> handler.breaker.transitionToOpenState());
}
void closeBreaker(final RedisURI redisURI) {
urisToChannelBreakers.get(redisURI).forEach(handler -> handler.breaker.transitionToClosedState());
}
}
static class CompositeNettyCustomizer implements NettyCustomizer {
private final List<NettyCustomizer> nettyCustomizers = new ArrayList<>();
@Override
public void afterBootstrapInitialized(final Bootstrap bootstrap) {
nettyCustomizers.forEach(nc -> nc.afterBootstrapInitialized(bootstrap));
}
@Override
public void afterChannelInitialized(final Channel channel) {
nettyCustomizers.forEach(nc -> nc.afterChannelInitialized(channel));
}
void add(NettyCustomizer customizer) {
nettyCustomizers.add(customizer);
}
}
static class CompositeNettyCustomizerClientResourcesBuilder implements ClientResources.Builder {
private final CompositeNettyCustomizer compositeNettyCustomizer;
private final ClientResources.Builder delegate;
static CompositeNettyCustomizerClientResourcesBuilder builder() {
return new CompositeNettyCustomizerClientResourcesBuilder();
}
private CompositeNettyCustomizerClientResourcesBuilder() {
this.compositeNettyCustomizer = new CompositeNettyCustomizer();
this.delegate = ClientResources.builder().nettyCustomizer(compositeNettyCustomizer);
}
@Override
public ClientResources.Builder addressResolverGroup(final AddressResolverGroup<?> addressResolverGroup) {
delegate.addressResolverGroup(addressResolverGroup);
return this;
}
@Override
public ClientResources.Builder commandLatencyRecorder(final CommandLatencyRecorder latencyRecorder) {
delegate.commandLatencyRecorder(latencyRecorder);
return this;
}
@Override
@Deprecated
public ClientResources.Builder commandLatencyCollectorOptions(
final CommandLatencyCollectorOptions commandLatencyCollectorOptions) {
delegate.commandLatencyCollectorOptions(commandLatencyCollectorOptions);
return this;
}
@Override
public ClientResources.Builder commandLatencyPublisherOptions(
final EventPublisherOptions commandLatencyPublisherOptions) {
delegate.commandLatencyPublisherOptions(commandLatencyPublisherOptions);
return this;
}
@Override
public ClientResources.Builder computationThreadPoolSize(final int computationThreadPoolSize) {
delegate.computationThreadPoolSize(computationThreadPoolSize);
return this;
}
@Override
@Deprecated
public ClientResources.Builder dnsResolver(final DnsResolver dnsResolver) {
delegate.dnsResolver(dnsResolver);
return this;
}
@Override
public ClientResources.Builder eventBus(final EventBus eventBus) {
delegate.eventBus(eventBus);
return this;
}
@Override
public ClientResources.Builder eventExecutorGroup(final EventExecutorGroup eventExecutorGroup) {
delegate.eventExecutorGroup(eventExecutorGroup);
return this;
}
@Override
public ClientResources.Builder eventLoopGroupProvider(final EventLoopGroupProvider eventLoopGroupProvider) {
delegate.eventLoopGroupProvider(eventLoopGroupProvider);
return this;
}
@Override
public ClientResources.Builder ioThreadPoolSize(final int ioThreadPoolSize) {
delegate.ioThreadPoolSize(ioThreadPoolSize);
return this;
}
@Override
public ClientResources.Builder nettyCustomizer(final NettyCustomizer nettyCustomizer) {
compositeNettyCustomizer.add(nettyCustomizer);
return this;
}
@Override
public ClientResources.Builder reconnectDelay(final Delay reconnectDelay) {
delegate.reconnectDelay(reconnectDelay);
return this;
}
@Override
public ClientResources.Builder reconnectDelay(final Supplier<Delay> reconnectDelay) {
delegate.reconnectDelay(reconnectDelay);
return this;
}
@Override
public ClientResources.Builder socketAddressResolver(final SocketAddressResolver socketAddressResolver) {
delegate.socketAddressResolver(socketAddressResolver);
return this;
}
@Override
public ClientResources.Builder threadFactoryProvider(final ThreadFactoryProvider threadFactoryProvider) {
delegate.threadFactoryProvider(threadFactoryProvider);
return this;
}
@Override
public ClientResources.Builder timer(final Timer timer) {
delegate.timer(timer);
return this;
}
@Override
public ClientResources.Builder tracing(final Tracing tracing) {
delegate.tracing(tracing);
return this;
}
@Override
public ClientResources build() {
return delegate.build();
}
}
}