From 00d0dba62cd6ed56a125b570e0a1e2b5b4a0e22e Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Fri, 1 Nov 2024 11:18:50 -0400 Subject: [PATCH] Don't retry pub/sub commands --- ...AbstractFaultTolerantPubSubConnection.java | 13 ++------- .../FaultTolerantPubSubClusterConnection.java | 3 +- .../redis/FaultTolerantPubSubConnection.java | 6 ++-- .../redis/FaultTolerantRedisClient.java | 4 +-- .../FaultTolerantRedisClusterClient.java | 2 +- ...ltTolerantPubSubClusterConnectionTest.java | 28 +------------------ 6 files changed, 10 insertions(+), 46 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/AbstractFaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/AbstractFaultTolerantPubSubConnection.java index f7fc0ad72..406a9679c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/AbstractFaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/AbstractFaultTolerantPubSubConnection.java @@ -7,7 +7,6 @@ package org.whispersystems.textsecuregcm.redis; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; -import io.github.resilience4j.retry.Retry; import io.lettuce.core.RedisException; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.micrometer.core.instrument.Metrics; @@ -20,17 +19,11 @@ abstract class AbstractFaultTolerantPubSubConnection consumer) { try { - retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))); + executeTimer.record(() -> consumer.accept(pubSubConnection)); } catch (final Throwable t) { if (t instanceof RedisException) { throw (RedisException) t; @@ -53,7 +46,7 @@ abstract class AbstractFaultTolerantPubSubConnection T withPubSubConnection(final Function function) { try { - return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))); + return executeTimer.record(() -> function.apply(pubSubConnection)); } catch (final Throwable t) { if (t instanceof RedisException) { throw (RedisException) t; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnection.java index 2d649f4f4..6c9956d81 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnection.java @@ -21,11 +21,10 @@ public class FaultTolerantPubSubClusterConnection extends AbstractFaultTol protected FaultTolerantPubSubClusterConnection(final String name, final StatefulRedisClusterPubSubConnection pubSubConnection, - final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) { - super(name, pubSubConnection, retry); + super(name, pubSubConnection); pubSubConnection.setNodeMessagePropagation(true); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index cf5d11894..a1e2af12d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -5,15 +5,13 @@ package org.whispersystems.textsecuregcm.redis; -import io.github.resilience4j.retry.Retry; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; public class FaultTolerantPubSubConnection extends AbstractFaultTolerantPubSubConnection> { protected FaultTolerantPubSubConnection(final String name, - final StatefulRedisPubSubConnection pubSubConnection, - final Retry retry) { + final StatefulRedisPubSubConnection pubSubConnection) { - super(name, pubSubConnection, retry); + super(name, pubSubConnection); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java index 6d91e5f59..84671d5c8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java @@ -147,13 +147,13 @@ public class FaultTolerantRedisClient { final StatefulRedisPubSubConnection pubSubConnection = redisClient.connectPubSub(); pubSubConnections.add(pubSubConnection); - return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry); + return new FaultTolerantPubSubConnection<>(name, pubSubConnection); } public FaultTolerantPubSubConnection createBinaryPubSubConnection() { final StatefulRedisPubSubConnection pubSubConnection = redisClient.connectPubSub(ByteArrayCodec.INSTANCE); pubSubConnections.add(pubSubConnection); - return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry); + return new FaultTolerantPubSubConnection<>(name, pubSubConnection); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java index 08dc31733..7f5248336 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java @@ -198,7 +198,7 @@ public class FaultTolerantRedisClusterClient { final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); pubSubConnections.add(pubSubConnection); - return new FaultTolerantPubSubClusterConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry, + return new FaultTolerantPubSubClusterConnection<>(name, pubSubConnection, topologyChangedEventRetry, Schedulers.newSingle(name + "-redisPubSubEvents", true)); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnectionTest.java index 49dfbf5fc..f66cc10d4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubClusterConnectionTest.java @@ -5,11 +5,8 @@ package org.whispersystems.textsecuregcm.redis; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; @@ -20,7 +17,6 @@ import static org.mockito.Mockito.when; import io.github.resilience4j.core.IntervalFunction; import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; -import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; @@ -60,8 +56,6 @@ class FaultTolerantPubSubClusterConnectionTest { retryConfiguration.setMaxAttempts(3); retryConfiguration.setWaitDuration(10); - final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); - final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom() .maxAttempts(Integer.MAX_VALUE) .intervalFunction(IntervalFunction.ofExponentialBackoff(5)) @@ -69,27 +63,7 @@ class FaultTolerantPubSubClusterConnectionTest { final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration); faultTolerantPubSubConnection = new FaultTolerantPubSubClusterConnection<>("test", pubSubConnection, - retry, resubscribeRetry, Schedulers.newSingle("test")); - } - - @Test - void testRetry() { - when(pubSubCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); - - assertEquals("value", - faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); - - when(pubSubCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); - - assertThrows(RedisCommandTimeoutException.class, - () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); + resubscribeRetry, Schedulers.newSingle("test")); } @Nested