Don't retry pub/sub commands

This commit is contained in:
Jon Chambers 2024-11-01 11:18:50 -04:00 committed by Jon Chambers
parent c9a396b9e3
commit 00d0dba62c
6 changed files with 10 additions and 46 deletions

View File

@ -7,7 +7,6 @@ package org.whispersystems.textsecuregcm.redis;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException; import io.lettuce.core.RedisException;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
@ -20,17 +19,11 @@ abstract class AbstractFaultTolerantPubSubConnection<K, V, C extends StatefulRed
private final String name; private final String name;
private final C pubSubConnection; private final C pubSubConnection;
private final Retry retry;
private final Timer executeTimer; private final Timer executeTimer;
protected AbstractFaultTolerantPubSubConnection(final String name, protected AbstractFaultTolerantPubSubConnection(final String name, final C pubSubConnection) {
final C pubSubConnection,
final Retry retry) {
this.name = name; this.name = name;
this.pubSubConnection = pubSubConnection; this.pubSubConnection = pubSubConnection;
this.retry = retry;
this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub"); this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub");
} }
@ -41,7 +34,7 @@ abstract class AbstractFaultTolerantPubSubConnection<K, V, C extends StatefulRed
public void usePubSubConnection(final Consumer<C> consumer) { public void usePubSubConnection(final Consumer<C> consumer) {
try { try {
retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))); executeTimer.record(() -> consumer.accept(pubSubConnection));
} catch (final Throwable t) { } catch (final Throwable t) {
if (t instanceof RedisException) { if (t instanceof RedisException) {
throw (RedisException) t; throw (RedisException) t;
@ -53,7 +46,7 @@ abstract class AbstractFaultTolerantPubSubConnection<K, V, C extends StatefulRed
public <T> T withPubSubConnection(final Function<C, T> function) { public <T> T withPubSubConnection(final Function<C, T> function) {
try { try {
return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))); return executeTimer.record(() -> function.apply(pubSubConnection));
} catch (final Throwable t) { } catch (final Throwable t) {
if (t instanceof RedisException) { if (t instanceof RedisException) {
throw (RedisException) t; throw (RedisException) t;

View File

@ -21,11 +21,10 @@ public class FaultTolerantPubSubClusterConnection<K, V> extends AbstractFaultTol
protected FaultTolerantPubSubClusterConnection(final String name, protected FaultTolerantPubSubClusterConnection(final String name,
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection,
final Retry retry,
final Retry resubscribeRetry, final Retry resubscribeRetry,
final Scheduler topologyChangedEventScheduler) { final Scheduler topologyChangedEventScheduler) {
super(name, pubSubConnection, retry); super(name, pubSubConnection);
pubSubConnection.setNodeMessagePropagation(true); pubSubConnection.setNodeMessagePropagation(true);

View File

@ -5,15 +5,13 @@
package org.whispersystems.textsecuregcm.redis; package org.whispersystems.textsecuregcm.redis;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
public class FaultTolerantPubSubConnection<K, V> extends AbstractFaultTolerantPubSubConnection<K, V, StatefulRedisPubSubConnection<K, V>> { public class FaultTolerantPubSubConnection<K, V> extends AbstractFaultTolerantPubSubConnection<K, V, StatefulRedisPubSubConnection<K, V>> {
protected FaultTolerantPubSubConnection(final String name, protected FaultTolerantPubSubConnection(final String name,
final StatefulRedisPubSubConnection<K, V> pubSubConnection, final StatefulRedisPubSubConnection<K, V> pubSubConnection) {
final Retry retry) {
super(name, pubSubConnection, retry); super(name, pubSubConnection);
} }
} }

View File

@ -147,13 +147,13 @@ public class FaultTolerantRedisClient {
final StatefulRedisPubSubConnection<String, String> pubSubConnection = redisClient.connectPubSub(); final StatefulRedisPubSubConnection<String, String> pubSubConnection = redisClient.connectPubSub();
pubSubConnections.add(pubSubConnection); pubSubConnections.add(pubSubConnection);
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry); return new FaultTolerantPubSubConnection<>(name, pubSubConnection);
} }
public FaultTolerantPubSubConnection<byte[], byte[]> createBinaryPubSubConnection() { public FaultTolerantPubSubConnection<byte[], byte[]> createBinaryPubSubConnection() {
final StatefulRedisPubSubConnection<byte[], byte[]> pubSubConnection = redisClient.connectPubSub(ByteArrayCodec.INSTANCE); final StatefulRedisPubSubConnection<byte[], byte[]> pubSubConnection = redisClient.connectPubSub(ByteArrayCodec.INSTANCE);
pubSubConnections.add(pubSubConnection); pubSubConnections.add(pubSubConnection);
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry); return new FaultTolerantPubSubConnection<>(name, pubSubConnection);
} }
} }

View File

@ -198,7 +198,7 @@ public class FaultTolerantRedisClusterClient {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub(); final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
pubSubConnections.add(pubSubConnection); pubSubConnections.add(pubSubConnection);
return new FaultTolerantPubSubClusterConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry, return new FaultTolerantPubSubClusterConnection<>(name, pubSubConnection, topologyChangedEventRetry,
Schedulers.newSingle(name + "-redisPubSubEvents", true)); Schedulers.newSingle(name + "-redisPubSubEvents", true));
} }

View File

@ -5,11 +5,8 @@
package org.whispersystems.textsecuregcm.redis; package org.whispersystems.textsecuregcm.redis;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -20,7 +17,6 @@ import static org.mockito.Mockito.when;
import io.github.resilience4j.core.IntervalFunction; import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig; import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException; import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
@ -60,8 +56,6 @@ class FaultTolerantPubSubClusterConnectionTest {
retryConfiguration.setMaxAttempts(3); retryConfiguration.setMaxAttempts(3);
retryConfiguration.setWaitDuration(10); retryConfiguration.setWaitDuration(10);
final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig());
final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom() final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE) .maxAttempts(Integer.MAX_VALUE)
.intervalFunction(IntervalFunction.ofExponentialBackoff(5)) .intervalFunction(IntervalFunction.ofExponentialBackoff(5))
@ -69,27 +63,7 @@ class FaultTolerantPubSubClusterConnectionTest {
final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration); final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration);
faultTolerantPubSubConnection = new FaultTolerantPubSubClusterConnection<>("test", pubSubConnection, faultTolerantPubSubConnection = new FaultTolerantPubSubClusterConnection<>("test", pubSubConnection,
retry, resubscribeRetry, Schedulers.newSingle("test")); resubscribeRetry, Schedulers.newSingle("test"));
}
@Test
void testRetry() {
when(pubSubCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");
assertEquals("value",
faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
when(pubSubCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenThrow(new RedisCommandTimeoutException())
.thenReturn("value");
assertThrows(RedisCommandTimeoutException.class,
() -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
} }
@Nested @Nested