From a4062b338e1361f6dcb6e0d91bf57e447136ba5f Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 28 Oct 2020 15:08:27 -0400 Subject: [PATCH] Count timeouts directly. --- .../redis/FaultTolerantPubSubConnection.java | 10 +++++++ .../redis/FaultTolerantRedisCluster.java | 29 ++++++++++++++++--- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index c316bffe6..db08d8c44 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -1,10 +1,12 @@ package org.whispersystems.textsecuregcm.redis; +import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.retry.Retry; +import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +26,7 @@ public class FaultTolerantPubSubConnection { private final Retry retry; private final Timer executeTimer; + private final Meter commandTimeoutMeter; private static final Logger log = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class); @@ -37,6 +40,7 @@ public class FaultTolerantPubSubConnection { final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute")); + this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), name + "-pubsub", "commandTimeout")); } public void usePubSubConnection(final Consumer> consumer) { @@ 
-44,6 +48,9 @@ public class FaultTolerantPubSubConnection { circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> { try (final Timer.Context ignored = executeTimer.time()) { consumer.accept(pubSubConnection); + } catch (final RedisCommandTimeoutException e) { + commandTimeoutMeter.mark(); + throw e; } })); } catch (final Throwable t) { @@ -62,6 +69,9 @@ public class FaultTolerantPubSubConnection { return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> { try (final Timer.Context ignored = executeTimer.time()) { return function.apply(pubSubConnection); + } catch (final RedisCommandTimeoutException e) { + commandTimeoutMeter.mark(); + throw e; } })); } catch (final Throwable t) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 58b35c9e6..2b0825ac2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -1,5 +1,7 @@ package org.whispersystems.textsecuregcm.redis; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.google.common.annotations.VisibleForTesting; import io.github.resilience4j.circuitbreaker.CircuitBreaker; @@ -8,10 +10,8 @@ import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; -import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.ByteArrayCodec; -import io.lettuce.core.event.connection.ConnectionEvent; import io.lettuce.core.resource.ClientResources; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +28,8 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import static com.codahale.metrics.MetricRegistry.name; + /** * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed, * circuit-breaker-protected access to connections. @@ -46,6 +48,8 @@ public class FaultTolerantRedisCluster { private final CircuitBreaker circuitBreaker; private final Retry retry; + private final Meter commandTimeoutMeter; + private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class); public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) { @@ -60,6 +64,9 @@ public class FaultTolerantRedisCluster { FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { this.name = name; + final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), this.name, "commandTimeout")); + this.clusterClient = clusterClient; this.clusterClient.setDefaultTimeout(commandTimeout); @@ -106,7 +113,14 @@ public class FaultTolerantRedisCluster { private <K, V> void useConnection(final StatefulRedisClusterConnection<K, V> connection, final Consumer<StatefulRedisClusterConnection<K, V>> consumer) { try { - circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection))); + circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> { + try { + consumer.accept(connection); + } catch (final RedisCommandTimeoutException e) { + commandTimeoutMeter.mark(); + throw e; + } + })); } catch (final Throwable t) { log.warn("Redis operation failure", t); @@ -120,7 +134,14 @@ public 
class FaultTolerantRedisCluster { private <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> connection, final Function<StatefulRedisClusterConnection<K, V>, T> function) { try { - return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> function.apply(connection))); + return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> { + try { + return function.apply(connection); + } catch (final RedisCommandTimeoutException e) { + commandTimeoutMeter.mark(); + throw e; + } + })); } catch (final Throwable t) { log.warn("Redis operation failure", t);