Remove obsolete metric
This commit is contained in:
parent
ecbef9c6ee
commit
d0ccae129a
|
@ -5,21 +5,17 @@
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.redis;
|
package org.whispersystems.textsecuregcm.redis;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||||
|
|
||||||
import com.codahale.metrics.MetricRegistry;
|
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
|
||||||
import com.codahale.metrics.Timer;
|
|
||||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||||
import io.github.resilience4j.retry.Retry;
|
import io.github.resilience4j.retry.Retry;
|
||||||
import io.lettuce.core.RedisException;
|
import io.lettuce.core.RedisException;
|
||||||
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import io.micrometer.core.instrument.Timer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
|
||||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
|
||||||
|
|
||||||
public class FaultTolerantPubSubConnection<K, V> {
|
public class FaultTolerantPubSubConnection<K, V> {
|
||||||
|
|
||||||
|
@ -28,31 +24,26 @@ public class FaultTolerantPubSubConnection<K, V> {
|
||||||
private final CircuitBreaker circuitBreaker;
|
private final CircuitBreaker circuitBreaker;
|
||||||
private final Retry retry;
|
private final Retry retry;
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private final Timer executeTimer;
|
private final Timer executeTimer;
|
||||||
private final io.micrometer.core.instrument.Timer newExecuteTimer;
|
|
||||||
|
|
||||||
public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker, final Retry retry) {
|
public FaultTolerantPubSubConnection(final String name,
|
||||||
|
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker,
|
||||||
|
final Retry retry) {
|
||||||
this.pubSubConnection = pubSubConnection;
|
this.pubSubConnection = pubSubConnection;
|
||||||
this.circuitBreaker = circuitBreaker;
|
this.circuitBreaker = circuitBreaker;
|
||||||
this.retry = retry;
|
this.retry = retry;
|
||||||
|
|
||||||
this.pubSubConnection.setNodeMessagePropagation(true);
|
this.pubSubConnection.setNodeMessagePropagation(true);
|
||||||
|
|
||||||
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
this.executeTimer = Metrics.timer(name(getClass(), "execute"), "name", name + "-pubsub");
|
||||||
this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute"));
|
|
||||||
this.newExecuteTimer = Metrics.timer(MetricsUtil.name(getClass(), "execute", "name", name + "-pubsub"));
|
|
||||||
|
|
||||||
CircuitBreakerUtil.registerMetrics(circuitBreaker, FaultTolerantPubSubConnection.class);
|
CircuitBreakerUtil.registerMetrics(circuitBreaker, FaultTolerantPubSubConnection.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
|
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
|
||||||
try {
|
try {
|
||||||
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> {
|
circuitBreaker.executeCheckedRunnable(
|
||||||
try (final Timer.Context ignored = executeTimer.time()) {
|
() -> retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))));
|
||||||
newExecuteTimer.record(() -> consumer.accept(pubSubConnection));
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
if (t instanceof RedisException) {
|
if (t instanceof RedisException) {
|
||||||
throw (RedisException) t;
|
throw (RedisException) t;
|
||||||
|
@ -64,11 +55,8 @@ public class FaultTolerantPubSubConnection<K, V> {
|
||||||
|
|
||||||
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> function) {
|
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> function) {
|
||||||
try {
|
try {
|
||||||
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> {
|
return circuitBreaker.executeCheckedSupplier(
|
||||||
try (final Timer.Context ignored = executeTimer.time()) {
|
() -> retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))));
|
||||||
return newExecuteTimer.record(() -> function.apply(pubSubConnection));
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
if (t instanceof RedisException) {
|
if (t instanceof RedisException) {
|
||||||
throw (RedisException) t;
|
throw (RedisException) t;
|
||||||
|
|
Loading…
Reference in New Issue