From ae0f8df11b3b8e4ab44844ae6edd9cba3f69dd22 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Fri, 14 Aug 2020 11:14:23 -0400 Subject: [PATCH] Break out FaultTolerantPubSubConnection as its own thing so different use cases can have their own subscription space. --- .../push/ClientPresenceManager.java | 25 ++++++--- .../redis/FaultTolerantPubSubConnection.java | 36 +++++++++++++ .../redis/FaultTolerantRedisCluster.java | 44 +++++++-------- .../storage/RedisClusterMessagesCache.java | 23 ++++---- .../push/ClientPresenceManagerTest.java | 5 +- .../FaultTolerantPubSubConnectionTest.java | 53 +++++++++++++++++++ .../redis/FaultTolerantRedisClusterTest.java | 22 +------- 7 files changed, 146 insertions(+), 62 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 7d29b868f..38078af20 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -15,6 +15,7 @@ import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; +import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.util.Constants; @@ -42,8 +43,10 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter pubSubConnection; + + private final ClusterLuaScript clearPresenceScript; private final ScheduledExecutorService scheduledExecutorService; private ScheduledFuture pruneMissingPeersFuture; @@ -65,8 +68,9 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter displacementListenersByPresenceKey::size); @@ -79,9 +83,14 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter getPubSubConnection() { + return pubSubConnection; + } + @Override public void start() { - presenceCluster.usePubSubConnection(connection -> { + pubSubConnection.usePubSubConnection(connection -> { connection.addListener(this); connection.getResources().eventBus().get() .filter(event -> event instanceof ClusterTopologyChangedEvent) @@ -103,7 +112,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.removeListener(this)); + pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this)); if (pruneMissingPeersFuture != null) { pruneMissingPeersFuture.cancel(false); @@ -118,7 +127,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId))); + pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId))); } public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) { @@ -175,7 +184,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) + pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) .commands() .subscribe(getKeyspaceNotificationChannel(presenceKey))); } @@ -187,7 +196,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.async().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); + pubSubConnection.usePubSubConnection(connection -> connection.async().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); } void pruneMissingPeers() { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java new file mode 100644 index 000000000..41fd7f442 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnection.java @@ -0,0 +1,36 @@ +package org.whispersystems.textsecuregcm.redis; + +import com.codahale.metrics.SharedMetricRegistries; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; +import org.whispersystems.textsecuregcm.util.Constants; + +import java.util.function.Consumer; +import java.util.function.Function; + +public class FaultTolerantPubSubConnection { + + private final StatefulRedisClusterPubSubConnection pubSubConnection; + private final CircuitBreaker circuitBreaker; + + public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection pubSubConnection, final CircuitBreakerConfiguration circuitBreakerConfiguration) { + this.pubSubConnection = pubSubConnection; + this.circuitBreaker = CircuitBreaker.of(name + "-pubsub", circuitBreakerConfiguration.toCircuitBreakerConfig()); + + CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), + this.circuitBreaker, + FaultTolerantRedisCluster.class); + + this.pubSubConnection.setNodeMessagePropagation(true); + } + + public void usePubSubConnection(final Consumer> consumer) { + this.circuitBreaker.executeRunnable(() -> consumer.accept(pubSubConnection)); + } + + public T withPubSubConnection(final Function, T> consumer) { + return this.circuitBreaker.executeSupplier(() -> consumer.apply(pubSubConnection)); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 888aec501..ae6269d11 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -13,6 +13,7 @@ import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; @@ -25,15 +26,18 @@ import java.util.stream.Collectors; */ public class FaultTolerantRedisCluster { + private final String name; + private final RedisClusterClient clusterClient; - private final StatefulRedisClusterConnection stringClusterConnection; - private final StatefulRedisClusterConnection binaryClusterConnection; - private final StatefulRedisClusterPubSubConnection pubSubClusterConnection; + private final StatefulRedisClusterConnection stringClusterConnection; + private final StatefulRedisClusterConnection binaryClusterConnection; - private final CircuitBreaker readCircuitBreaker; - private final CircuitBreaker writeCircuitBreaker; - private final CircuitBreaker pubSubCircuitBreaker; + private final List> pubSubConnections = new ArrayList<>(); + + private final CircuitBreakerConfiguration circuitBreakerConfiguration; + private final CircuitBreaker readCircuitBreaker; + private final CircuitBreaker writeCircuitBreaker; public FaultTolerantRedisCluster(final String name, final List urls, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { this(name, RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), timeout, circuitBreakerConfiguration); @@ -41,17 +45,17 @@ public class FaultTolerantRedisCluster { @VisibleForTesting FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { + this.name = name; + this.clusterClient = clusterClient; this.clusterClient.setDefaultTimeout(timeout); this.stringClusterConnection = clusterClient.connect(); this.binaryClusterConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); - this.pubSubClusterConnection = clusterClient.connectPubSub(); - this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.pubSubCircuitBreaker = CircuitBreaker.of(name + "-pubsub", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.pubSubClusterConnection.setNodeMessagePropagation(true); + this.circuitBreakerConfiguration = circuitBreakerConfiguration; + this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig()); CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), readCircuitBreaker, @@ -60,16 +64,15 @@ public class FaultTolerantRedisCluster { CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), writeCircuitBreaker, FaultTolerantRedisCluster.class); - - CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), - pubSubCircuitBreaker, - FaultTolerantRedisCluster.class); } void shutdown() { stringClusterConnection.close(); binaryClusterConnection.close(); - pubSubClusterConnection.close(); + + for (final StatefulRedisClusterPubSubConnection pubSubConnection : pubSubConnections) { + pubSubConnection.close(); + } clusterClient.shutdown(); } @@ -106,11 +109,10 @@ public class FaultTolerantRedisCluster { return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection)); } - public void usePubSubConnection(final Consumer> consumer) { - this.pubSubCircuitBreaker.executeRunnable(() -> consumer.accept(pubSubClusterConnection)); - } + public FaultTolerantPubSubConnection createPubSubConnection() { + final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); + pubSubConnections.add(pubSubConnection); - public T withPubSubConnection(final Function, T> consumer) { - return this.pubSubCircuitBreaker.executeSupplier(() -> consumer.apply(pubSubClusterConnection)); + return new FaultTolerantPubSubConnection<>(name, pubSubConnection, circuitBreakerConfiguration); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java index f9b8aa7fd..9a8650667 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; +import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; @@ -33,8 +34,10 @@ import static com.codahale.metrics.MetricRegistry.name; public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter implements UserMessagesCache { - private final FaultTolerantRedisCluster redisCluster; - private final ExecutorService notificationExecutorService; + private final FaultTolerantRedisCluster redisCluster; + private final FaultTolerantPubSubConnection pubSubConnection; + + private final ExecutorService notificationExecutorService; private final ClusterLuaScript insertScript; private final ClusterLuaScript removeByIdScript; @@ -67,7 +70,9 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter { + pubSubConnection.usePubSubConnection(connection -> { connection.addListener(this); connection.getResources().eventBus().get() .filter(event -> event instanceof ClusterTopologyChangedEvent) .handle((event, sink) -> { - resubscribeAll(); + subscribeForKeyspaceNotifications(); sink.next(event); }); - - connection.sync().masters().commands().psubscribe(QUEUE_KEYSPACE_PATTERN, PERSISTING_KEYSPACE_PATTERN); }); + + subscribeForKeyspaceNotifications(); } - private void resubscribeAll() { - redisCluster.usePubSubConnection(connection -> connection.sync().masters().commands().psubscribe(QUEUE_KEYSPACE_PATTERN, PERSISTING_KEYSPACE_PATTERN)); + private void subscribeForKeyspaceNotifications() { + pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().psubscribe(QUEUE_KEYSPACE_PATTERN, PERSISTING_KEYSPACE_PATTERN)); } @Override diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java index d72f4de4c..c0c224fbb 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java @@ -123,7 +123,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { } }); - getRedisCluster().usePubSubConnection(connection -> connection.getResources().eventBus().publish(new ClusterTopologyChangedEvent(List.of(), List.of()))); + clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.getResources().eventBus().publish(new ClusterTopologyChangedEvent(List.of(), List.of()))); getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), UUID.randomUUID().toString())); @@ -170,8 +170,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { addClientPresence(missingPeerId); } - getRedisCluster().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); - + clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); clientPresenceManager.pruneMissingPeers(); assertEquals(1, (long)getRedisCluster().withWriteCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java new file mode 100644 index 000000000..5449cc82a --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java @@ -0,0 +1,53 @@ +package org.whispersystems.textsecuregcm.redis; + +import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; +import io.lettuce.core.RedisException; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; +import org.junit.Before; +import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class FaultTolerantPubSubConnectionTest { + + private RedisClusterPubSubCommands pubSubCommands; + private FaultTolerantPubSubConnection faultTolerantPubSubConnection; + + @SuppressWarnings("unchecked") + @Before + public void setUp() { + final StatefulRedisClusterPubSubConnection pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); + + pubSubCommands = mock(RedisClusterPubSubCommands.class); + + when(pubSubConnection.sync()).thenReturn(pubSubCommands); + + final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); + breakerConfiguration.setFailureRateThreshold(100); + breakerConfiguration.setRingBufferSizeInClosedState(1); + breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); + + faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, breakerConfiguration); + } + + @Test + public void testBreaker() { + when(pubSubCommands.get(anyString())) + .thenReturn("value") + .thenThrow(new io.lettuce.core.RedisException("Badness has ensued.")); + + assertEquals("value", faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("key"))); + + assertThrows(RedisException.class, + () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); + + assertThrows(CircuitBreakerOpenException.class, + () -> faultTolerantPubSubConnection.withPubSubConnection(connection -> connection.sync().get("OH NO"))); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index b437c5437..6905dd07f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -6,7 +6,6 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import org.junit.Before; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; @@ -21,9 +20,7 @@ import static org.mockito.Mockito.when; public class FaultTolerantRedisClusterTest { private RedisAdvancedClusterCommands clusterCommands; - private RedisClusterPubSubCommands pubSubCommands; - - private FaultTolerantRedisCluster faultTolerantCluster; + private FaultTolerantRedisCluster faultTolerantCluster; @SuppressWarnings("unchecked") @Before @@ -33,12 +30,10 @@ public class FaultTolerantRedisClusterTest { final StatefulRedisClusterPubSubConnection pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); clusterCommands = mock(RedisAdvancedClusterCommands.class); - pubSubCommands = mock(RedisClusterPubSubCommands.class); when(clusterClient.connect()).thenReturn(clusterConnection); when(clusterClient.connectPubSub()).thenReturn(pubSubConnection); when(clusterConnection.sync()).thenReturn(clusterCommands); - when(pubSubConnection.sync()).thenReturn(pubSubCommands); final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); breakerConfiguration.setFailureRateThreshold(100); @@ -92,19 +87,4 @@ public class FaultTolerantRedisClusterTest { assertThrows(CircuitBreakerOpenException.class, () -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO"))); } - - @Test - public void testPubSubBreaker() { - when(pubSubCommands.publish(anyString(), anyString())) - .thenReturn(1L) - .thenThrow(new RedisException("Badness has ensued.")); - - assertEquals(1L, (long)faultTolerantCluster.withPubSubConnection(connection -> connection.sync().publish("channel", "message"))); - - assertThrows(RedisException.class, - () -> faultTolerantCluster.withPubSubConnection(connection -> connection.sync().publish("channel", "OH NO"))); - - assertThrows(CircuitBreakerOpenException.class, - () -> faultTolerantCluster.withPubSubConnection(connection -> connection.sync().publish("channel", "OH NO"))); - } }