Add retry after exceptions during a cluster topology change event callback

This commit is contained in:
Chris Eager 2023-03-23 09:50:15 -05:00 committed by Chris Eager
parent 0cc84131de
commit 3ccfeb490b
13 changed files with 323 additions and 132 deletions

View File

@ -515,8 +515,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts); StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices); StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, Clock.systemUTC(), MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster,
keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionAsyncExecutor); keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionAsyncExecutor, clock);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager); PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
config.getReportMessageConfiguration().getCounterTtl()); config.getReportMessageConfiguration().getCounterTtl());

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2013-2020 Signal Messenger, LLC * Copyright 2013 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only * SPDX-License-Identifier: AGPL-3.0-only
*/ */
@ -18,7 +18,6 @@ import io.lettuce.core.RedisFuture;
import io.lettuce.core.ScriptOutputType; import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import java.io.IOException; import java.io.IOException;
@ -86,8 +85,10 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
final ExecutorService keyspaceNotificationExecutorService) throws IOException { final ExecutorService keyspaceNotificationExecutorService) throws IOException {
this.presenceCluster = presenceCluster; this.presenceCluster = presenceCluster;
this.pubSubConnection = this.presenceCluster.createPubSubConnection(); this.pubSubConnection = this.presenceCluster.createPubSubConnection();
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER); this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua",
this.renewPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/renew_presence.lua", ScriptOutputType.VALUE); ScriptOutputType.INTEGER);
this.renewPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/renew_presence.lua",
ScriptOutputType.VALUE);
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService; this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService;
@ -112,9 +113,6 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
public void start() { public void start() {
pubSubConnection.usePubSubConnection(connection -> { pubSubConnection.usePubSubConnection(connection -> {
connection.addListener(this); connection.addListener(this);
connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribe(event -> resubscribeAll());
final String presenceChannel = getManagerPresenceChannel(managerId); final String presenceChannel = getManagerPresenceChannel(managerId);
final int slot = SlotHash.getSlot(presenceChannel); final int slot = SlotHash.getSlot(presenceChannel);
@ -124,6 +122,8 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
.subscribe(presenceChannel); .subscribe(presenceChannel);
}); });
pubSubConnection.subscribeToClusterTopologyChangedEvents(this::resubscribeAll);
presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId));
pruneMissingPeersFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { pruneMissingPeersFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {

View File

@ -10,28 +10,41 @@ import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException; import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.scheduler.Scheduler;
public class FaultTolerantPubSubConnection<K, V> { public class FaultTolerantPubSubConnection<K, V> {
private static final Logger logger = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class);
private final String name;
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection; private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;
private final CircuitBreaker circuitBreaker; private final CircuitBreaker circuitBreaker;
private final Retry retry; private final Retry retry;
private final Retry resubscribeRetry;
private final Scheduler topologyChangedEventScheduler;
private final Timer executeTimer; private final Timer executeTimer;
public FaultTolerantPubSubConnection(final String name, public FaultTolerantPubSubConnection(final String name,
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker, final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker,
final Retry retry) { final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) {
this.name = name;
this.pubSubConnection = pubSubConnection; this.pubSubConnection = pubSubConnection;
this.circuitBreaker = circuitBreaker; this.circuitBreaker = circuitBreaker;
this.retry = retry; this.retry = retry;
this.resubscribeRetry = resubscribeRetry;
this.topologyChangedEventScheduler = topologyChangedEventScheduler;
this.pubSubConnection.setNodeMessagePropagation(true); this.pubSubConnection.setNodeMessagePropagation(true);
@ -65,4 +78,25 @@ public class FaultTolerantPubSubConnection<K, V> {
} }
} }
} }
/**
 * Registers a handler that is run every time the connection's event bus emits a
 * {@link ClusterTopologyChangedEvent}.
 * <p>
 * The handler is executed through {@code resubscribeRetry}: a {@link RuntimeException} thrown by the handler is
 * logged and rethrown so that the retry can decide whether to run the handler again.
 * <p>
 * Events are handed to {@code topologyChangedEventScheduler} before the handler is invoked, so the (blocking) retry
 * runs on that scheduler rather than on the thread that emitted the event.
 *
 * @param eventHandler the action to run (and re-run on failure) after a topology change, typically a resubscribe
 */
public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) {

  usePubSubConnection(connection -> connection.getResources().eventBus().get()
      .filter(event -> event instanceof ClusterTopologyChangedEvent)
      // publishOn (not subscribeOn): subscribeOn only relocates the subscribe/request calls, while a hot source
      // keeps delivering onNext on its own emitting thread. publishOn moves onNext onto the dedicated scheduler,
      // which keeps the retry's waiting between attempts off the event bus's thread.
      .publishOn(topologyChangedEventScheduler)
      .subscribe(event -> {
        logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name);

        resubscribeRetry.executeRunnable(() -> {
          try {
            eventHandler.run();
          } catch (final RuntimeException e) {
            // log each failed attempt, then rethrow so the retry sees the failure
            logger.warn("Resubscribe for {} failed", name, e);
            throw e;
          }
        });
      }));
}
} }

View File

@ -7,9 +7,11 @@ package org.whispersystems.textsecuregcm.redis;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import io.github.resilience4j.reactor.retry.RetryOperator; import io.github.resilience4j.reactor.retry.RetryOperator;
import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.ClientOptions.DisconnectedBehavior; import io.lettuce.core.ClientOptions.DisconnectedBehavior;
import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException; import io.lettuce.core.RedisException;
@ -31,6 +33,7 @@ import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/** /**
* A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed, * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed,
@ -49,8 +52,10 @@ public class FaultTolerantRedisCluster {
private final CircuitBreaker circuitBreaker; private final CircuitBreaker circuitBreaker;
private final Retry retry; private final Retry retry;
private final Retry topologyChangedEventRetry;
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) { public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration,
final ClientResources clientResources) {
this(name, this(name,
RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()), RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()),
clusterConfiguration.getTimeout(), clusterConfiguration.getTimeout(),
@ -59,7 +64,8 @@ public class FaultTolerantRedisCluster {
} }
@VisibleForTesting @VisibleForTesting
FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout,
final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) {
this.name = name; this.name = name;
this.clusterClient = clusterClient; this.clusterClient = clusterClient;
@ -79,6 +85,13 @@ public class FaultTolerantRedisCluster {
this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder()
.retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE)
.intervalFunction(
IntervalFunction.ofExponentialRandomBackoff(Duration.ofSeconds(1), 1.5, Duration.ofSeconds(30)))
.build();
this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig);
CircuitBreakerUtil.registerMetrics(circuitBreaker, FaultTolerantRedisCluster.class); CircuitBreakerUtil.registerMetrics(circuitBreaker, FaultTolerantRedisCluster.class);
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class); CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class);
@ -158,6 +171,7 @@ public class FaultTolerantRedisCluster {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub(); final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
pubSubConnections.add(pubSubConnection); pubSubConnections.add(pubSubConnection);
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker, retry); return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreaker, retry, topologyChangedEventRetry,
Schedulers.newSingle(name + "-redisPubSubEvents"));
} }
} }

View File

@ -14,7 +14,6 @@ import io.lettuce.core.ScoredValue;
import io.lettuce.core.ScriptOutputType; import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.ZAddArgs; import io.lettuce.core.ZAddArgs;
import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
@ -104,8 +103,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster,
final Clock clock, final ExecutorService notificationExecutorService, final Scheduler messageDeliveryScheduler, final ExecutorService notificationExecutorService, final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService) throws IOException { final ExecutorService messageDeletionExecutorService, final Clock clock) throws IOException {
this.readDeleteCluster = readDeleteCluster; this.readDeleteCluster = readDeleteCluster;
this.pubSubConnection = readDeleteCluster.createPubSubConnection(); this.pubSubConnection = readDeleteCluster.createPubSubConnection();
@ -128,12 +127,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
@Override @Override
public void start() { public void start() {
pubSubConnection.usePubSubConnection(connection -> { pubSubConnection.usePubSubConnection(connection -> connection.addListener(this));
connection.addListener(this); pubSubConnection.subscribeToClusterTopologyChangedEvents(this::resubscribeAll);
connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribe(event -> resubscribeAll());
});
} }
@Override @Override
@ -142,7 +137,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
} }
private void resubscribeAll() { private void resubscribeAll() {
logger.info("Got topology change event, resubscribing all keyspace notifications");
final Set<String> queueNames; final Set<String> queueNames;

View File

@ -201,7 +201,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster,
Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor); Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
Clock.systemUTC(), keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor); keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
DirectoryQueue directoryQueue = new DirectoryQueue( DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration()); configuration.getDirectoryConfiguration().getSqsConfiguration());
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);

View File

@ -180,7 +180,7 @@ record CommandDependencies(
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster,
Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor); Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
Clock.systemUTC(), keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor); keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
DirectoryQueue directoryQueue = new DirectoryQueue( DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration()); configuration.getDirectoryConfiguration().getSqsConfiguration());
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);

View File

@ -42,6 +42,8 @@ class ClientPresenceManagerTest {
private static final DisplacedPresenceListener NO_OP = connectedElsewhere -> { private static final DisplacedPresenceListener NO_OP = connectedElsewhere -> {
}; };
private boolean expectExceptionOnClientPresenceManagerStop = false;
@BeforeEach @BeforeEach
void setUp() throws Exception { void setUp() throws Exception {
@ -61,7 +63,13 @@ class ClientPresenceManagerTest {
presenceRenewalExecutorService.shutdown(); presenceRenewalExecutorService.shutdown();
presenceRenewalExecutorService.awaitTermination(1, TimeUnit.MINUTES); presenceRenewalExecutorService.awaitTermination(1, TimeUnit.MINUTES);
try {
clientPresenceManager.stop(); clientPresenceManager.stop();
} catch (final Exception e) {
if (!expectExceptionOnClientPresenceManagerStop) {
throw e;
}
}
} }
@Test @Test
@ -294,6 +302,8 @@ class ClientPresenceManagerTest {
} }
assertTrue(clientPresenceManager.isPresent(displacedAccountUuid, displacedAccountDeviceId)); assertTrue(clientPresenceManager.isPresent(displacedAccountUuid, displacedAccountDeviceId));
expectExceptionOnClientPresenceManagerStop = true;
} }
@Nested @Nested

View File

@ -8,32 +8,52 @@ package org.whispersystems.textsecuregcm.redis;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException; import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
import io.lettuce.core.event.Event;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.resource.ClientResources;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.publisher.TestPublisher;
class FaultTolerantPubSubConnectionTest { class FaultTolerantPubSubConnectionTest {
private StatefulRedisClusterPubSubConnection<String, String> pubSubConnection;
private RedisClusterPubSubCommands<String, String> pubSubCommands; private RedisClusterPubSubCommands<String, String> pubSubCommands;
private FaultTolerantPubSubConnection<String, String> faultTolerantPubSubConnection; private FaultTolerantPubSubConnection<String, String> faultTolerantPubSubConnection;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = mock( pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class);
StatefulRedisClusterPubSubConnection.class);
pubSubCommands = mock(RedisClusterPubSubCommands.class); pubSubCommands = mock(RedisClusterPubSubCommands.class);
@ -47,13 +67,19 @@ class FaultTolerantPubSubConnectionTest {
final RetryConfiguration retryConfiguration = new RetryConfiguration(); final RetryConfiguration retryConfiguration = new RetryConfiguration();
retryConfiguration.setMaxAttempts(3); retryConfiguration.setMaxAttempts(3);
retryConfiguration.setWaitDuration(0); retryConfiguration.setWaitDuration(10);
final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig()); final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig());
final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig());
final RetryConfig resubscribeRetryConfiguration = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE)
.intervalFunction(IntervalFunction.ofExponentialBackoff(5))
.build();
final Retry resubscribeRetry = Retry.of("test-resubscribe", resubscribeRetryConfiguration);
faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker, faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker,
retry); retry, resubscribeRetry, Schedulers.newSingle("test"));
} }
@Test @Test
@ -62,7 +88,8 @@ class FaultTolerantPubSubConnectionTest {
.thenReturn("value") .thenReturn("value")
.thenThrow(new RuntimeException("Badness has ensued.")); .thenThrow(new RuntimeException("Badness has ensued."));
assertEquals("value", faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); assertEquals("value",
faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
assertThrows(RedisException.class, assertThrows(RedisException.class,
() -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO")));
@ -80,7 +107,8 @@ class FaultTolerantPubSubConnectionTest {
.thenThrow(new RedisCommandTimeoutException()) .thenThrow(new RedisCommandTimeoutException())
.thenReturn("value"); .thenReturn("value");
assertEquals("value", faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); assertEquals("value",
faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
when(pubSubCommands.get(anyString())) when(pubSubCommands.get(anyString()))
.thenThrow(new RedisCommandTimeoutException()) .thenThrow(new RedisCommandTimeoutException())
@ -88,6 +116,110 @@ class FaultTolerantPubSubConnectionTest {
.thenThrow(new RedisCommandTimeoutException()) .thenThrow(new RedisCommandTimeoutException())
.thenReturn("value"); .thenReturn("value");
assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); assertThrows(RedisCommandTimeoutException.class,
() -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key")));
}
@Nested
class ClusterTopologyChangedEventTest {
private TestPublisher<Event> eventPublisher;
private Runnable resubscribe;
private AtomicInteger resubscribeCounter;
private CountDownLatch resubscribeFailure;
private CountDownLatch resubscribeSuccess;
@BeforeEach
@SuppressWarnings("unchecked")
void setup() {
  // start from a clean mock: drop whatever stubbing the enclosing class's setup installed
  reset(pubSubConnection);

  // per-test bookkeeping observed by the resubscribe handler below
  resubscribeCounter = new AtomicInteger();
  resubscribeSuccess = new CountDownLatch(1);
  resubscribeFailure = new CountDownLatch(1);

  // wire connection -> client resources -> event bus -> flux backed by the test publisher
  eventPublisher = TestPublisher.createCold();
  final Flux<Event> topologyEvents = Flux.from(eventPublisher);

  final EventBus bus = mock(EventBus.class);
  when(bus.get()).thenReturn(topologyEvents);

  final ClientResources resources = mock(ClientResources.class);
  when(resources.eventBus()).thenReturn(bus);
  when(pubSubConnection.getResources()).thenReturn(resources);

  // handler under test: counts every invocation and signals a latch for the failure or success outcome
  resubscribe = () -> {
    resubscribeCounter.incrementAndGet();
    try {
      pubSubConnection.sync().nodes((ignored) -> true);
    } catch (final RuntimeException e) {
      resubscribeFailure.countDown();
      throw e;
    }
    resubscribeSuccess.countDown();
  };
}
/**
 * Verifies that a failing resubscribe handler is retried after a topology change event and eventually succeeds
 * once the connection stops throwing.
 */
@SuppressWarnings("unchecked")
@Test
void testSubscribeToClusterTopologyChangedEvents() throws Exception {
// make every resubscribe attempt fail: the handler calls pubSubConnection.sync(), which now throws
when(pubSubConnection.sync())
.thenThrow(new RedisException("Cluster unavailable"));
// the event is published before anything subscribes
// NOTE(review): presumably the cold TestPublisher hands it to the later subscriber - confirm against reactor-test docs
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe);
// wait until the handler has failed at least once
assertTrue(resubscribeFailure.await(1, TimeUnit.SECONDS));
// simulate cluster recovery - no more exceptions, run the retry
reset(pubSubConnection);
// forget earlier interactions so the verify below only sees calls made from here on
clearInvocations(pubSubCommands);
when(pubSubConnection.sync())
.thenReturn(pubSubCommands);
// a later attempt must now complete without throwing
assertTrue(resubscribeSuccess.await(1, TimeUnit.SECONDS));
// at least one failed attempt plus the successful one
assertTrue(resubscribeCounter.get() >= 2, String.format("resubscribe called %d times", resubscribeCounter.get()));
verify(pubSubCommands).nodes(any());
}
@Test
@SuppressWarnings("unchecked")
void testMultipleEventsWithPendingRetries() throws Exception {
// more complicated scenario: multiple events while retries are pending
// cluster is down
when(pubSubConnection.sync())
.thenThrow(new RedisException("Cluster unavailable"));
// publish multiple topology changed events
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe);
assertTrue(resubscribeFailure.await(1, TimeUnit.SECONDS));
// simulate cluster recovery - no more exceptions, run the retry
reset(pubSubConnection);
clearInvocations(pubSubCommands);
when(pubSubConnection.sync())
.thenReturn(pubSubCommands);
assertTrue(resubscribeSuccess.await(1, TimeUnit.SECONDS));
verify(pubSubCommands, atLeastOnce()).nodes(any());
} }
} }
}

View File

@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
@ -51,6 +52,7 @@ class MessagePersisterIntegrationTest {
private ExecutorService notificationExecutorService; private ExecutorService notificationExecutorService;
private Scheduler messageDeliveryScheduler; private Scheduler messageDeliveryScheduler;
private ExecutorService messageDeletionExecutorService; private ExecutorService messageDeletionExecutorService;
private ScheduledExecutorService resubscribeRetryExecutorService;
private MessagesCache messagesCache; private MessagesCache messagesCache;
private MessagesManager messagesManager; private MessagesManager messagesManager;
private MessagePersister messagePersister; private MessagePersister messagePersister;
@ -78,9 +80,10 @@ class MessagePersisterIntegrationTest {
final AccountsManager accountsManager = mock(AccountsManager.class); final AccountsManager accountsManager = mock(AccountsManager.class);
notificationExecutorService = Executors.newSingleThreadExecutor(); notificationExecutorService = Executors.newSingleThreadExecutor();
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), notificationExecutorService, REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService,
messageDeliveryScheduler, messageDeletionExecutorService); messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC());
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
messageDeletionExecutorService); messageDeletionExecutorService);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,

View File

@ -30,6 +30,7 @@ import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -50,6 +51,7 @@ class MessagePersisterTest {
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private ExecutorService sharedExecutorService; private ExecutorService sharedExecutorService;
private ScheduledExecutorService resubscribeRetryExecutorService;
private Scheduler messageDeliveryScheduler; private Scheduler messageDeliveryScheduler;
private MessagesCache messagesCache; private MessagesCache messagesCache;
private MessagesDynamoDb messagesDynamoDb; private MessagesDynamoDb messagesDynamoDb;
@ -79,10 +81,11 @@ class MessagePersisterTest {
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
sharedExecutorService = Executors.newSingleThreadExecutor(); sharedExecutorService = Executors.newSingleThreadExecutor();
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, messageDeliveryScheduler, REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler,
sharedExecutorService); sharedExecutorService, Clock.systemUTC());
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY); dynamicConfigurationManager, PERSIST_DELAY);
@ -107,6 +110,8 @@ class MessagePersisterTest {
sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS); sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS);
messageDeliveryScheduler.dispose(); messageDeliveryScheduler.dispose();
resubscribeRetryExecutorService.shutdown();
resubscribeRetryExecutorService.awaitTermination(1, TimeUnit.SECONDS);
} }
@Test @Test

View File

@ -42,6 +42,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -80,10 +81,12 @@ class MessagesCacheTest {
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private ExecutorService sharedExecutorService; private ExecutorService sharedExecutorService;
private ScheduledExecutorService resubscribeRetryExecutorService;
private Scheduler messageDeliveryScheduler; private Scheduler messageDeliveryScheduler;
private MessagesCache messagesCache; private MessagesCache messagesCache;
private static final UUID DESTINATION_UUID = UUID.randomUUID(); private static final UUID DESTINATION_UUID = UUID.randomUUID();
private static final int DESTINATION_DEVICE_ID = 7; private static final int DESTINATION_DEVICE_ID = 7;
@BeforeEach @BeforeEach
@ -95,10 +98,10 @@ class MessagesCacheTest {
}); });
sharedExecutorService = Executors.newSingleThreadExecutor(); sharedExecutorService = Executors.newSingleThreadExecutor();
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
messageDeliveryScheduler, sharedExecutorService);
messagesCache.start(); messagesCache.start();
} }
@ -111,6 +114,8 @@ class MessagesCacheTest {
sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS); sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS);
messageDeliveryScheduler.dispose(); messageDeliveryScheduler.dispose();
resubscribeRetryExecutorService.shutdown();
resubscribeRetryExecutorService.awaitTermination(1, TimeUnit.SECONDS);
} }
@ParameterizedTest @ParameterizedTest
@ -269,11 +274,7 @@ class MessagesCacheTest {
} }
final MessagesCache messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), final MessagesCache messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, cacheClock);
cacheClock,
sharedExecutorService,
messageDeliveryScheduler,
sharedExecutorService);
final List<MessageProtos.Envelope> actualMessages = Flux.from( final List<MessageProtos.Envelope> actualMessages = Flux.from(
messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID)) messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID))
@ -561,7 +562,6 @@ class MessagesCacheTest {
void setup() throws Exception { void setup() throws Exception {
reactiveCommands = mock(RedisAdvancedClusterReactiveCommands.class); reactiveCommands = mock(RedisAdvancedClusterReactiveCommands.class);
asyncCommands = mock(RedisAdvancedClusterAsyncCommands.class); asyncCommands = mock(RedisAdvancedClusterAsyncCommands.class);
final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.builder() final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.builder()
.binaryReactiveCommands(reactiveCommands) .binaryReactiveCommands(reactiveCommands)
.binaryAsyncCommands(asyncCommands) .binaryAsyncCommands(asyncCommands)
@ -569,8 +569,8 @@ class MessagesCacheTest {
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(mockCluster, mockCluster, Clock.systemUTC(), mock(ExecutorService.class), messagesCache = new MessagesCache(mockCluster, mockCluster, mock(ExecutorService.class),
messageDeliveryScheduler, Executors.newSingleThreadExecutor()); messageDeliveryScheduler, Executors.newSingleThreadExecutor(), Clock.systemUTC());
} }
@AfterEach @AfterEach

View File

@ -71,13 +71,13 @@ class WebSocketConnectionIntegrationTest {
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private ExecutorService sharedExecutorService; private ExecutorService sharedExecutorService;
private ScheduledExecutorService scheduledExecutorService;
private MessagesDynamoDb messagesDynamoDb; private MessagesDynamoDb messagesDynamoDb;
private MessagesCache messagesCache; private MessagesCache messagesCache;
private ReportMessageManager reportMessageManager; private ReportMessageManager reportMessageManager;
private Account account; private Account account;
private Device device; private Device device;
private WebSocketClient webSocketClient; private WebSocketClient webSocketClient;
private ScheduledExecutorService retrySchedulingExecutor;
private Scheduler messageDeliveryScheduler; private Scheduler messageDeliveryScheduler;
private long serialTimestamp = System.currentTimeMillis(); private long serialTimestamp = System.currentTimeMillis();
@ -86,10 +86,10 @@ class WebSocketConnectionIntegrationTest {
void setUp() throws Exception { void setUp() throws Exception {
sharedExecutorService = Executors.newSingleThreadExecutor(); sharedExecutorService = Executors.newSingleThreadExecutor();
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, messageDeliveryScheduler, REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
sharedExecutorService);
messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getDynamoDbAsyncClient(), MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(7), dynamoDbExtension.getDynamoDbAsyncClient(), MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(7),
sharedExecutorService); sharedExecutorService);
@ -97,7 +97,6 @@ class WebSocketConnectionIntegrationTest {
account = mock(Account.class); account = mock(Account.class);
device = mock(Device.class); device = mock(Device.class);
webSocketClient = mock(WebSocketClient.class); webSocketClient = mock(WebSocketClient.class);
retrySchedulingExecutor = Executors.newSingleThreadScheduledExecutor();
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID()); when(account.getUuid()).thenReturn(UUID.randomUUID());
@ -109,8 +108,8 @@ class WebSocketConnectionIntegrationTest {
sharedExecutorService.shutdown(); sharedExecutorService.shutdown();
sharedExecutorService.awaitTermination(2, TimeUnit.SECONDS); sharedExecutorService.awaitTermination(2, TimeUnit.SECONDS);
retrySchedulingExecutor.shutdown(); scheduledExecutorService.shutdown();
retrySchedulingExecutor.awaitTermination(2, TimeUnit.SECONDS); scheduledExecutorService.awaitTermination(2, TimeUnit.SECONDS);
} }
@ParameterizedTest @ParameterizedTest
@ -126,7 +125,7 @@ class WebSocketConnectionIntegrationTest {
new AuthenticatedAccount(() -> new Pair<>(account, device)), new AuthenticatedAccount(() -> new Pair<>(account, device)),
device, device,
webSocketClient, webSocketClient,
retrySchedulingExecutor, scheduledExecutorService,
messageDeliveryScheduler); messageDeliveryScheduler);
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
@ -210,7 +209,7 @@ class WebSocketConnectionIntegrationTest {
new AuthenticatedAccount(() -> new Pair<>(account, device)), new AuthenticatedAccount(() -> new Pair<>(account, device)),
device, device,
webSocketClient, webSocketClient,
retrySchedulingExecutor, scheduledExecutorService,
messageDeliveryScheduler); messageDeliveryScheduler);
final int persistedMessageCount = 207; final int persistedMessageCount = 207;
@ -276,7 +275,7 @@ class WebSocketConnectionIntegrationTest {
device, device,
webSocketClient, webSocketClient,
100, // use a very short timeout, so that this test completes quickly 100, // use a very short timeout, so that this test completes quickly
retrySchedulingExecutor, scheduledExecutorService,
messageDeliveryScheduler); messageDeliveryScheduler);
final int persistedMessageCount = 207; final int persistedMessageCount = 207;