diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 6f3da497b..82a65f713 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -515,8 +515,8 @@ public class WhisperServerService extends Application { connection.addListener(this); - connection.getResources().eventBus().get() - .filter(event -> event instanceof ClusterTopologyChangedEvent) - .subscribe(event -> resubscribeAll()); final String presenceChannel = getManagerPresenceChannel(managerId); final int slot = SlotHash.getSlot(presenceChannel); @@ -124,6 +122,8 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().sadd(MANAGER_SET_KEY, managerId)); pruneMissingPeersFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java index 810696b87..c620758fb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -10,28 +10,41 @@ import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.retry.Retry; import io.lettuce.core.RedisException; +import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import java.util.function.Consumer; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; +import reactor.core.scheduler.Scheduler; public class FaultTolerantPubSubConnection { + private static final Logger logger = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class); + + + private final String name; private final StatefulRedisClusterPubSubConnection pubSubConnection; private final CircuitBreaker circuitBreaker; private final Retry retry; + private final Retry resubscribeRetry; + private final Scheduler topologyChangedEventScheduler; private final Timer executeTimer; public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection pubSubConnection, final CircuitBreaker circuitBreaker, - final Retry retry) { + final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) { + this.name = name; this.pubSubConnection = pubSubConnection; this.circuitBreaker = circuitBreaker; this.retry = retry; + this.resubscribeRetry = resubscribeRetry; + this.topologyChangedEventScheduler = topologyChangedEventScheduler; this.pubSubConnection.setNodeMessagePropagation(true); @@ -58,11 +71,32 @@ public class FaultTolerantPubSubConnection { return circuitBreaker.executeCheckedSupplier( () -> retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection)))); } catch (final Throwable t) { - if (t instanceof RedisException) { - throw (RedisException) t; - } else { - throw new RedisException(t); - } + if (t instanceof RedisException) { + throw (RedisException) t; + } else { + throw new RedisException(t); + } } } + + + public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) { + + usePubSubConnection(connection -> connection.getResources().eventBus().get() + .filter(event -> event instanceof ClusterTopologyChangedEvent) + .subscribeOn(topologyChangedEventScheduler) + .subscribe(event -> { + logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name); + + resubscribeRetry.executeRunnable(() -> { + try { + eventHandler.run(); + } catch (final RuntimeException e) { + logger.warn("Resubscribe for {} failed", name, e); + throw e; + } + }); + })); + } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 1ea33a92d..9d6955ac9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -7,9 +7,11 @@ package org.whispersystems.textsecuregcm.redis; import com.google.common.annotations.VisibleForTesting; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.core.IntervalFunction; import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; import io.github.resilience4j.reactor.retry.RetryOperator; import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; import io.lettuce.core.ClientOptions.DisconnectedBehavior; import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; @@ -31,6 +33,7 @@ import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; /** * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed, @@ -38,51 +41,61 @@ import reactor.core.publisher.Flux; */ public class FaultTolerantRedisCluster { - private final String name; + private final String name; - private final RedisClusterClient clusterClient; + private final RedisClusterClient clusterClient; - private final StatefulRedisClusterConnection stringConnection; - private final StatefulRedisClusterConnection binaryConnection; + private final StatefulRedisClusterConnection stringConnection; + private final StatefulRedisClusterConnection binaryConnection; - private final List> pubSubConnections = new ArrayList<>(); + private final List> pubSubConnections = new ArrayList<>(); - private final CircuitBreaker circuitBreaker; - private final Retry retry; + private final CircuitBreaker circuitBreaker; + private final Retry retry; + private final Retry topologyChangedEventRetry; - public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) { - this(name, - RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()), - clusterConfiguration.getTimeout(), - clusterConfiguration.getCircuitBreakerConfiguration(), - clusterConfiguration.getRetryConfiguration()); - } + public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, + final ClientResources clientResources) { + this(name, + RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()), + clusterConfiguration.getTimeout(), + clusterConfiguration.getCircuitBreakerConfiguration(), + clusterConfiguration.getRetryConfiguration()); + } - @VisibleForTesting - FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { - this.name = name; + @VisibleForTesting + FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, + final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { + this.name = name; - this.clusterClient = clusterClient; - this.clusterClient.setDefaultTimeout(commandTimeout); - this.clusterClient.setOptions(ClusterClientOptions.builder() - .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS) - .validateClusterNodeMembership(false) - .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() - .enableAllAdaptiveRefreshTriggers() - .build()) - .publishOnScheduler(true) - .build()); + this.clusterClient = clusterClient; + this.clusterClient.setDefaultTimeout(commandTimeout); + this.clusterClient.setOptions(ClusterClientOptions.builder() + .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS) + .validateClusterNodeMembership(false) + .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() + .enableAllAdaptiveRefreshTriggers() + .build()) + .publishOnScheduler(true) + .build()); - this.stringConnection = clusterClient.connect(); - this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); + this.stringConnection = clusterClient.connect(); + this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); - this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() - .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); + this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() + .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); + final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom() + .maxAttempts(Integer.MAX_VALUE) + .intervalFunction( + IntervalFunction.ofExponentialRandomBackoff(Duration.ofSeconds(1), 1.5, Duration.ofSeconds(30))) + .build(); + + this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig); CircuitBreakerUtil.registerMetrics(circuitBreaker, FaultTolerantRedisCluster.class); CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class); - } + } void shutdown() { stringConnection.close(); @@ -158,6 +171,7 @@ public class FaultTolerantRedisCluster { final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); pubSubConnections.add(pubSubConnection); - return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker, retry); + return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker, retry, topologyChangedEventRetry, + Schedulers.newSingle(name + "-redisPubSubEvents")); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 792984157..0f9879c63 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -14,7 +14,6 @@ import io.lettuce.core.ScoredValue; import io.lettuce.core.ScriptOutputType; import io.lettuce.core.ZAddArgs; import io.lettuce.core.cluster.SlotHash; -import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.micrometer.core.instrument.Counter; @@ -104,8 +103,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, - final Clock clock, final ExecutorService notificationExecutorService, final Scheduler messageDeliveryScheduler, - final ExecutorService messageDeletionExecutorService) throws IOException { + final ExecutorService notificationExecutorService, final Scheduler messageDeliveryScheduler, + final ExecutorService messageDeletionExecutorService, final Clock clock) throws IOException { this.readDeleteCluster = readDeleteCluster; this.pubSubConnection = readDeleteCluster.createPubSubConnection(); @@ -128,12 +127,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp @Override public void start() { - pubSubConnection.usePubSubConnection(connection -> { - connection.addListener(this); - connection.getResources().eventBus().get() - .filter(event -> event instanceof ClusterTopologyChangedEvent) - .subscribe(event -> resubscribeAll()); - }); + pubSubConnection.usePubSubConnection(connection -> connection.addListener(this)); + pubSubConnection.subscribeToClusterTopologyChangedEvents(this::resubscribeAll); } @Override @@ -142,7 +137,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } private void resubscribeAll() { - logger.info("Got topology change event, resubscribing all keyspace notifications"); final Set queueNames; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index 68471df97..7863a01ef 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -201,7 +201,7 @@ public class AssignUsernameCommand extends EnvironmentCommand { }; + private boolean expectExceptionOnClientPresenceManagerStop = false; + @BeforeEach void setUp() throws Exception { @@ -61,7 +63,13 @@ class ClientPresenceManagerTest { presenceRenewalExecutorService.shutdown(); presenceRenewalExecutorService.awaitTermination(1, TimeUnit.MINUTES); - clientPresenceManager.stop(); + try { + clientPresenceManager.stop(); + } catch (final Exception e) { + if (!expectExceptionOnClientPresenceManagerStop) { + throw e; + } + } } @Test @@ -294,6 +302,8 @@ class ClientPresenceManagerTest { } assertTrue(clientPresenceManager.isPresent(displacedAccountUuid, displacedAccountDeviceId)); + + expectExceptionOnClientPresenceManagerStop = true; } @Nested diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java index a2fb5b7ae..71efef66b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java @@ -8,86 +8,218 @@ package org.whispersystems.textsecuregcm.redis; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.core.IntervalFunction; import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; +import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; +import io.lettuce.core.event.Event; +import io.lettuce.core.event.EventBus; +import io.lettuce.core.resource.ClientResources; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; +import reactor.test.publisher.TestPublisher; class FaultTolerantPubSubConnectionTest { - private RedisClusterPubSubCommands pubSubCommands; - private FaultTolerantPubSubConnection faultTolerantPubSubConnection; + private StatefulRedisClusterPubSubConnection pubSubConnection; + private RedisClusterPubSubCommands pubSubCommands; + private FaultTolerantPubSubConnection faultTolerantPubSubConnection; + + + @SuppressWarnings("unchecked") + @BeforeEach + public void setUp() { + pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); + + pubSubCommands = mock(RedisClusterPubSubCommands.class); + + when(pubSubConnection.sync()).thenReturn(pubSubCommands); + + final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); + breakerConfiguration.setFailureRateThreshold(100); + breakerConfiguration.setSlidingWindowSize(1); + breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1); + breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); + + final RetryConfiguration retryConfiguration = new RetryConfiguration(); + retryConfiguration.setMaxAttempts(3); + retryConfiguration.setWaitDuration(10); + + final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig()); + final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); + + final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom() + .maxAttempts(Integer.MAX_VALUE) + .intervalFunction(IntervalFunction.ofExponentialBackoff(5)) + .build(); + final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration); + + faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker, + retry, resubscribeRetry, Schedulers.newSingle("test")); + } + + @Test + void testBreaker() { + when(pubSubCommands.get(anyString())) + .thenReturn("value") + .thenThrow(new RuntimeException("Badness has ensued.")); + + assertEquals("value", + faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); + + assertThrows(RedisException.class, + () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); + + final RedisException redisException = assertThrows(RedisException.class, + () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); + + assertTrue(redisException.getCause() instanceof CallNotPermittedException); + } + + @Test + void testRetry() { + when(pubSubCommands.get(anyString())) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenReturn("value"); + + assertEquals("value", + faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); + + when(pubSubCommands.get(anyString())) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenThrow(new RedisCommandTimeoutException()) + .thenReturn("value"); + + assertThrows(RedisCommandTimeoutException.class, + () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); + } + + @Nested + class ClusterTopologyChangedEventTest { + + private TestPublisher eventPublisher; + + private Runnable resubscribe; + + private AtomicInteger resubscribeCounter; + private CountDownLatch resubscribeFailure; + private CountDownLatch resubscribeSuccess; + + @BeforeEach + @SuppressWarnings("unchecked") + void setup() { + // ignore inherited stubbing + reset(pubSubConnection); + + eventPublisher = TestPublisher.createCold(); + + final ClientResources clientResources = mock(ClientResources.class); + when(pubSubConnection.getResources()) + .thenReturn(clientResources); + final EventBus eventBus = mock(EventBus.class); + when(clientResources.eventBus()) + .thenReturn(eventBus); + + final Flux eventFlux = Flux.from(eventPublisher); + when(eventBus.get()).thenReturn(eventFlux); + + resubscribeCounter = new AtomicInteger(); + + resubscribe = () -> { + try { + resubscribeCounter.incrementAndGet(); + pubSubConnection.sync().nodes((ignored) -> true); + resubscribeSuccess.countDown(); + } catch (final RuntimeException e) { + resubscribeFailure.countDown(); + throw e; + } + }; + + resubscribeSuccess = new CountDownLatch(1); + resubscribeFailure = new CountDownLatch(1); + } @SuppressWarnings("unchecked") - @BeforeEach - public void setUp() { - final StatefulRedisClusterPubSubConnection pubSubConnection = mock( - StatefulRedisClusterPubSubConnection.class); + @Test + void testSubscribeToClusterTopologyChangedEvents() throws Exception { - pubSubCommands = mock(RedisClusterPubSubCommands.class); + when(pubSubConnection.sync()) + .thenThrow(new RedisException("Cluster unavailable")); - when(pubSubConnection.sync()).thenReturn(pubSubCommands); + eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); - final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); - breakerConfiguration.setFailureRateThreshold(100); - breakerConfiguration.setSlidingWindowSize(1); - breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1); - breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); + faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe); - final RetryConfiguration retryConfiguration = new RetryConfiguration(); - retryConfiguration.setMaxAttempts(3); - retryConfiguration.setWaitDuration(0); + assertTrue(resubscribeFailure.await(1, TimeUnit.SECONDS)); - final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig()); - final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); + // simulate cluster recovery - no more exceptions, run the retry + reset(pubSubConnection); + clearInvocations(pubSubCommands); + when(pubSubConnection.sync()) + .thenReturn(pubSubCommands); - faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker, - retry); + assertTrue(resubscribeSuccess.await(1, TimeUnit.SECONDS)); + + assertTrue(resubscribeCounter.get() >= 2, String.format("resubscribe called %d times", resubscribeCounter.get())); + verify(pubSubCommands).nodes(any()); } @Test - void testBreaker() { - when(pubSubCommands.get(anyString())) - .thenReturn("value") - .thenThrow(new RuntimeException("Badness has ensued.")); + @SuppressWarnings("unchecked") + void testMultipleEventsWithPendingRetries() throws Exception { + // more complicated scenario: multiple events while retries are pending - assertEquals("value", faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); + // cluster is down + when(pubSubConnection.sync()) + .thenThrow(new RedisException("Cluster unavailable")); - assertThrows(RedisException.class, - () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); + // publish multiple topology changed events + eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); + eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); + eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); + eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); - final RedisException redisException = assertThrows(RedisException.class, - () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); + faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe); - assertTrue(redisException.getCause() instanceof CallNotPermittedException); + assertTrue(resubscribeFailure.await(1, TimeUnit.SECONDS)); + + // simulate cluster recovery - no more exceptions, run the retry + reset(pubSubConnection); + clearInvocations(pubSubCommands); + when(pubSubConnection.sync()) + .thenReturn(pubSubCommands); + + assertTrue(resubscribeSuccess.await(1, TimeUnit.SECONDS)); + + verify(pubSubCommands, atLeastOnce()).nodes(any()); } + } - @Test - void testRetry() { - when(pubSubCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); - - assertEquals("value", faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); - - when(pubSubCommands.get(anyString())) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenThrow(new RedisCommandTimeoutException()) - .thenReturn("value"); - - assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); - } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index a89015174..79a0255f6 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -24,6 +24,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.RandomStringUtils; @@ -51,6 +52,7 @@ class MessagePersisterIntegrationTest { private ExecutorService notificationExecutorService; private Scheduler messageDeliveryScheduler; private ExecutorService messageDeletionExecutorService; + private ScheduledExecutorService resubscribeRetryExecutorService; private MessagesCache messagesCache; private MessagesManager messagesManager; private MessagePersister messagePersister; @@ -78,9 +80,10 @@ class MessagePersisterIntegrationTest { final AccountsManager accountsManager = mock(AccountsManager.class); notificationExecutorService = Executors.newSingleThreadExecutor(); + resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor(); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), notificationExecutorService, - messageDeliveryScheduler, messageDeletionExecutorService); + REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService, + messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC()); messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), messageDeletionExecutorService); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index d15debee8..3e8ef0ea1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterEach; @@ -50,6 +51,7 @@ class MessagePersisterTest { static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); private ExecutorService sharedExecutorService; + private ScheduledExecutorService resubscribeRetryExecutorService; private Scheduler messageDeliveryScheduler; private MessagesCache messagesCache; private MessagesDynamoDb messagesDynamoDb; @@ -79,10 +81,11 @@ class MessagePersisterTest { when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); sharedExecutorService = Executors.newSingleThreadExecutor(); + resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, messageDeliveryScheduler, - sharedExecutorService); + REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, + sharedExecutorService, Clock.systemUTC()); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, PERSIST_DELAY); @@ -107,6 +110,8 @@ class MessagePersisterTest { sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS); messageDeliveryScheduler.dispose(); + resubscribeRetryExecutorService.shutdown(); + resubscribeRetryExecutorService.awaitTermination(1, TimeUnit.SECONDS); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index c03411d44..18e39f02d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -42,6 +42,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -80,10 +81,12 @@ class MessagesCacheTest { static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); private ExecutorService sharedExecutorService; + private ScheduledExecutorService resubscribeRetryExecutorService; private Scheduler messageDeliveryScheduler; private MessagesCache messagesCache; private static final UUID DESTINATION_UUID = UUID.randomUUID(); + private static final int DESTINATION_DEVICE_ID = 7; @BeforeEach @@ -95,10 +98,10 @@ class MessagesCacheTest { }); sharedExecutorService = Executors.newSingleThreadExecutor(); + resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, - messageDeliveryScheduler, sharedExecutorService); + REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); messagesCache.start(); } @@ -111,6 +114,8 @@ class MessagesCacheTest { sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS); messageDeliveryScheduler.dispose(); + resubscribeRetryExecutorService.shutdown(); + resubscribeRetryExecutorService.awaitTermination(1, TimeUnit.SECONDS); } @ParameterizedTest @@ -269,11 +274,7 @@ class MessagesCacheTest { } final MessagesCache messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - REDIS_CLUSTER_EXTENSION.getRedisCluster(), - cacheClock, - sharedExecutorService, - messageDeliveryScheduler, - sharedExecutorService); + REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, cacheClock); final List actualMessages = Flux.from( messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID)) @@ -561,7 +562,6 @@ class MessagesCacheTest { void setup() throws Exception { reactiveCommands = mock(RedisAdvancedClusterReactiveCommands.class); asyncCommands = mock(RedisAdvancedClusterAsyncCommands.class); - final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.builder() .binaryReactiveCommands(reactiveCommands) .binaryAsyncCommands(asyncCommands) @@ -569,8 +569,8 @@ class MessagesCacheTest { messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); - messagesCache = new MessagesCache(mockCluster, mockCluster, Clock.systemUTC(), mock(ExecutorService.class), - messageDeliveryScheduler, Executors.newSingleThreadExecutor()); + messagesCache = new MessagesCache(mockCluster, mockCluster, mock(ExecutorService.class), + messageDeliveryScheduler, Executors.newSingleThreadExecutor(), Clock.systemUTC()); } @AfterEach diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 5cf17757d..d13830bc3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -71,13 +71,13 @@ class WebSocketConnectionIntegrationTest { static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); private ExecutorService sharedExecutorService; + private ScheduledExecutorService scheduledExecutorService; private MessagesDynamoDb messagesDynamoDb; private MessagesCache messagesCache; private ReportMessageManager reportMessageManager; private Account account; private Device device; private WebSocketClient webSocketClient; - private ScheduledExecutorService retrySchedulingExecutor; private Scheduler messageDeliveryScheduler; private long serialTimestamp = System.currentTimeMillis(); @@ -86,10 +86,10 @@ class WebSocketConnectionIntegrationTest { void setUp() throws Exception { sharedExecutorService = Executors.newSingleThreadExecutor(); + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, messageDeliveryScheduler, - sharedExecutorService); + REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), dynamoDbExtension.getDynamoDbAsyncClient(), MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(7), sharedExecutorService); @@ -97,7 +97,6 @@ class WebSocketConnectionIntegrationTest { account = mock(Account.class); device = mock(Device.class); webSocketClient = mock(WebSocketClient.class); - retrySchedulingExecutor = Executors.newSingleThreadScheduledExecutor(); when(account.getNumber()).thenReturn("+18005551234"); when(account.getUuid()).thenReturn(UUID.randomUUID()); @@ -109,8 +108,8 @@ class WebSocketConnectionIntegrationTest { sharedExecutorService.shutdown(); sharedExecutorService.awaitTermination(2, TimeUnit.SECONDS); - retrySchedulingExecutor.shutdown(); - retrySchedulingExecutor.awaitTermination(2, TimeUnit.SECONDS); + scheduledExecutorService.shutdown(); + scheduledExecutorService.awaitTermination(2, TimeUnit.SECONDS); } @ParameterizedTest @@ -126,7 +125,7 @@ class WebSocketConnectionIntegrationTest { new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, - retrySchedulingExecutor, + scheduledExecutorService, messageDeliveryScheduler); final List expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); @@ -210,7 +209,7 @@ class WebSocketConnectionIntegrationTest { new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, - retrySchedulingExecutor, + scheduledExecutorService, messageDeliveryScheduler); final int persistedMessageCount = 207; @@ -276,7 +275,7 @@ class WebSocketConnectionIntegrationTest { device, webSocketClient, 100, // use a very short timeout, so that this test completes quickly - retrySchedulingExecutor, + scheduledExecutorService, messageDeliveryScheduler); final int persistedMessageCount = 207;