Use separate clusters for message cache read/write operations.

This commit is contained in:
Jon Chambers 2021-01-15 13:18:04 -05:00 committed by Jon Chambers
parent efb2a1d913
commit 9c53d818f4
7 changed files with 32 additions and 28 deletions

View File

@ -285,7 +285,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ConnectionEventLogger.logConnectionEvents(redisClusterClientResources);
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messagesInsertCluster = new FaultTolerantRedisCluster("message_insert_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster("message_read_delete_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration(), redisClusterClientResources);
BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000);
@ -296,14 +297,14 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build();
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor);
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesInsertCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor);
DirectoryManager directory = new DirectoryManager(directoryClient);
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster);
PendingDevicesManager pendingDevicesManager = new PendingDevicesManager(pendingDevices, cacheCluster);
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCacheCluster, keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messagesInsertCluster, messageReadDeleteCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, pushLatencyManager);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);

View File

@ -48,7 +48,8 @@ import static com.codahale.metrics.MetricRegistry.name;
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
private final FaultTolerantRedisCluster redisCluster;
private final FaultTolerantRedisCluster insertCluster;
private final FaultTolerantRedisCluster readDeleteCluster;
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
private final ExecutorService notificationExecutorService;
@ -93,20 +94,21 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
public MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService) throws IOException {
public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, final ExecutorService notificationExecutorService) throws IOException {
this.redisCluster = redisCluster;
this.pubSubConnection = redisCluster.createPubSubConnection();
this.insertCluster = insertCluster;
this.readDeleteCluster = readDeleteCluster;
this.pubSubConnection = readDeleteCluster.createPubSubConnection();
this.notificationExecutorService = notificationExecutorService;
this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
this.removeByIdScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_id.lua", ScriptOutputType.VALUE);
this.removeBySenderScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_sender.lua", ScriptOutputType.VALUE);
this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.MULTI);
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS);
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI);
this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
this.removeByIdScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_id.lua", ScriptOutputType.VALUE);
this.removeBySenderScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_sender.lua", ScriptOutputType.VALUE);
this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.MULTI);
this.getItemsScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
this.removeQueueScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS);
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI);
}
@Override
@ -154,7 +156,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
insertEphemeralTimer.record(() -> {
final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice);
redisCluster.useBinaryCluster(connection -> {
insertCluster.useBinaryCluster(connection -> {
connection.sync().rpush(ephemeralQueueKey, message.toByteArray());
connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds());
});
@ -223,7 +225,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) {
return redisCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0);
return readDeleteCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0);
}
@SuppressWarnings("unchecked")
@ -260,7 +262,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
@VisibleForTesting
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) {
return getMessagesTimer.record(() -> {
final List<ScoredValue<byte[]>> scoredMessages = redisCluster.withBinaryCluster(connection -> connection.sync().zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
final List<ScoredValue<byte[]>> scoredMessages = readDeleteCluster.withBinaryCluster(connection -> connection.sync().zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
final List<MessageProtos.Envelope> envelopes = new ArrayList<>(scoredMessages.size());
for (final ScoredValue<byte[]> scoredMessage : scoredMessages) {
@ -283,7 +285,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice, final long currentTimeMillis) {
final long earliestAllowableTimestamp = currentTimeMillis - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis();
return takeEphemeralMessageTimer.record(() -> redisCluster.withBinaryCluster(connection -> {
return takeEphemeralMessageTimer.record(() -> readDeleteCluster.withBinaryCluster(connection -> {
byte[] messageBytes;
while ((messageBytes = connection.sync().lpop(getEphemeralMessageQueueKey(destinationUuid, destinationDevice))) != null) {
@ -320,7 +322,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
int getNextSlotToPersist() {
return (int)(redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT);
return (int)(readDeleteCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT);
}
List<String> getQueuesToPersist(final int slot, final Instant maxTime, final int limit) {
@ -331,15 +333,15 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
void addQueueToPersist(final UUID accountUuid, final long deviceId) {
redisCluster.useBinaryCluster(connection -> connection.sync().zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), getMessageQueueKey(accountUuid, deviceId)));
readDeleteCluster.useBinaryCluster(connection -> connection.sync().zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), getMessageQueueKey(accountUuid, deviceId)));
}
void lockQueueForPersistence(final UUID accountUuid, final long deviceId) {
redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
readDeleteCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
}
void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) {
redisCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
readDeleteCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
}
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {

View File

@ -96,9 +96,10 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
Keys keys = new Keys(accountDatabase, configuration.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration());
Messages messages = new Messages(messageDatabase);
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster("message_read_delete_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", configuration.getMetricsClusterConfiguration(), redisClusterClientResources);
MessagesCache messagesCache = new MessagesCache(messagesCacheCluster, keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration());
DirectoryManager directory = new DirectoryManager(redisClient );

View File

@ -65,7 +65,7 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
final AccountsManager accountsManager = mock(AccountsManager.class);
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
messagesManager = new MessagesManager(messages, messagesCache, mock(PushLatencyManager.class));
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY);

View File

@ -67,7 +67,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER);
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY);
doAnswer(invocation -> {

View File

@ -56,7 +56,7 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "Klgz"));
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
messagesCache.start();
}

View File

@ -81,7 +81,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
executorService = Executors.newSingleThreadExecutor();
messages = new Messages(new FaultTolerantDatabase("messages-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration()));
messagesCache = new MessagesCache(getRedisCluster(), executorService);
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), executorService);
account = mock(Account.class);
device = mock(Device.class);
webSocketClient = mock(WebSocketClient.class);