diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index 093c2510d..c1e1059a7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -5,6 +5,8 @@ package org.whispersystems.textsecuregcm.redis; +import static com.codahale.metrics.MetricRegistry.name; + import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; @@ -13,16 +15,14 @@ import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.retry.Retry; import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.ThreadDumpUtil; - import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; - -import static com.codahale.metrics.MetricRegistry.name; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; +import org.whispersystems.textsecuregcm.util.Constants; +import org.whispersystems.textsecuregcm.util.ThreadDumpUtil; public class FaultTolerantPubSubConnection { @@ -41,16 +41,18 @@ public class FaultTolerantPubSubConnection { public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection pubSubConnection, final CircuitBreaker circuitBreaker, final Retry retry) { this.name = name; - this.pubSubConnection = pubSubConnection; - this.circuitBreaker = circuitBreaker; - this.retry = retry; + this.pubSubConnection = pubSubConnection; + this.circuitBreaker = circuitBreaker; + this.retry = retry; - this.pubSubConnection.setNodeMessagePropagation(true); + this.pubSubConnection.setNodeMessagePropagation(true); - final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute")); - this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), name + "-pubsub", "commandTimeout")); + this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute")); + this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), name + "-pubsub", "commandTimeout")); + + CircuitBreakerUtil.registerMetrics(metricRegistry, circuitBreaker, FaultTolerantPubSubConnection.class); } public void usePubSubConnection(final Consumer> consumer) {