From 98e41f9a37630e0ab112dbbb7eb604e057d3c36b Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Wed, 22 Sep 2021 10:31:39 -0400 Subject: [PATCH] Improve Redis exception handling --- .../push/ApnFallbackManager.java | 23 ++-- .../redis/FaultTolerantPubSubConnection.java | 47 ++----- .../redis/FaultTolerantRedisClient.java | 118 ------------------ .../redis/FaultTolerantRedisCluster.java | 60 ++------- .../textsecuregcm/redis/RedisException.java | 13 -- .../textsecuregcm/redis/RedisOperation.java | 32 ++--- .../storage/MessagePersister.java | 6 +- .../push/ApnFallbackManagerTest.java | 5 +- .../redis/AbstractRedisSingletonTest.java | 76 ----------- .../FaultTolerantPubSubConnectionTest.java | 7 +- .../redis/FaultTolerantRedisClientTest.java | 63 ---------- .../redis/FaultTolerantRedisClusterTest.java | 6 +- 12 files changed, 49 insertions(+), 407 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisException.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisSingletonTest.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java index bd9a811b1..46cde7a11 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java @@ -18,7 +18,6 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.push.ApnMessage.Type; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.RedisException; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -135,27 +134,19 @@ public class ApnFallbackManager implements Managed { } } - public void schedule(Account account, Device device) throws RedisException { + public void schedule(Account account, Device device) { schedule(account, device, System.currentTimeMillis()); } @VisibleForTesting - void schedule(Account account, Device device, long timestamp) throws RedisException { - try { - sent.mark(); - insert(account, device, timestamp + (15 * 1000), (15 * 1000)); - } catch (io.lettuce.core.RedisException e) { - throw new RedisException(e); - } + void schedule(Account account, Device device, long timestamp) { + sent.mark(); + insert(account, device, timestamp + (15 * 1000), (15 * 1000)); } - public void cancel(Account account, Device device) throws RedisException { - try { - if (remove(account, device)) { - delivered.mark(); - } - } catch (io.lettuce.core.RedisException e) { - throw new RedisException(e); + public void cancel(Account account, Device device) { + if (remove(account, device)) { + delivered.mark(); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index c1e1059a7..13e599ca4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -7,40 +7,28 @@ package org.whispersystems.textsecuregcm.redis; import static com.codahale.metrics.MetricRegistry.name; -import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.retry.Retry; -import io.lettuce.core.RedisCommandTimeoutException; +import io.lettuce.core.RedisException; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.ThreadDumpUtil; public class FaultTolerantPubSubConnection { - private final String name; - private final StatefulRedisClusterPubSubConnection pubSubConnection; private final CircuitBreaker circuitBreaker; private final Retry retry; private final Timer executeTimer; - private final Meter commandTimeoutMeter; - private final AtomicBoolean wroteThreadDump = new AtomicBoolean(false); - - private static final Logger log = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class); public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection pubSubConnection, final CircuitBreaker circuitBreaker, final Retry retry) { - this.name = name; this.pubSubConnection = pubSubConnection; this.circuitBreaker = circuitBreaker; this.retry = retry; @@ -48,9 +36,7 @@ public class FaultTolerantPubSubConnection { this.pubSubConnection.setNodeMessagePropagation(true); final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute")); - this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), name + "-pubsub", "commandTimeout")); CircuitBreakerUtil.registerMetrics(metricRegistry, circuitBreaker, FaultTolerantPubSubConnection.class); } @@ -60,18 +46,13 @@ public class FaultTolerantPubSubConnection { circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> { try (final Timer.Context ignored = executeTimer.time()) { consumer.accept(pubSubConnection); - } catch (final RedisCommandTimeoutException e) { - recordCommandTimeout(e); - throw e; } })); } catch (final Throwable t) { - log.warn("Redis operation failure", t); - - if (t instanceof RuntimeException) { - throw (RuntimeException) t; + if (t instanceof RedisException) { + throw (RedisException) t; } else { - throw new RuntimeException(t); + throw new RedisException(t); } } } @@ -81,28 +62,14 @@ public class FaultTolerantPubSubConnection { return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> { try (final Timer.Context ignored = executeTimer.time()) { return function.apply(pubSubConnection); - } catch (final RedisCommandTimeoutException e) { - recordCommandTimeout(e); - throw e; } })); } catch (final Throwable t) { - log.warn("Redis operation failure", t); - - if (t instanceof RuntimeException) { - throw (RuntimeException) t; + if (t instanceof RedisException) { + throw (RedisException) t; } else { - throw new RuntimeException(t); + throw new RedisException(t); } } } - - private void recordCommandTimeout(final RedisCommandTimeoutException e) { - commandTimeoutMeter.mark(); - log.warn("[{}] Command timeout exception ({}-pubsub)", Thread.currentThread().getName(), this.name, e); - - if (wroteThreadDump.compareAndSet(false, true)) { - ThreadDumpUtil.writeThreadDump(); - } - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java deleted file mode 100644 index def6c6dc9..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import com.codahale.metrics.SharedMetricRegistries; -import com.codahale.metrics.Timer; -import com.google.common.annotations.VisibleForTesting; -import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.lettuce.core.RedisClient; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.codec.ByteArrayCodec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; -import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; -import org.whispersystems.textsecuregcm.util.Constants; - -import java.time.Duration; -import java.util.function.Consumer; -import java.util.function.Function; - -import static com.codahale.metrics.MetricRegistry.name; - -public class FaultTolerantRedisClient { - - private final RedisClient client; - - private final StatefulRedisConnection stringConnection; - private final StatefulRedisConnection binaryConnection; - private final CircuitBreaker circuitBreaker; - - private final Timer executeTimer; - - private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisClient.class); - - public FaultTolerantRedisClient(final String name, final RedisConfiguration redisConfiguration) { - this(name, RedisClient.create(redisConfiguration.getUrl()), redisConfiguration.getTimeout(), redisConfiguration.getCircuitBreakerConfiguration()); - } - - @VisibleForTesting - FaultTolerantRedisClient(final String name, final RedisClient redisClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { - this.client = redisClient; - this.client.setDefaultTimeout(commandTimeout); - - this.stringConnection = client.connect(); - this.binaryConnection = client.connect(ByteArrayCodec.INSTANCE); - - this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); - - CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), - circuitBreaker, - FaultTolerantRedisCluster.class); - - this.executeTimer = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME).timer(name(getClass(), name, "execute")); - } - - @VisibleForTesting - void shutdown() { - stringConnection.close(); - client.shutdown(); - } - - public void useClient(final Consumer> consumer) { - useConnection(stringConnection, consumer); - } - - public T withClient(final Function, T> function) { - return withConnection(stringConnection, function); - } - - public void useBinaryClient(final Consumer> consumer) { - useConnection(binaryConnection, consumer); - } - - public T withBinaryClient(final Function, T> function) { - return withConnection(binaryConnection, function); - } - - private void useConnection(final StatefulRedisConnection connection, final Consumer> consumer) { - try { - circuitBreaker.executeCheckedRunnable(() -> { - try (final Timer.Context ignored = executeTimer.time()) { - consumer.accept(connection); - } - }); - } catch (final Throwable t) { - log.warn("Redis operation failure", t); - - if (t instanceof RuntimeException) { - throw (RuntimeException) t; - } else { - throw new RuntimeException(t); - } - } - } - - private T withConnection(final StatefulRedisConnection connection, final Function, T> function) { - try { - return circuitBreaker.executeCheckedSupplier(() -> { - try (final Timer.Context ignored = executeTimer.time()) { - return function.apply(connection); - } - }); - } catch (final Throwable t) { - log.warn("Redis operation failure", t); - - if (t instanceof RuntimeException) { - throw (RuntimeException) t; - } else { - throw new RuntimeException(t); - } - } - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index c9ff5e792..5483e6c04 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -5,15 +5,12 @@ package org.whispersystems.textsecuregcm.redis; -import static com.codahale.metrics.MetricRegistry.name; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.google.common.annotations.VisibleForTesting; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.retry.Retry; import io.lettuce.core.RedisCommandTimeoutException; +import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; @@ -25,18 +22,14 @@ import io.lettuce.core.resource.ClientResources; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.ThreadDumpUtil; /** * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed, @@ -56,11 +49,6 @@ public class FaultTolerantRedisCluster { private final CircuitBreaker circuitBreaker; private final Retry retry; - private final Meter commandTimeoutMeter; - private final AtomicBoolean wroteThreadDump = new AtomicBoolean(false); - - private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class); - public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) { this(name, RedisClusterClient.create(clientResources, clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())), @@ -73,9 +61,6 @@ public class FaultTolerantRedisCluster { FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { this.name = name; - final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), this.name, "commandTimeout")); - this.clusterClient = clusterClient; this.clusterClient.setDefaultTimeout(commandTimeout); this.clusterClient.setOptions(ClusterClientOptions.builder() @@ -128,55 +113,28 @@ public class FaultTolerantRedisCluster { private void useConnection(final StatefulRedisClusterConnection connection, final Consumer> consumer) { try { - circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> { - try { - consumer.accept(connection); - } catch (final RedisCommandTimeoutException e) { - recordCommandTimeout(e); - throw e; - } - })); + circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection))); } catch (final Throwable t) { - log.warn("Redis operation failure", t); - - if (t instanceof RuntimeException) { - throw (RuntimeException) t; + if (t instanceof RedisException) { + throw (RedisException) t; } else { - throw new RuntimeException(t); + throw new RedisException(t); } } } private T withConnection(final StatefulRedisClusterConnection connection, final Function, T> function) { try { - return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> { - try { - return function.apply(connection); - } catch (final RedisCommandTimeoutException e) { - recordCommandTimeout(e); - throw e; - } - })); + return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> function.apply(connection))); } catch (final Throwable t) { - log.warn("Redis operation failure", t); - - if (t instanceof RuntimeException) { - throw (RuntimeException) t; + if (t instanceof RedisException) { + throw (RedisException) t; } else { - throw new RuntimeException(t); + throw new RedisException(t); } } } - private void recordCommandTimeout(final RedisCommandTimeoutException e) { - commandTimeoutMeter.mark(); - log.warn("[{}] Command timeout exception ({})", Thread.currentThread().getName(), this.name, e); - - if (wroteThreadDump.compareAndSet(false, true)) { - ThreadDumpUtil.writeThreadDump(); - } - } - public FaultTolerantPubSubConnection createPubSubConnection() { final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); pubSubConnections.add(pubSubConnection); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisException.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisException.java deleted file mode 100644 index aeed3d4df..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisException.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -public class RedisException extends Exception { - - public RedisException(Exception e) { - super(e); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisOperation.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisOperation.java index 54c899629..f4f548db9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisOperation.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisOperation.java @@ -5,6 +5,7 @@ package org.whispersystems.textsecuregcm.redis; +import io.lettuce.core.RedisException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,30 +13,17 @@ public class RedisOperation { private static final Logger logger = LoggerFactory.getLogger(RedisOperation.class); - public static void unchecked(Operation operation) { + /** + * Executes the given task and logs and discards any {@link RedisException} that may be thrown. This method should be + * used for best-effort tasks like gathering metrics. + * + * @param runnable the Redis-related task to be executed + */ + public static void unchecked(final Runnable runnable) { try { - operation.run(); + runnable.run(); } catch (RedisException e) { - logger.warn("Jedis failure", e); + logger.warn("Redis failure", e); } } - - public static boolean unchecked(BooleanOperation operation) { - try { - return operation.run(); - } catch (RedisException e) { - logger.warn("Jedis failure", e); - } - - return false; - } - - @FunctionalInterface - public interface Operation { - public void run() throws RedisException; - } - - public interface BooleanOperation { - public boolean run() throws RedisException; - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index 28b13b861..44a1446f0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -46,6 +46,8 @@ public class MessagePersister implements Managed { static final int QUEUE_BATCH_LIMIT = 100; static final int MESSAGE_BATCH_LIMIT = 100; + private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis(); + private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER"; private static final int WORKER_THREAD_COUNT = 4; @@ -129,6 +131,8 @@ public class MessagePersister implements Managed { logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e); messagesCache.addQueueToPersist(accountUuid, deviceId); + + Util.sleep(EXCEPTION_PAUSE_MILLIS); } } @@ -165,7 +169,7 @@ public class MessagePersister implements Managed { queueSizeHistogram.update(messageCount); } finally { - messagesCache.unlockQueueForPersistence(accountUuid, deviceId); + messagesCache.unlockQueueForPersistence(accountUuid, deviceId); } } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java index 5fc0b658c..1c2e6f385 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java @@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; -import org.whispersystems.textsecuregcm.redis.RedisException; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -67,7 +66,7 @@ class ApnFallbackManagerTest { } @Test - void testClusterInsert() throws RedisException { + void testClusterInsert() { final String endpoint = apnFallbackManager.getEndpointKey(account, device); assertTrue(apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 1).isEmpty()); @@ -88,7 +87,7 @@ class ApnFallbackManagerTest { } @Test - void testProcessNextSlot() throws RedisException { + void testProcessNextSlot() { final ApnFallbackManager.NotificationWorker worker = apnFallbackManager.new NotificationWorker(); apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisSingletonTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisSingletonTest.java deleted file mode 100644 index cb1533967..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisSingletonTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import io.lettuce.core.RedisClient; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.providers.RedisClientFactory; -import redis.embedded.RedisServer; - -import java.time.Duration; -import java.util.List; - -import static org.junit.Assume.assumeFalse; - -public class AbstractRedisSingletonTest { - - private static RedisServer redisServer; - - private FaultTolerantRedisClient redisClient; - private ReplicatedJedisPool replicatedJedisPool; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - assumeFalse(System.getProperty("os.name").equalsIgnoreCase("windows")); - - redisServer = RedisServer.builder() - .setting("appendonly no") - .setting("dir " + System.getProperty("java.io.tmpdir")) - .port(AbstractRedisClusterTest.getNextRedisClusterPort()) - .build(); - - redisServer.start(); - } - - @Before - public void setUp() throws Exception { - final String redisUrl = String.format("redis://127.0.0.1:%d", redisServer.ports().get(0)); - - redisClient = new FaultTolerantRedisClient("test-client", - RedisClient.create(redisUrl), - Duration.ofSeconds(2), - new CircuitBreakerConfiguration()); - - replicatedJedisPool = new RedisClientFactory("test-pool", - redisUrl, - List.of(redisUrl), - new CircuitBreakerConfiguration()).getRedisClientPool(); - - redisClient.useClient(connection -> connection.sync().flushall()); - } - - protected FaultTolerantRedisClient getRedisClient() { - return redisClient; - } - - protected ReplicatedJedisPool getJedisPool() { - return replicatedJedisPool; - } - - @After - public void tearDown() throws Exception { - redisClient.shutdown(); - } - - @AfterClass - public static void tearDownAfterClass() { - redisServer.stop(); - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java index bc7476488..56c544641 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java @@ -19,6 +19,7 @@ import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,15 +57,17 @@ public class FaultTolerantPubSubConnectionTest { public void testBreaker() { when(pubSubCommands.get(anyString())) .thenReturn("value") - .thenThrow(new io.lettuce.core.RedisException("Badness has ensued.")); + .thenThrow(new RuntimeException("Badness has ensued.")); assertEquals("value", faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); assertThrows(RedisException.class, () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); - assertThrows(CallNotPermittedException.class, + final RedisException redisException = assertThrows(RedisException.class, () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); + + assertTrue(redisException.getCause() instanceof CallNotPermittedException); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java deleted file mode 100644 index baace98e1..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.redis; - -import io.github.resilience4j.circuitbreaker.CallNotPermittedException; -import io.lettuce.core.RedisClient; -import io.lettuce.core.RedisException; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.sync.RedisCommands; -import org.junit.Before; -import org.junit.Test; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; - -import java.time.Duration; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class FaultTolerantRedisClientTest { - - private RedisCommands commands; - private FaultTolerantRedisClient faultTolerantRedisClient; - - @SuppressWarnings("unchecked") - @Before - public void setUp() { - final RedisClient redisClient = mock(RedisClient.class); - final StatefulRedisConnection clusterConnection = mock(StatefulRedisConnection.class); - - commands = mock(RedisCommands.class); - - when(redisClient.connect()).thenReturn(clusterConnection); - when(clusterConnection.sync()).thenReturn(commands); - - final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); - breakerConfiguration.setFailureRateThreshold(100); - breakerConfiguration.setRingBufferSizeInClosedState(1); - breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); - - faultTolerantRedisClient = new FaultTolerantRedisClient("test", redisClient, Duration.ofSeconds(2), breakerConfiguration); - } - - @Test - public void testBreaker() { - when(commands.get(anyString())) - .thenReturn("value") - .thenThrow(new io.lettuce.core.RedisException("Badness has ensued.")); - - assertEquals("value", faultTolerantRedisClient.withClient(connection -> connection.sync().get("key"))); - - assertThrows(RedisException.class, - () -> faultTolerantRedisClient.withClient(connection -> connection.sync().get("OH NO"))); - - assertThrows(CallNotPermittedException.class, - () -> faultTolerantRedisClient.withClient(connection -> connection.sync().get("OH NO"))); - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index d393e857d..8e0e7090e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -66,15 +66,17 @@ public class FaultTolerantRedisClusterTest { public void testBreaker() { when(clusterCommands.get(anyString())) .thenReturn("value") - .thenThrow(new RedisException("Badness has ensued.")); + .thenThrow(new RuntimeException("Badness has ensued.")); assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); assertThrows(RedisException.class, () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); - assertThrows(CallNotPermittedException.class, + final RedisException redisException = assertThrows(RedisException.class, () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); + + assertTrue(redisException.getCause() instanceof CallNotPermittedException); } @Test