Retry Redis commands that time out.
This commit is contained in:
parent
2d42b478ba
commit
76665dd56e
|
@ -23,6 +23,11 @@ public class RedisClusterConfiguration {
|
|||
@Valid
|
||||
private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
|
||||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
@Valid
|
||||
private RetryConfiguration retry = new RetryConfiguration();
|
||||
|
||||
public List<String> getUrls() {
|
||||
return urls;
|
||||
}
|
||||
|
@ -34,4 +39,8 @@ public class RedisClusterConfiguration {
|
|||
public CircuitBreakerConfiguration getCircuitBreakerConfiguration() {
|
||||
return circuitBreaker;
|
||||
}
|
||||
|
||||
public RetryConfiguration getRetryConfiguration() {
|
||||
return retry;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,10 +22,18 @@ public class RetryConfiguration {
|
|||
return maxAttempts;
|
||||
}
|
||||
|
||||
public void setMaxAttempts(final int maxAttempts) {
|
||||
this.maxAttempts = maxAttempts;
|
||||
}
|
||||
|
||||
public long getWaitDuration() {
|
||||
return waitDuration;
|
||||
}
|
||||
|
||||
public void setWaitDuration(final long waitDuration) {
|
||||
this.waitDuration = waitDuration;
|
||||
}
|
||||
|
||||
public RetryConfig toRetryConfig() {
|
||||
return toRetryConfigBuilder().build();
|
||||
}
|
||||
|
|
|
@ -4,7 +4,10 @@ import com.codahale.metrics.MetricRegistry;
|
|||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.Retry;
|
||||
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
|
@ -16,13 +19,18 @@ import static com.codahale.metrics.MetricRegistry.name;
|
|||
public class FaultTolerantPubSubConnection<K, V> {
|
||||
|
||||
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;
|
||||
|
||||
private final CircuitBreaker circuitBreaker;
|
||||
private final Retry retry;
|
||||
|
||||
private final Timer executeTimer;
|
||||
|
||||
public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker) {
|
||||
private static final Logger log = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class);
|
||||
|
||||
public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker, final Retry retry) {
|
||||
this.pubSubConnection = pubSubConnection;
|
||||
this.circuitBreaker = circuitBreaker;
|
||||
this.retry = retry;
|
||||
|
||||
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
|
||||
this.circuitBreaker,
|
||||
|
@ -36,14 +44,38 @@ public class FaultTolerantPubSubConnection<K, V> {
|
|||
}
|
||||
|
||||
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
this.circuitBreaker.executeRunnable(() -> consumer.accept(pubSubConnection));
|
||||
try {
|
||||
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> {
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
consumer.accept(pubSubConnection);
|
||||
}
|
||||
}));
|
||||
} catch (final Throwable t) {
|
||||
log.warn("Redis operation failure", t);
|
||||
|
||||
if (t instanceof RuntimeException) {
|
||||
throw (RuntimeException) t;
|
||||
} else {
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> consumer) {
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
return this.circuitBreaker.executeSupplier(() -> consumer.apply(pubSubConnection));
|
||||
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> function) {
|
||||
try {
|
||||
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> {
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
return function.apply(pubSubConnection);
|
||||
}
|
||||
}));
|
||||
} catch (final Throwable t) {
|
||||
log.warn("Redis operation failure", t);
|
||||
|
||||
if (t instanceof RuntimeException) {
|
||||
throw (RuntimeException) t;
|
||||
} else {
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@ import com.codahale.metrics.SharedMetricRegistries;
|
|||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.Retry;
|
||||
import io.lettuce.core.RedisCommandTimeoutException;
|
||||
import io.lettuce.core.RedisURI;
|
||||
import io.lettuce.core.cluster.RedisClusterClient;
|
||||
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
|
||||
|
@ -14,6 +16,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
|
@ -42,6 +45,7 @@ public class FaultTolerantRedisCluster {
|
|||
private final List<StatefulRedisClusterPubSubConnection<?, ?>> pubSubConnections = new ArrayList<>();
|
||||
|
||||
private final CircuitBreaker circuitBreaker;
|
||||
private final Retry retry;
|
||||
|
||||
private final Timer executeTimer;
|
||||
|
||||
|
@ -51,11 +55,12 @@ public class FaultTolerantRedisCluster {
|
|||
this(name,
|
||||
RedisClusterClient.create(clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())),
|
||||
clusterConfiguration.getTimeout(),
|
||||
clusterConfiguration.getCircuitBreakerConfiguration());
|
||||
clusterConfiguration.getCircuitBreakerConfiguration(),
|
||||
clusterConfiguration.getRetryConfiguration());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
||||
FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) {
|
||||
this.name = name;
|
||||
|
||||
this.clusterClient = clusterClient;
|
||||
|
@ -65,10 +70,10 @@ public class FaultTolerantRedisCluster {
|
|||
this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);
|
||||
|
||||
this.circuitBreaker = CircuitBreaker.of(name, circuitBreakerConfiguration.toCircuitBreakerConfig());
|
||||
this.retry = Retry.of(name, retryConfiguration.toRetryConfigBuilder().retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
|
||||
|
||||
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
|
||||
circuitBreaker,
|
||||
FaultTolerantRedisCluster.class);
|
||||
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), circuitBreaker, FaultTolerantRedisCluster.class);
|
||||
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), retry, FaultTolerantRedisCluster.class);
|
||||
|
||||
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
|
||||
|
@ -104,11 +109,11 @@ public class FaultTolerantRedisCluster {
|
|||
|
||||
private <K, V> void useConnection(final StatefulRedisClusterConnection<K, V> connection, final Consumer<StatefulRedisClusterConnection<K, V>> consumer) {
|
||||
try {
|
||||
circuitBreaker.executeCheckedRunnable(() -> {
|
||||
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> {
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
consumer.accept(connection);
|
||||
}
|
||||
});
|
||||
}));
|
||||
} catch (final Throwable t) {
|
||||
log.warn("Redis operation failure", t);
|
||||
|
||||
|
@ -122,11 +127,11 @@ public class FaultTolerantRedisCluster {
|
|||
|
||||
private <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> connection, final Function<StatefulRedisClusterConnection<K, V>, T> function) {
|
||||
try {
|
||||
return circuitBreaker.executeCheckedSupplier(() -> {
|
||||
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> {
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
return function.apply(connection);
|
||||
}
|
||||
});
|
||||
}));
|
||||
} catch (final Throwable t) {
|
||||
log.warn("Redis operation failure", t);
|
||||
|
||||
|
@ -142,6 +147,6 @@ public class FaultTolerantRedisCluster {
|
|||
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
|
||||
pubSubConnections.add(pubSubConnection);
|
||||
|
||||
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker);
|
||||
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker, retry);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.junit.AfterClass;
|
|||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
||||
import redis.embedded.RedisServer;
|
||||
|
||||
|
@ -60,7 +61,8 @@ public abstract class AbstractRedisClusterTest {
|
|||
redisCluster = new FaultTolerantRedisCluster("test-cluster",
|
||||
RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())),
|
||||
Duration.ofSeconds(2),
|
||||
new CircuitBreakerConfiguration());
|
||||
new CircuitBreakerConfiguration(),
|
||||
new RetryConfiguration());
|
||||
|
||||
redisCluster.useCluster(connection -> {
|
||||
boolean setAll = false;
|
||||
|
|
|
@ -1,12 +1,16 @@
|
|||
package org.whispersystems.textsecuregcm.redis;
|
||||
|
||||
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.Retry;
|
||||
import io.lettuce.core.RedisCommandTimeoutException;
|
||||
import io.lettuce.core.RedisException;
|
||||
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
||||
import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
@ -33,7 +37,14 @@ public class FaultTolerantPubSubConnectionTest {
|
|||
breakerConfiguration.setRingBufferSizeInClosedState(1);
|
||||
breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE);
|
||||
|
||||
faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, breakerConfiguration);
|
||||
final RetryConfiguration retryConfiguration = new RetryConfiguration();
|
||||
retryConfiguration.setMaxAttempts(3);
|
||||
retryConfiguration.setWaitDuration(0);
|
||||
|
||||
final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig());
|
||||
final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig());
|
||||
|
||||
faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker, retry);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -50,4 +61,22 @@ public class FaultTolerantPubSubConnectionTest {
|
|||
assertThrows(CallNotPermittedException.class,
|
||||
() -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetry() {
|
||||
when(pubSubCommands.get(anyString()))
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenReturn("value");
|
||||
|
||||
assertEquals("value", faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
|
||||
|
||||
when(pubSubCommands.get(anyString()))
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenReturn("value");
|
||||
|
||||
assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package org.whispersystems.textsecuregcm.redis;
|
||||
|
||||
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
|
||||
import io.lettuce.core.RedisCommandTimeoutException;
|
||||
import io.lettuce.core.RedisException;
|
||||
import io.lettuce.core.cluster.RedisClusterClient;
|
||||
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
|
||||
|
@ -9,6 +10,7 @@ import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
|
@ -40,7 +42,11 @@ public class FaultTolerantRedisClusterTest {
|
|||
breakerConfiguration.setRingBufferSizeInClosedState(1);
|
||||
breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE);
|
||||
|
||||
faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration);
|
||||
final RetryConfiguration retryConfiguration = new RetryConfiguration();
|
||||
retryConfiguration.setMaxAttempts(3);
|
||||
retryConfiguration.setWaitDuration(0);
|
||||
|
||||
faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration, retryConfiguration);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -57,4 +63,22 @@ public class FaultTolerantRedisClusterTest {
|
|||
assertThrows(CallNotPermittedException.class,
|
||||
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetry() {
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenReturn("value");
|
||||
|
||||
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenReturn("value");
|
||||
|
||||
assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue