Only respond to cluster toplogy events from the same cluster to which we're subscribed

This commit is contained in:
Jon Chambers 2024-11-05 11:11:02 -05:00 committed by Jon Chambers
parent 5afcd634b6
commit 60cdcf5f0c
2 changed files with 118 additions and 97 deletions

View File

@ -35,7 +35,18 @@ public class FaultTolerantPubSubClusterConnection<K, V> extends AbstractFaultTol
public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) { public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) {
usePubSubConnection(connection -> connection.getResources().eventBus().get() usePubSubConnection(connection -> connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent) .filter(event -> {
// If we use shared `ClientResources` for multiple clients, we may receive topology change events for clusters
// other than our own. Filter for clusters that have at least one node in common with our current view of our
// partitions.
if (event instanceof ClusterTopologyChangedEvent clusterTopologyChangedEvent) {
return withPubSubConnection(c -> c.getPartitions().stream().anyMatch(redisClusterNode ->
clusterTopologyChangedEvent.before().contains(redisClusterNode) ||
clusterTopologyChangedEvent.after().contains(redisClusterNode)));
}
return false;
})
.subscribeOn(topologyChangedEventScheduler) .subscribeOn(topologyChangedEventScheduler)
.subscribe(event -> { .subscribe(event -> {
logger.info("Got topology change event for {}, resubscribing all keyspace notifications", getName()); logger.info("Got topology change event for {}, resubscribing all keyspace notifications", getName());

View File

@ -5,6 +5,7 @@
package org.whispersystems.textsecuregcm.redis; package org.whispersystems.textsecuregcm.redis;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
@ -19,17 +20,18 @@ import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig; import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.RedisException; import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
import io.lettuce.core.event.Event; import io.lettuce.core.event.Event;
import io.lettuce.core.event.EventBus; import io.lettuce.core.event.EventBus;
import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.ClientResources;
import java.util.Collections; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -42,15 +44,31 @@ class FaultTolerantPubSubClusterConnectionTest {
private RedisClusterPubSubCommands<String, String> pubSubCommands; private RedisClusterPubSubCommands<String, String> pubSubCommands;
private FaultTolerantPubSubClusterConnection<String, String> faultTolerantPubSubConnection; private FaultTolerantPubSubClusterConnection<String, String> faultTolerantPubSubConnection;
private TestPublisher<Event> eventPublisher;
private Runnable resubscribe;
private AtomicInteger resubscribeCounter;
private CountDownLatch resubscribeFailure;
private CountDownLatch resubscribeSuccess;
private RedisClusterNode nodeInCluster;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class);
pubSubCommands = mock(RedisClusterPubSubCommands.class); pubSubCommands = mock(RedisClusterPubSubCommands.class);
nodeInCluster = mock(RedisClusterNode.class);
final ClientResources clientResources = mock(ClientResources.class);
final Partitions partitions = new Partitions();
partitions.add(nodeInCluster);
when(pubSubConnection.sync()).thenReturn(pubSubCommands); when(pubSubConnection.sync()).thenReturn(pubSubCommands);
when(pubSubConnection.getResources()).thenReturn(clientResources);
when(pubSubConnection.getPartitions()).thenReturn(partitions);
final RetryConfiguration retryConfiguration = new RetryConfiguration(); final RetryConfiguration retryConfiguration = new RetryConfiguration();
retryConfiguration.setMaxAttempts(3); retryConfiguration.setMaxAttempts(3);
@ -64,33 +82,11 @@ class FaultTolerantPubSubClusterConnectionTest {
faultTolerantPubSubConnection = new FaultTolerantPubSubClusterConnection<>("test", pubSubConnection, faultTolerantPubSubConnection = new FaultTolerantPubSubClusterConnection<>("test", pubSubConnection,
resubscribeRetry, Schedulers.newSingle("test")); resubscribeRetry, Schedulers.newSingle("test"));
}
@Nested
class ClusterTopologyChangedEventTest {
private TestPublisher<Event> eventPublisher;
private Runnable resubscribe;
private AtomicInteger resubscribeCounter;
private CountDownLatch resubscribeFailure;
private CountDownLatch resubscribeSuccess;
@BeforeEach
@SuppressWarnings("unchecked")
void setup() {
// ignore inherited stubbing
reset(pubSubConnection);
eventPublisher = TestPublisher.createCold(); eventPublisher = TestPublisher.createCold();
final ClientResources clientResources = mock(ClientResources.class);
when(pubSubConnection.getResources())
.thenReturn(clientResources);
final EventBus eventBus = mock(EventBus.class); final EventBus eventBus = mock(EventBus.class);
when(clientResources.eventBus()) when(clientResources.eventBus()).thenReturn(eventBus);
.thenReturn(eventBus);
final Flux<Event> eventFlux = Flux.from(eventPublisher); final Flux<Event> eventFlux = Flux.from(eventPublisher);
when(eventBus.get()).thenReturn(eventFlux); when(eventBus.get()).thenReturn(eventFlux);
@ -119,7 +115,7 @@ class FaultTolerantPubSubClusterConnectionTest {
when(pubSubConnection.sync()) when(pubSubConnection.sync())
.thenThrow(new RedisException("Cluster unavailable")); .thenThrow(new RedisException("Cluster unavailable"));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); eventPublisher.next(new ClusterTopologyChangedEvent(List.of(nodeInCluster), List.of(nodeInCluster)));
faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe); faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe);
@ -137,6 +133,19 @@ class FaultTolerantPubSubClusterConnectionTest {
verify(pubSubCommands).nodes(any()); verify(pubSubCommands).nodes(any());
} }
@Test
void testFilterClusterTopologyChangeEvents() throws InterruptedException {
final CountDownLatch topologyEventLatch = new CountDownLatch(1);
faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(topologyEventLatch::countDown);
final RedisClusterNode nodeFromDifferentCluster = mock(RedisClusterNode.class);
eventPublisher.next(new ClusterTopologyChangedEvent(List.of(nodeFromDifferentCluster), List.of(nodeFromDifferentCluster)));
assertFalse(topologyEventLatch.await(1, TimeUnit.SECONDS));
}
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
void testMultipleEventsWithPendingRetries() throws Exception { void testMultipleEventsWithPendingRetries() throws Exception {
@ -147,10 +156,13 @@ class FaultTolerantPubSubClusterConnectionTest {
.thenThrow(new RedisException("Cluster unavailable")); .thenThrow(new RedisException("Cluster unavailable"));
// publish multiple topology changed events // publish multiple topology changed events
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); final ClusterTopologyChangedEvent clusterTopologyChangedEvent =
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); new ClusterTopologyChangedEvent(List.of(nodeInCluster), List.of(nodeInCluster));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList()));
eventPublisher.next(new ClusterTopologyChangedEvent(Collections.emptyList(), Collections.emptyList())); eventPublisher.next(clusterTopologyChangedEvent);
eventPublisher.next(clusterTopologyChangedEvent);
eventPublisher.next(clusterTopologyChangedEvent);
eventPublisher.next(clusterTopologyChangedEvent);
faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe); faultTolerantPubSubConnection.subscribeToClusterTopologyChangedEvents(resubscribe);
@ -166,6 +178,4 @@ class FaultTolerantPubSubClusterConnectionTest {
verify(pubSubCommands, atLeastOnce()).nodes(any()); verify(pubSubCommands, atLeastOnce()).nodes(any());
} }
}
} }