From 71c0056c66e9148aae7ff5337cf4217a78a7d9e3 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 19 Aug 2020 12:55:00 -0400 Subject: [PATCH] Use lots of specific subscriptions instead of one monster subscription to minimize load. --- .../storage/RedisClusterMessagesCache.java | 55 ++++++++++++++----- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java index e577d2364..2e6987d0f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java @@ -25,10 +25,12 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -62,8 +64,8 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter event instanceof ClusterTopologyChangedEvent) .handle((event, sink) -> { - subscribeForKeyspaceNotifications(); + resubscribeAll(); sink.next(event); }); }); - - subscribeForKeyspaceNotifications(); } - private void subscribeForKeyspaceNotifications() { - pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().psubscribe(QUEUE_KEYSPACE_PATTERN, PERSISTING_KEYSPACE_PATTERN)); + private void resubscribeAll() { + final Set queueNames; + + synchronized (messageListenersByQueueName) { + queueNames = new HashSet<>(messageListenersByQueueName.keySet()); + } + + for (final String queueName : queueNames) { + subscribeForKeyspaceNotifications(queueName); + } } @Override @@ -300,26 +308,45 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) + .commands() + .subscribe(QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}", + PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}")); + } + + private void unsubscribeFromKeyspaceNotifications(final String queueName) { + pubSubConnection.usePubSubConnection(connection -> connection.sync().masters() + .commands() + .unsubscribe(QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}", + PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}")); + } + + @Override + public void message(final RedisClusterNode node, final String channel, final String message) { + pubSubMessageCounter.increment(); + if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) { newMessageNotificationCounter.increment(); notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable)); - } else if (PERSISTING_KEYSPACE_PATTERN.equals(pattern) && "del".equals(message)) { + } else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) { queuePersistedNotificationCounter.increment(); notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted)); }