diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 63af9cbee..408635cb2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -10,6 +10,7 @@ import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.event.connection.ConnectionEvent; @@ -70,6 +71,8 @@ public class FaultTolerantRedisCluster { this.clusterClient.getResources().eventBus().get().subscribe(event -> { if (event instanceof ConnectionEvent) { log.info("Connection event for {}: {}", this.name, event); + } else if (event instanceof ClusterTopologyChangedEvent) { + log.info("Cluster topology for {} changed: {}", this.name, event); } }); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 8591348d8..cf0df9046 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -7,10 +7,13 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.event.EventBus; +import io.lettuce.core.resource.ClientResources; import org.junit.Before; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import reactor.core.publisher.Flux; import java.time.Duration; @@ -30,12 +33,17 @@ public class FaultTolerantRedisClusterTest { final RedisClusterClient clusterClient = mock(RedisClusterClient.class); final StatefulRedisClusterConnection clusterConnection = mock(StatefulRedisClusterConnection.class); final StatefulRedisClusterPubSubConnection pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); + final ClientResources clientResources = mock(ClientResources.class); + final EventBus eventBus = mock(EventBus.class); clusterCommands = mock(RedisAdvancedClusterCommands.class); when(clusterClient.connect()).thenReturn(clusterConnection); when(clusterClient.connectPubSub()).thenReturn(pubSubConnection); + when(clusterClient.getResources()).thenReturn(clientResources); when(clusterConnection.sync()).thenReturn(clusterCommands); + when(clientResources.eventBus()).thenReturn(eventBus); + when(eventBus.get()).thenReturn(mock(Flux.class)); final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); breakerConfiguration.setFailureRateThreshold(100);