Time Redis operations.
This commit is contained in:
parent
d243b73678
commit
c683cbdb2d
|
@ -1,6 +1,8 @@
|
|||
package org.whispersystems.textsecuregcm.redis;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
|
@ -10,11 +12,15 @@ import org.whispersystems.textsecuregcm.util.Constants;
|
|||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class FaultTolerantPubSubConnection<K, V> {
|
||||
|
||||
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;
|
||||
private final CircuitBreaker circuitBreaker;
|
||||
|
||||
private final Timer executeTimer;
|
||||
|
||||
public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
||||
this.pubSubConnection = pubSubConnection;
|
||||
this.circuitBreaker = CircuitBreaker.of(name + "-pubsub", circuitBreakerConfiguration.toCircuitBreakerConfig());
|
||||
|
@ -24,13 +30,21 @@ public class FaultTolerantPubSubConnection<K, V> {
|
|||
FaultTolerantRedisCluster.class);
|
||||
|
||||
this.pubSubConnection.setNodeMessagePropagation(true);
|
||||
|
||||
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
|
||||
this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute"));
|
||||
}
|
||||
|
||||
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
|
||||
this.circuitBreaker.executeRunnable(() -> consumer.accept(pubSubConnection));
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
this.circuitBreaker.executeRunnable(() -> consumer.accept(pubSubConnection));
|
||||
}
|
||||
}
|
||||
|
||||
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> consumer) {
|
||||
return this.circuitBreaker.executeSupplier(() -> consumer.apply(pubSubConnection));
|
||||
try (final Timer.Context ignored = executeTimer.time()) {
|
||||
return this.circuitBreaker.executeSupplier(() -> consumer.apply(pubSubConnection));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package org.whispersystems.textsecuregcm.redis;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.lettuce.core.RedisURI;
|
||||
|
@ -26,6 +28,8 @@ import java.util.function.Consumer;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
/**
|
||||
* A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed,
|
||||
* circuit-breaker-protected access to a pool of connections.
|
||||
|
@ -44,6 +48,9 @@ public class FaultTolerantRedisCluster {
|
|||
private final CircuitBreakerConfiguration circuitBreakerConfiguration;
|
||||
private final CircuitBreaker circuitBreaker;
|
||||
|
||||
private final Timer acquireAndExecuteTimer;
|
||||
private final Timer executeTimer;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class);
|
||||
|
||||
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration) {
|
||||
|
@ -78,6 +85,11 @@ public class FaultTolerantRedisCluster {
|
|||
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
|
||||
circuitBreaker,
|
||||
FaultTolerantRedisCluster.class);
|
||||
|
||||
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
|
||||
this.acquireAndExecuteTimer = metricRegistry.timer(name(getClass(), name, "acquireConnectionAndExecute"));
|
||||
this.executeTimer = metricRegistry.timer(name(getClass(), name, "execute"));
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
|
@ -110,8 +122,12 @@ public class FaultTolerantRedisCluster {
|
|||
private <K, V> void acceptPooledConnection(final GenericObjectPool<StatefulRedisClusterConnection<K, V>> pool, final Consumer<StatefulRedisClusterConnection<K, V>> consumer) {
|
||||
try {
|
||||
circuitBreaker.executeCheckedRunnable(() -> {
|
||||
try (final StatefulRedisClusterConnection<K, V> connection = pool.borrowObject()) {
|
||||
consumer.accept(connection);
|
||||
try (final Timer.Context ignored = acquireAndExecuteTimer.time();
|
||||
final StatefulRedisClusterConnection<K, V> connection = pool.borrowObject()) {
|
||||
|
||||
try (final Timer.Context ignored2 = executeTimer.time()) {
|
||||
consumer.accept(connection);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (final Throwable t) {
|
||||
|
@ -128,8 +144,12 @@ public class FaultTolerantRedisCluster {
|
|||
private <T, K, V> T applyToPooledConnection(final GenericObjectPool<StatefulRedisClusterConnection<K, V>> pool, final Function<StatefulRedisClusterConnection<K, V>, T> function) {
|
||||
try {
|
||||
return circuitBreaker.executeCheckedSupplier(() -> {
|
||||
try (final StatefulRedisClusterConnection<K, V> connection = pool.borrowObject()) {
|
||||
return function.apply(connection);
|
||||
try (final Timer.Context ignored = acquireAndExecuteTimer.time();
|
||||
final StatefulRedisClusterConnection<K, V> connection = pool.borrowObject()) {
|
||||
|
||||
try (final Timer.Context ignored2 = executeTimer.time()) {
|
||||
return function.apply(connection);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (final Throwable t) {
|
||||
|
|
Loading…
Reference in New Issue