Log cluster topology change events, too.

This commit is contained in:
Jon Chambers 2020-10-13 13:51:05 -04:00 committed by Jon Chambers
parent 290a82e61c
commit 3a84775912
2 changed files with 11 additions and 0 deletions

View File

@ -10,6 +10,7 @@ import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.event.connection.ConnectionEvent;
@ -70,6 +71,8 @@ public class FaultTolerantRedisCluster {
this.clusterClient.getResources().eventBus().get().subscribe(event -> {
if (event instanceof ConnectionEvent) {
log.info("Connection event for {}: {}", this.name, event);
} else if (event instanceof ClusterTopologyChangedEvent) {
log.info("Cluster topology for {} changed: {}", this.name, event);
}
});

View File

@ -7,10 +7,13 @@ import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.resource.ClientResources;
import org.junit.Before;
import org.junit.Test;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import reactor.core.publisher.Flux;
import java.time.Duration;
@ -30,12 +33,17 @@ public class FaultTolerantRedisClusterTest {
final RedisClusterClient clusterClient = mock(RedisClusterClient.class);
final StatefulRedisClusterConnection<String, String> clusterConnection = mock(StatefulRedisClusterConnection.class);
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class);
final ClientResources clientResources = mock(ClientResources.class);
final EventBus eventBus = mock(EventBus.class);
clusterCommands = mock(RedisAdvancedClusterCommands.class);
when(clusterClient.connect()).thenReturn(clusterConnection);
when(clusterClient.connectPubSub()).thenReturn(pubSubConnection);
when(clusterClient.getResources()).thenReturn(clientResources);
when(clusterConnection.sync()).thenReturn(clusterCommands);
when(clientResources.eventBus()).thenReturn(eventBus);
when(eventBus.get()).thenReturn(mock(Flux.class));
final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration();
breakerConfiguration.setFailureRateThreshold(100);