From 562b495a188b48bd535d2d3ed7369c6b355b2d36 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 7 Nov 2024 17:19:55 -0500 Subject: [PATCH] Publish "messages persisted" events when unlocking queues after a persistence run --- .../push/PubSubClientEventManager.java | 24 ----------- .../storage/MessagePersister.java | 8 +--- .../textsecuregcm/storage/MessagesCache.java | 12 +++--- .../MessagesCacheUnlockQueueScript.java | 43 +++++++++++++++++++ .../workers/CommandDependencies.java | 2 - .../MessagePersisterServiceCommand.java | 1 - .../src/main/resources/lua/unlock_queue.lua | 9 ++++ .../push/PubSubClientEventManagerTest.java | 26 ----------- .../MessagePersisterIntegrationTest.java | 2 +- .../storage/MessagePersisterTest.java | 11 +---- ...PushNotificationExperimentCommandTest.java | 1 - .../workers/NotifyIdleDevicesCommandTest.java | 1 - ...PushNotificationExperimentCommandTest.java | 1 - 13 files changed, 63 insertions(+), 78 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java create mode 100644 service/src/main/resources/lua/unlock_queue.lua diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index 200d39630..7f14878b0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -58,11 +58,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter pubSubConnection; @@ -224,25 +219,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter handleMessagesPersisted(final UUID accountIdentifier, final byte deviceId) { - if (pubSubConnection == null) { - throw new IllegalStateException("Presence manager not started"); - } - - return pubSubConnection.withPubSubConnection(connection -> - connection.async().spublish(getClientEventChannel(accountIdentifier, deviceId), MESSAGES_PERSISTED_EVENT_BYTES)) - .thenRun(Util.NOOP); - } - /** * Tests whether a client with the given account/device is connected to this presence manager instance. * diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index f61865aa7..56f4d0621 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -22,7 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; import org.whispersystems.textsecuregcm.util.Util; import software.amazon.awssdk.services.dynamodb.model.ItemCollectionSizeLimitExceededException; @@ -31,7 +30,6 @@ public class MessagePersister implements Managed { private final MessagesCache messagesCache; private final MessagesManager messagesManager; private final AccountsManager accountsManager; - private final PubSubClientEventManager pubSubClientEventManager; private final Duration persistDelay; @@ -63,9 +61,9 @@ public class MessagePersister implements Managed { private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); - public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, + public MessagePersister(final MessagesCache messagesCache, + final MessagesManager messagesManager, final AccountsManager accountsManager, - final PubSubClientEventManager pubSubClientEventManager, final DynamicConfigurationManager dynamicConfigurationManager, final Duration persistDelay, final int dedicatedProcessWorkerThreadCount @@ -74,7 +72,6 @@ public class MessagePersister implements Managed { this.messagesCache = messagesCache; this.messagesManager = messagesManager; this.accountsManager = accountsManager; - this.pubSubClientEventManager = pubSubClientEventManager; this.persistDelay = persistDelay; this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount]; @@ -211,7 +208,6 @@ public class MessagePersister implements Managed { maybeUnlink(account, deviceId); // may throw, in which case we'll retry later by the usual mechanism } finally { messagesCache.unlockQueueForPersistence(accountUuid, deviceId); - pubSubClientEventManager.handleMessagesPersisted(accountUuid, deviceId); sample.stop(persistQueueTimer); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 32d705dd5..36e877b2d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -125,6 +125,7 @@ public class MessagesCache { private final MessagesCacheRemoveQueueScript removeQueueScript; private final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript; private final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript; + private final MessagesCacheUnlockQueueScript unlockQueueScript; private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert")); private final Timer insertSharedMrmPayloadTimer = Metrics.timer(name(MessagesCache.class, "insertSharedMrmPayload")); @@ -176,7 +177,8 @@ public class MessagesCache { new MessagesCacheRemoveByGuidScript(redisCluster), new MessagesCacheRemoveQueueScript(redisCluster), new MessagesCacheGetQueuesToPersistScript(redisCluster), - new MessagesCacheRemoveRecipientViewFromMrmDataScript(redisCluster) + new MessagesCacheRemoveRecipientViewFromMrmDataScript(redisCluster), + new MessagesCacheUnlockQueueScript(redisCluster) ); } @@ -190,8 +192,8 @@ public class MessagesCache { final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript, final MessagesCacheRemoveQueueScript removeQueueScript, final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript, - final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript) - throws IOException { + final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript, + final MessagesCacheUnlockQueueScript unlockQueueScript) throws IOException { this.redisCluster = redisCluster; this.clock = clock; @@ -209,6 +211,7 @@ public class MessagesCache { this.removeQueueScript = removeQueueScript; this.getQueuesToPersistScript = getQueuesToPersistScript; this.removeRecipientViewFromMrmDataScript = removeRecipientViewFromMrmDataScript; + this.unlockQueueScript = unlockQueueScript; } public boolean insert(final UUID messageGuid, @@ -599,8 +602,7 @@ public class MessagesCache { } void unlockQueueForPersistence(final UUID accountUuid, final byte deviceId) { - redisCluster.useBinaryCluster( - connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId))); + unlockQueueScript.execute(accountUuid, deviceId); } static byte[] getMessageQueueKey(final UUID accountUuid, final byte deviceId) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java new file mode 100644 index 000000000..169892723 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import io.lettuce.core.ScriptOutputType; +import org.whispersystems.textsecuregcm.push.ClientEvent; +import org.whispersystems.textsecuregcm.push.MessagesPersistedEvent; +import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +/** + * Unlocks a message queue for persistence/message retrieval. + */ +class MessagesCacheUnlockQueueScript { + + private final ClusterLuaScript unlockQueueScript; + + private final List MESSAGES_PERSISTED_EVENT_ARGS = List.of(ClientEvent.newBuilder() + .setMessagesPersisted(MessagesPersistedEvent.getDefaultInstance()) + .build() + .toByteArray()); // eventPayload + + MessagesCacheUnlockQueueScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException { + this.unlockQueueScript = + ClusterLuaScript.fromResource(redisCluster, "lua/unlock_queue.lua", ScriptOutputType.STATUS); + } + + void execute(final UUID accountIdentifier, final byte deviceId) { + final List keys = List.of( + MessagesCache.getPersistInProgressKey(accountIdentifier, deviceId), // persistInProgressKey + PubSubClientEventManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey + ); + + unlockQueueScript.executeBinary(keys, MESSAGES_PERSISTED_EVENT_ARGS); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index df1f18189..9e5a3e7da 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -76,7 +76,6 @@ record CommandDependencies( ReportMessageManager reportMessageManager, MessagesCache messagesCache, MessagesManager messagesManager, - PubSubClientEventManager pubSubClientEventManager, KeysManager keysManager, APNSender apnSender, FcmSender fcmSender, @@ -269,7 +268,6 @@ record CommandDependencies( reportMessageManager, messagesCache, messagesManager, - pubSubClientEventManager, keys, apnSender, fcmSender, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java index 081574d87..a83bc5a36 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java @@ -63,7 +63,6 @@ public class MessagePersisterServiceCommand extends ServerCommand assertThrows(MessagePersistenceException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE))); - - verify(pubSubClientEventManager).handleMessagesPersisted(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java index 65f32351d..505783083 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java @@ -73,7 +73,6 @@ class FinishPushNotificationExperimentCommandTest { null, null, null, - null, pushNotificationExperimentSamples, null, null, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java index d0c4afc39..86a03200e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java @@ -66,7 +66,6 @@ class NotifyIdleDevicesCommandTest { null, null, null, - null, null); this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java index 3b54fc52b..b547e3273 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java @@ -62,7 +62,6 @@ class StartPushNotificationExperimentCommandTest { null, null, null, - null, pushNotificationExperimentSamples, null, null,