diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 4d4f86302..bd3126389 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -285,7 +285,8 @@ public class WhisperServerService extends Application keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000); @@ -296,14 +297,14 @@ public class WhisperServerService extends Application implements Managed { - private final FaultTolerantRedisCluster redisCluster; + private final FaultTolerantRedisCluster insertCluster; + private final FaultTolerantRedisCluster readDeleteCluster; private final FaultTolerantPubSubConnection pubSubConnection; private final ExecutorService notificationExecutorService; @@ -93,20 +94,21 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); - public MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService) throws IOException { + public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, final ExecutorService notificationExecutorService) throws IOException { - this.redisCluster = redisCluster; - this.pubSubConnection = redisCluster.createPubSubConnection(); + this.insertCluster = insertCluster; + this.readDeleteCluster = readDeleteCluster; + this.pubSubConnection = readDeleteCluster.createPubSubConnection(); this.notificationExecutorService = notificationExecutorService; - this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); - this.removeByIdScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_id.lua", ScriptOutputType.VALUE); - this.removeBySenderScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_sender.lua", ScriptOutputType.VALUE); - this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.MULTI); - this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI); - this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS); - this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI); + this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); + this.removeByIdScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_id.lua", ScriptOutputType.VALUE); + this.removeBySenderScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_sender.lua", ScriptOutputType.VALUE); + this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.MULTI); + this.getItemsScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_items.lua", ScriptOutputType.MULTI); + this.removeQueueScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS); + this.getQueuesToPersistScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI); } @Override @@ -154,7 +156,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp insertEphemeralTimer.record(() -> { final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice); - redisCluster.useBinaryCluster(connection -> { + insertCluster.useBinaryCluster(connection -> { connection.sync().rpush(ephemeralQueueKey, message.toByteArray()); connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds()); }); @@ -223,7 +225,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) { - return redisCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0); + return readDeleteCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0); } @SuppressWarnings("unchecked") @@ -260,7 +262,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp @VisibleForTesting List getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) { return getMessagesTimer.record(() -> { - final List> scoredMessages = redisCluster.withBinaryCluster(connection -> connection.sync().zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit)); + final List> scoredMessages = readDeleteCluster.withBinaryCluster(connection -> connection.sync().zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit)); final List envelopes = new ArrayList<>(scoredMessages.size()); for (final ScoredValue scoredMessage : scoredMessages) { @@ -283,7 +285,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp Optional takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice, final long currentTimeMillis) { final long earliestAllowableTimestamp = currentTimeMillis - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); - return takeEphemeralMessageTimer.record(() -> redisCluster.withBinaryCluster(connection -> { + return takeEphemeralMessageTimer.record(() -> readDeleteCluster.withBinaryCluster(connection -> { byte[] messageBytes; while ((messageBytes = connection.sync().lpop(getEphemeralMessageQueueKey(destinationUuid, destinationDevice))) != null) { @@ -320,7 +322,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } int getNextSlotToPersist() { - return (int)(redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); + return (int)(readDeleteCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); } List getQueuesToPersist(final int slot, final Instant maxTime, final int limit) { @@ -331,15 +333,15 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } void addQueueToPersist(final UUID accountUuid, final long deviceId) { - redisCluster.useBinaryCluster(connection -> connection.sync().zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), getMessageQueueKey(accountUuid, deviceId))); + readDeleteCluster.useBinaryCluster(connection -> connection.sync().zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), getMessageQueueKey(accountUuid, deviceId))); } void lockQueueForPersistence(final UUID accountUuid, final long deviceId) { - redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE)); + readDeleteCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE)); } void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) { - redisCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId))); + readDeleteCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId))); } public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index 689a0881a..13476071d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -96,9 +96,10 @@ public class DeleteUserCommand extends EnvironmentCommand { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 7d16aae7c..ca0ade698 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -56,7 +56,7 @@ public class MessagesCacheTest extends AbstractRedisClusterTest { getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "Klgz")); notificationExecutorService = Executors.newSingleThreadExecutor(); - messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService); + messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService); messagesCache.start(); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 3ab903386..9804267be 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -81,7 +81,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest executorService = Executors.newSingleThreadExecutor(); messages = new Messages(new FaultTolerantDatabase("messages-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration())); - messagesCache = new MessagesCache(getRedisCluster(), executorService); + messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), executorService); account = mock(Account.class); device = mock(Device.class); webSocketClient = mock(WebSocketClient.class);