Count timeouts directly.
This commit is contained in:
parent
ec223ac2ed
commit
a4062b338e
|
@ -1,10 +1,12 @@
|
|||
package org.whispersystems.textsecuregcm.redis;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.Retry;
|
||||
import io.lettuce.core.RedisCommandTimeoutException;
|
||||
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -24,6 +26,7 @@ public class FaultTolerantPubSubConnection<K, V> {
|
|||
private final Retry retry;
|
||||
|
||||
private final Timer executeTimer;
|
||||
private final Meter commandTimeoutMeter;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class);
|
||||
|
||||
|
@ -37,6 +40,7 @@ public class FaultTolerantPubSubConnection<K, V> {
|
|||
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
|
||||
this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute"));
|
||||
this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), name + "-pubsub", "commandTimeout"));
|
||||
}
|
||||
|
||||
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
|
||||
|
@ -44,6 +48,9 @@ public class FaultTolerantPubSubConnection<K, V> {
|
|||
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> {
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
consumer.accept(pubSubConnection);
|
||||
} catch (final RedisCommandTimeoutException e) {
|
||||
commandTimeoutMeter.mark();
|
||||
throw e;
|
||||
}
|
||||
}));
|
||||
} catch (final Throwable t) {
|
||||
|
@ -62,6 +69,9 @@ public class FaultTolerantPubSubConnection<K, V> {
|
|||
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> {
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
return function.apply(pubSubConnection);
|
||||
} catch (final RedisCommandTimeoutException e) {
|
||||
commandTimeoutMeter.mark();
|
||||
throw e;
|
||||
}
|
||||
}));
|
||||
} catch (final Throwable t) {
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package org.whispersystems.textsecuregcm.redis;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
|
@ -8,10 +10,8 @@ import io.lettuce.core.RedisCommandTimeoutException;
|
|||
import io.lettuce.core.RedisURI;
|
||||
import io.lettuce.core.cluster.RedisClusterClient;
|
||||
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
|
||||
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
|
||||
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
||||
import io.lettuce.core.codec.ByteArrayCodec;
|
||||
import io.lettuce.core.event.connection.ConnectionEvent;
|
||||
import io.lettuce.core.resource.ClientResources;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -28,6 +28,8 @@ import java.util.function.Consumer;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
/**
|
||||
* A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed,
|
||||
* circuit-breaker-protected access to connections.
|
||||
|
@ -46,6 +48,8 @@ public class FaultTolerantRedisCluster {
|
|||
private final CircuitBreaker circuitBreaker;
|
||||
private final Retry retry;
|
||||
|
||||
private final Meter commandTimeoutMeter;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class);
|
||||
|
||||
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) {
|
||||
|
@ -60,6 +64,9 @@ public class FaultTolerantRedisCluster {
|
|||
FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) {
|
||||
this.name = name;
|
||||
|
||||
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), this.name, "commandTimeout"));
|
||||
|
||||
this.clusterClient = clusterClient;
|
||||
this.clusterClient.setDefaultTimeout(commandTimeout);
|
||||
|
||||
|
@ -106,7 +113,14 @@ public class FaultTolerantRedisCluster {
|
|||
|
||||
private <K, V> void useConnection(final StatefulRedisClusterConnection<K, V> connection, final Consumer<StatefulRedisClusterConnection<K, V>> consumer) {
|
||||
try {
|
||||
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection)));
|
||||
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> {
|
||||
try {
|
||||
consumer.accept(connection);
|
||||
} catch (final RedisCommandTimeoutException e) {
|
||||
commandTimeoutMeter.mark();
|
||||
throw e;
|
||||
}
|
||||
}));
|
||||
} catch (final Throwable t) {
|
||||
log.warn("Redis operation failure", t);
|
||||
|
||||
|
@ -120,7 +134,14 @@ public class FaultTolerantRedisCluster {
|
|||
|
||||
private <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> connection, final Function<StatefulRedisClusterConnection<K, V>, T> function) {
|
||||
try {
|
||||
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> function.apply(connection)));
|
||||
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> {
|
||||
try {
|
||||
return function.apply(connection);
|
||||
} catch (final RedisCommandTimeoutException e) {
|
||||
commandTimeoutMeter.mark();
|
||||
throw e;
|
||||
}
|
||||
}));
|
||||
} catch (final Throwable t) {
|
||||
log.warn("Redis operation failure", t);
|
||||
|
||||
|
|
Loading…
Reference in New Issue