From 76665dd56ebdb85e254bd1605dc45fd1501bd8e9 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Fri, 4 Sep 2020 17:18:37 -0400 Subject: [PATCH] Retry Redis commands that time out. --- .../RedisClusterConfiguration.java | 9 ++++ .../configuration/RetryConfiguration.java | 8 ++++ .../redis/FaultTolerantPubSubConnection.java | 44 ++++++++++++++++--- .../redis/FaultTolerantRedisCluster.java | 25 ++++++----- .../redis/AbstractRedisClusterTest.java | 4 +- .../FaultTolerantPubSubConnectionTest.java | 31 ++++++++++++- .../redis/FaultTolerantRedisClusterTest.java | 26 ++++++++++- 7 files changed, 128 insertions(+), 19 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java index 4a2f60e3c..cd0ba5db5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java @@ -23,6 +23,11 @@ public class RedisClusterConfiguration { @Valid private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); + @JsonProperty + @NotNull + @Valid + private RetryConfiguration retry = new RetryConfiguration(); + public List getUrls() { return urls; } @@ -34,4 +39,8 @@ public class RedisClusterConfiguration { public CircuitBreakerConfiguration getCircuitBreakerConfiguration() { return circuitBreaker; } + + public RetryConfiguration getRetryConfiguration() { + return retry; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RetryConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RetryConfiguration.java index 13aa9cb84..aa6d667e3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RetryConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RetryConfiguration.java @@ -22,10 +22,18 @@ public class RetryConfiguration { return maxAttempts; } + public void setMaxAttempts(final int maxAttempts) { + this.maxAttempts = maxAttempts; + } + public long getWaitDuration() { return waitDuration; } + public void setWaitDuration(final long waitDuration) { + this.waitDuration = waitDuration; + } + public RetryConfig toRetryConfig() { return toRetryConfigBuilder().build(); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index aa5c45710..b013b516c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -4,7 +4,10 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.retry.Retry; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; @@ -16,13 +19,18 @@ import static com.codahale.metrics.MetricRegistry.name; public class FaultTolerantPubSubConnection { private final StatefulRedisClusterPubSubConnection pubSubConnection; + private final CircuitBreaker circuitBreaker; + private final Retry retry; private final Timer executeTimer; - public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection pubSubConnection, final CircuitBreaker circuitBreaker) { + private static final Logger log = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class); + + public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection pubSubConnection, final CircuitBreaker circuitBreaker, final Retry retry) { this.pubSubConnection = pubSubConnection; this.circuitBreaker = circuitBreaker; + this.retry = retry; CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), this.circuitBreaker, @@ -36,14 +44,38 @@ public class FaultTolerantPubSubConnection { } public void usePubSubConnection(final Consumer> consumer) { - try (final Timer.Context ignored = executeTimer.time()) { - this.circuitBreaker.executeRunnable(() -> consumer.accept(pubSubConnection)); + try { + circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> { + try (final Timer.Context ignored = executeTimer.time()) { + consumer.accept(pubSubConnection); + } + })); + } catch (final Throwable t) { + log.warn("Redis operation failure", t); + + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } } } - public T withPubSubConnection(final Function, T> consumer) { - try (final Timer.Context ignored = executeTimer.time()) { - return this.circuitBreaker.executeSupplier(() -> consumer.apply(pubSubConnection)); + public T withPubSubConnection(final Function, T> function) { + try { + return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> { + try (final Timer.Context ignored = executeTimer.time()) { + return function.apply(pubSubConnection); + } + })); + } catch (final Throwable t) { + log.warn("Redis operation failure", t); + + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 56e578050..0ea7b633e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -5,6 +5,8 @@ import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.retry.Retry; +import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; @@ -14,6 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; @@ -42,6 +45,7 @@ public class FaultTolerantRedisCluster { private final List> pubSubConnections = new ArrayList<>(); private final CircuitBreaker circuitBreaker; + private final Retry retry; private final Timer executeTimer; @@ -51,11 +55,12 @@ public class FaultTolerantRedisCluster { this(name, RedisClusterClient.create(clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())), clusterConfiguration.getTimeout(), - clusterConfiguration.getCircuitBreakerConfiguration()); + clusterConfiguration.getCircuitBreakerConfiguration(), + clusterConfiguration.getRetryConfiguration()); } @VisibleForTesting - FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { + FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { this.name = name; this.clusterClient = clusterClient; @@ -65,10 +70,10 @@ public class FaultTolerantRedisCluster { this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); this.circuitBreaker = CircuitBreaker.of(name, circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.retry = Retry.of(name, retryConfiguration.toRetryConfigBuilder().retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); - CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), - circuitBreaker, - FaultTolerantRedisCluster.class); + CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), circuitBreaker, FaultTolerantRedisCluster.class); + CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), retry, FaultTolerantRedisCluster.class); final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); @@ -104,11 +109,11 @@ public class FaultTolerantRedisCluster { private void useConnection(final StatefulRedisClusterConnection connection, final Consumer> consumer) { try { - circuitBreaker.executeCheckedRunnable(() -> { + circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> { try (final Timer.Context ignored = executeTimer.time()) { consumer.accept(connection); } - }); + })); } catch (final Throwable t) { log.warn("Redis operation failure", t); @@ -122,11 +127,11 @@ public class FaultTolerantRedisCluster { private T withConnection(final StatefulRedisClusterConnection connection, final Function, T> function) { try { - return circuitBreaker.executeCheckedSupplier(() -> { + return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> { try (final Timer.Context ignored = executeTimer.time()) { return function.apply(connection); } - }); + })); } catch (final Throwable t) { log.warn("Redis operation failure", t); @@ -142,6 +147,6 @@ public class FaultTolerantRedisCluster { final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); pubSubConnections.add(pubSubConnection); - return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker); + return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker, retry); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java index 02ab7df80..0c0308c77 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java @@ -12,6 +12,7 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; import redis.embedded.RedisServer; @@ -60,7 +61,8 @@ public abstract class AbstractRedisClusterTest { redisCluster = new FaultTolerantRedisCluster("test-cluster", RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), Duration.ofSeconds(2), - new CircuitBreakerConfiguration()); + new CircuitBreakerConfiguration(), + new RetryConfiguration()); redisCluster.useCluster(connection -> { boolean setAll = false; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java index 969187904..8f84663a0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java @@ -1,12 +1,16 @@ package org.whispersystems.textsecuregcm.redis; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.retry.Retry; +import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import org.junit.Before; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -33,7 +37,14 @@ public class FaultTolerantPubSubConnectionTest { breakerConfiguration.setRingBufferSizeInClosedState(1); breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); - faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, breakerConfiguration); + final RetryConfiguration retryConfiguration = new RetryConfiguration(); + retryConfiguration.setMaxAttempts(3); + retryConfiguration.setWaitDuration(0); + + final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig()); + final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); + + faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker, retry); } @Test @@ -50,4 +61,22 @@ public class FaultTolerantPubSubConnectionTest { assertThrows(CallNotPermittedException.class, () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); } + + @Test + public void testRetry() { + when(pubSubCommands.get(anyString())) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenReturn("value"); + + assertEquals("value", faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); + + when(pubSubCommands.get(anyString())) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenReturn("value"); + + assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 488080b06..8591348d8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -1,6 +1,7 @@ package org.whispersystems.textsecuregcm.redis; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; @@ -9,6 +10,7 @@ import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import org.junit.Before; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import java.time.Duration; @@ -40,7 +42,11 @@ public class FaultTolerantRedisClusterTest { breakerConfiguration.setRingBufferSizeInClosedState(1); breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); - faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration); + final RetryConfiguration retryConfiguration = new RetryConfiguration(); + retryConfiguration.setMaxAttempts(3); + retryConfiguration.setWaitDuration(0); + + faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration, retryConfiguration); } @Test @@ -57,4 +63,22 @@ public class FaultTolerantRedisClusterTest { assertThrows(CallNotPermittedException.class, () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); } + + @Test + public void testRetry() { + when(clusterCommands.get(anyString())) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenReturn("value"); + + assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); + + when(clusterCommands.get(anyString())) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenReturn("value"); + + assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); + } }