diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index 41fd7f442..9f4122a88 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -1,6 +1,8 @@ package org.whispersystems.textsecuregcm.redis; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; +import com.codahale.metrics.Timer; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; @@ -10,11 +12,15 @@ import org.whispersystems.textsecuregcm.util.Constants; import java.util.function.Consumer; import java.util.function.Function; +import static com.codahale.metrics.MetricRegistry.name; + public class FaultTolerantPubSubConnection { private final StatefulRedisClusterPubSubConnection pubSubConnection; private final CircuitBreaker circuitBreaker; + private final Timer executeTimer; + public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection pubSubConnection, final CircuitBreakerConfiguration circuitBreakerConfiguration) { this.pubSubConnection = pubSubConnection; this.circuitBreaker = CircuitBreaker.of(name + "-pubsub", circuitBreakerConfiguration.toCircuitBreakerConfig()); @@ -24,13 +30,21 @@ public class FaultTolerantPubSubConnection { FaultTolerantRedisCluster.class); this.pubSubConnection.setNodeMessagePropagation(true); + + final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + + this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute")); } public void usePubSubConnection(final Consumer> consumer) { - this.circuitBreaker.executeRunnable(() -> consumer.accept(pubSubConnection)); + try (final Timer.Context ignored = executeTimer.time()) { + this.circuitBreaker.executeRunnable(() -> consumer.accept(pubSubConnection)); + } } public T withPubSubConnection(final Function, T> consumer) { - return this.circuitBreaker.executeSupplier(() -> consumer.apply(pubSubConnection)); + try (final Timer.Context ignored = executeTimer.time()) { + return this.circuitBreaker.executeSupplier(() -> consumer.apply(pubSubConnection)); + } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 66c7f9652..6f97c5940 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -1,6 +1,8 @@ package org.whispersystems.textsecuregcm.redis; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.lettuce.core.RedisURI; @@ -26,6 +28,8 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import static com.codahale.metrics.MetricRegistry.name; + /** * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed, * circuit-breaker-protected access to a pool of connections. @@ -44,6 +48,9 @@ public class FaultTolerantRedisCluster { private final CircuitBreakerConfiguration circuitBreakerConfiguration; private final CircuitBreaker circuitBreaker; + private final Timer acquireAndExecuteTimer; + private final Timer executeTimer; + private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class); public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration) { @@ -78,6 +85,11 @@ public class FaultTolerantRedisCluster { CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), circuitBreaker, FaultTolerantRedisCluster.class); + + final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + + this.acquireAndExecuteTimer = metricRegistry.timer(name(getClass(), name, "acquireConnectionAndExecute")); + this.executeTimer = metricRegistry.timer(name(getClass(), name, "execute")); } void shutdown() { @@ -110,8 +122,12 @@ public class FaultTolerantRedisCluster { private void acceptPooledConnection(final GenericObjectPool> pool, final Consumer> consumer) { try { circuitBreaker.executeCheckedRunnable(() -> { - try (final StatefulRedisClusterConnection connection = pool.borrowObject()) { - consumer.accept(connection); + try (final Timer.Context ignored = acquireAndExecuteTimer.time(); + final StatefulRedisClusterConnection connection = pool.borrowObject()) { + + try (final Timer.Context ignored2 = executeTimer.time()) { + consumer.accept(connection); + } } }); } catch (final Throwable t) { @@ -128,8 +144,12 @@ public class FaultTolerantRedisCluster { private T applyToPooledConnection(final GenericObjectPool> pool, final Function, T> function) { try { return circuitBreaker.executeCheckedSupplier(() -> { - try (final StatefulRedisClusterConnection connection = pool.borrowObject()) { - return function.apply(connection); + try (final Timer.Context ignored = acquireAndExecuteTimer.time(); + final StatefulRedisClusterConnection connection = pool.borrowObject()) { + + try (final Timer.Context ignored2 = executeTimer.time()) { + return function.apply(connection); + } } }); } catch (final Throwable t) {