Use lots of specific subscriptions instead of one monster subscription to minimize load.

This commit is contained in:
Jon Chambers 2020-08-19 12:55:00 -04:00 committed by Jon Chambers
parent 56b27ea785
commit 71c0056c66
1 changed files with 41 additions and 14 deletions

View File

@ -25,10 +25,12 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@ -62,8 +64,8 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
private static final String QUEUE_KEYSPACE_PATTERN = "__keyspace@0__:user_queue::*";
private static final String PERSISTING_KEYSPACE_PATTERN = "__keyspace@0__:user_queue_persisting::*";
private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::";
private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::";
private static final String REMOVE_TIMER_NAME = name(RedisClusterMessagesCache.class, "remove");
@ -94,16 +96,22 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.handle((event, sink) -> {
subscribeForKeyspaceNotifications();
resubscribeAll();
sink.next(event);
});
});
subscribeForKeyspaceNotifications();
}
private void subscribeForKeyspaceNotifications() {
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().psubscribe(QUEUE_KEYSPACE_PATTERN, PERSISTING_KEYSPACE_PATTERN));
private void resubscribeAll() {
final Set<String> queueNames;
synchronized (messageListenersByQueueName) {
queueNames = new HashSet<>(messageListenersByQueueName.keySet());
}
for (final String queueName : queueNames) {
subscribeForKeyspaceNotifications(queueName);
}
}
@Override
@ -300,26 +308,45 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
messageListenersByQueueName.put(queueName, listener);
queueNamesByMessageListener.put(listener, queueName);
}
subscribeForKeyspaceNotifications(queueName);
}
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
synchronized (messageListenersByQueueName) {
final String queueName = queueNamesByMessageListener.remove(listener);
final String queueName = queueNamesByMessageListener.remove(listener);
unsubscribeFromKeyspaceNotifications(queueName);
synchronized (messageListenersByQueueName) {
if (queueName != null) {
messageListenersByQueueName.remove(queueName);
}
}
}
@Override
public void message(final RedisClusterNode node, final String pattern, final String channel, final String message) {
pubSubMessageCounter.increment();
private void subscribeForKeyspaceNotifications(final String queueName) {
final int slot = SlotHash.getSlot(queueName);
if (QUEUE_KEYSPACE_PATTERN.equals(pattern) && "zadd".equals(message)) {
pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot))
.commands()
.subscribe(QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}",
PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}"));
}
private void unsubscribeFromKeyspaceNotifications(final String queueName) {
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters()
.commands()
.unsubscribe(QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}",
PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}"));
}
@Override
public void message(final RedisClusterNode node, final String channel, final String message) {
pubSubMessageCounter.increment();
if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) {
newMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable));
} else if (PERSISTING_KEYSPACE_PATTERN.equals(pattern) && "del".equals(message)) {
} else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) {
queuePersistedNotificationCounter.increment();
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted));
}