From a30227518777eff7c33a6491cb2b356b46b32c91 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Fri, 12 Apr 2024 13:31:17 -0500 Subject: [PATCH] Use a single cluster instance in MessagesCache --- .../textsecuregcm/WhisperServerService.java | 4 +- .../textsecuregcm/storage/MessagesCache.java | 38 +++++++++---------- .../workers/AssignUsernameCommand.java | 9 ++--- .../workers/CommandDependencies.java | 9 ++--- .../MessagePersisterIntegrationTest.java | 4 +- .../storage/MessagePersisterTest.java | 5 +-- .../storage/MessagesCacheTest.java | 10 ++--- .../WebSocketConnectionIntegrationTest.java | 5 +-- 8 files changed, 37 insertions(+), 47 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 8224621f3..f130d0a8b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -566,8 +566,8 @@ public class WhisperServerService extends Application implements Managed { - private final FaultTolerantRedisCluster readDeleteCluster; + private final FaultTolerantRedisCluster redisCluster; private final FaultTolerantPubSubConnection pubSubConnection; private final Clock clock; @@ -110,12 +110,12 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); - public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, - final ExecutorService notificationExecutorService, final Scheduler messageDeliveryScheduler, - final ExecutorService messageDeletionExecutorService, final Clock clock) throws IOException { + public MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService, + final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock) + throws IOException { - this.readDeleteCluster = readDeleteCluster; - this.pubSubConnection = readDeleteCluster.createPubSubConnection(); + this.redisCluster = redisCluster; + this.pubSubConnection = redisCluster.createPubSubConnection(); this.clock = clock; this.notificationExecutorService = notificationExecutorService; @@ -123,13 +123,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp this.messageDeletionExecutorService = messageDeletionExecutorService; this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion"); - this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); - this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua", + this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); + this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.MULTI); - this.getItemsScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_items.lua", ScriptOutputType.MULTI); - this.removeQueueScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_queue.lua", + this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI); + this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS); - this.getQueuesToPersistScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_queues_to_persist.lua", + this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI); } @@ -209,7 +209,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } public boolean hasMessages(final UUID destinationUuid, final byte destinationDevice) { - return readDeleteCluster.withBinaryCluster( + return redisCluster.withBinaryCluster( connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0); } @@ -324,7 +324,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp List getMessagesToPersist(final UUID accountUuid, final byte destinationDevice, final int limit) { return getMessagesTimer.record(() -> { - final List> scoredMessages = readDeleteCluster.withBinaryCluster( + final List> scoredMessages = redisCluster.withBinaryCluster( connection -> connection.sync() .zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit)); final List envelopes = new ArrayList<>(scoredMessages.size()); @@ -360,7 +360,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } int getNextSlotToPersist() { - return (int) (readDeleteCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) + return (int) (redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); } @@ -373,23 +373,23 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } void addQueueToPersist(final UUID accountUuid, final byte deviceId) { - readDeleteCluster.useBinaryCluster(connection -> connection.sync() + redisCluster.useBinaryCluster(connection -> connection.sync() .zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), getMessageQueueKey(accountUuid, deviceId))); } void lockQueueForPersistence(final UUID accountUuid, final byte deviceId) { - readDeleteCluster.useBinaryCluster( + redisCluster.useBinaryCluster( connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE)); } void unlockQueueForPersistence(final UUID accountUuid, final byte deviceId) { - readDeleteCluster.useBinaryCluster( + redisCluster.useBinaryCluster( connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId))); } boolean lockAccountForMessagePersisterCleanup(final UUID accountUuid) { - return readDeleteCluster.withBinaryCluster( + return redisCluster.withBinaryCluster( connection -> "OK".equals( connection.sync().set( getUnlinkInProgressKey(accountUuid), @@ -398,7 +398,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } void unlockAccountForMessagePersisterCleanup(final UUID accountUuid) { - readDeleteCluster.useBinaryCluster( + redisCluster.useBinaryCluster( connection -> connection.sync().del(getUnlinkInProgressKey(accountUuid))); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index 0b0f8ae08..e67f2aac6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -173,10 +173,7 @@ public class AssignUsernameCommand extends EnvironmentCommand actualMessages = Flux.from( messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID)) @@ -561,8 +561,8 @@ class MessagesCacheTest { messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); - messagesCache = new MessagesCache(mockCluster, mockCluster, mock(ExecutorService.class), - messageDeliveryScheduler, Executors.newSingleThreadExecutor(), Clock.systemUTC()); + messagesCache = new MessagesCache(mockCluster, mock(ExecutorService.class), messageDeliveryScheduler, + Executors.newSingleThreadExecutor(), Clock.systemUTC()); } @AfterEach diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 56560996a..8f2be9cdf 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -57,7 +57,6 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; -import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; import reactor.core.scheduler.Scheduler; @@ -90,8 +89,8 @@ class WebSocketConnectionIntegrationTest { sharedExecutorService = Executors.newSingleThreadExecutor(); scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); - messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); + messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, + messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(), DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(7), sharedExecutorService);