From 3a847759122c83340d21d19bf56e74507c02cc25 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 13 Oct 2020 13:51:05 -0400 Subject: [PATCH] Log cluster topology change events, too. --- .../textsecuregcm/redis/FaultTolerantRedisCluster.java | 3 +++ .../redis/FaultTolerantRedisClusterTest.java | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 63af9cbee..408635cb2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -10,6 +10,7 @@ import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.event.connection.ConnectionEvent; @@ -70,6 +71,8 @@ public class FaultTolerantRedisCluster { this.clusterClient.getResources().eventBus().get().subscribe(event -> { if (event instanceof ConnectionEvent) { log.info("Connection event for {}: {}", this.name, event); + } else if (event instanceof ClusterTopologyChangedEvent) { + log.info("Cluster topology for {} changed: {}", this.name, event); } }); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 8591348d8..cf0df9046 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -7,10 +7,13 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.event.EventBus; +import io.lettuce.core.resource.ClientResources; import org.junit.Before; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import reactor.core.publisher.Flux; import java.time.Duration; @@ -30,12 +33,17 @@ public class FaultTolerantRedisClusterTest { final RedisClusterClient clusterClient = mock(RedisClusterClient.class); final StatefulRedisClusterConnection clusterConnection = mock(StatefulRedisClusterConnection.class); final StatefulRedisClusterPubSubConnection pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); + final ClientResources clientResources = mock(ClientResources.class); + final EventBus eventBus = mock(EventBus.class); clusterCommands = mock(RedisAdvancedClusterCommands.class); when(clusterClient.connect()).thenReturn(clusterConnection); when(clusterClient.connectPubSub()).thenReturn(pubSubConnection); + when(clusterClient.getResources()).thenReturn(clientResources); when(clusterConnection.sync()).thenReturn(clusterCommands); + when(clientResources.eventBus()).thenReturn(eventBus); + when(eventBus.get()).thenReturn(mock(Flux.class)); final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); breakerConfiguration.setFailureRateThreshold(100);