From e536a407408cab8e36daaaaf4748f05c15861e91 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 6 Nov 2024 14:58:37 -0500 Subject: [PATCH] Publish "messages persisted" events --- .../push/PubSubClientEventManager.java | 26 +++++++++++- .../storage/MessagePersister.java | 8 +++- .../workers/CommandDependencies.java | 2 + .../MessagePersisterServiceCommand.java | 7 +++- .../push/PubSubClientEventManagerTest.java | 42 +++++++++++++------ .../MessagePersisterIntegrationTest.java | 9 +++- .../storage/MessagePersisterTest.java | 11 ++++- ...PushNotificationExperimentCommandTest.java | 1 + .../workers/NotifyIdleDevicesCommandTest.java | 1 + ...PushNotificationExperimentCommandTest.java | 1 + 10 files changed, 90 insertions(+), 18 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index 709b7bd18..2ffb2fdfc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -58,6 +58,11 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter pubSubConnection; @@ -240,10 +245,29 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter - connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES)) + connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES)) .thenApply(listeners -> listeners > 0); } + /** + * Publishes an event notifying a specific device that messages have been persisted from short-term to long-term + * storage. + * + * @param accountIdentifier the account identifier for which messages have been persisted + * @param deviceId the ID of the device within the target account + * + * @return a future that completes when the event has been published + */ + public CompletionStage handleMessagesPersisted(final UUID accountIdentifier, final byte deviceId) { + if (pubSubConnection == null) { + throw new IllegalStateException("Presence manager not started"); + } + + return pubSubConnection.withPubSubConnection(connection -> + connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), MESSAGES_PERSISTED_EVENT_BYTES)) + .thenRun(Util.NOOP); + } + /** * Tests whether a client with the given account/device is connected to this presence manager instance. * diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index ec86f95d3..f61865aa7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -22,6 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; import org.whispersystems.textsecuregcm.util.Util; import software.amazon.awssdk.services.dynamodb.model.ItemCollectionSizeLimitExceededException; @@ -30,6 +31,7 @@ public class MessagePersister implements Managed { private final MessagesCache messagesCache; private final MessagesManager messagesManager; private final AccountsManager accountsManager; + private final PubSubClientEventManager pubSubClientEventManager; private final Duration persistDelay; @@ -63,13 +65,16 @@ public class MessagePersister implements Managed { public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, - final DynamicConfigurationManager dynamicConfigurationManager, final Duration persistDelay, + final PubSubClientEventManager pubSubClientEventManager, + final DynamicConfigurationManager dynamicConfigurationManager, + final Duration persistDelay, final int dedicatedProcessWorkerThreadCount ) { this.messagesCache = messagesCache; this.messagesManager = messagesManager; this.accountsManager = accountsManager; + this.pubSubClientEventManager = pubSubClientEventManager; this.persistDelay = persistDelay; this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount]; @@ -206,6 +211,7 @@ public class MessagePersister implements Managed { maybeUnlink(account, deviceId); // may throw, in which case we'll retry later by the usual mechanism } finally { messagesCache.unlockQueueForPersistence(accountUuid, deviceId); + pubSubClientEventManager.handleMessagesPersisted(accountUuid, deviceId); sample.stop(persistQueueTimer); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 1a6295019..c63f111ac 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -76,6 +76,7 @@ record CommandDependencies( ReportMessageManager reportMessageManager, MessagesCache messagesCache, MessagesManager messagesManager, + PubSubClientEventManager pubSubClientEventManager, KeysManager keysManager, APNSender apnSender, FcmSender fcmSender, @@ -271,6 +272,7 @@ record CommandDependencies( reportMessageManager, messagesCache, messagesManager, + pubSubClientEventManager, keys, apnSender, fcmSender, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java index 25a10b825..b261465d0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java @@ -60,8 +60,11 @@ public class MessagePersisterServiceCommand extends ServerCommand assertThrows(MessagePersistenceException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE))); + + verify(pubSubClientEventManager).handleMessagesPersisted(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java index 505783083..65f32351d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java @@ -73,6 +73,7 @@ class FinishPushNotificationExperimentCommandTest { null, null, null, + null, pushNotificationExperimentSamples, null, null, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java index 86a03200e..d0c4afc39 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java @@ -66,6 +66,7 @@ class NotifyIdleDevicesCommandTest { null, null, null, + null, null); this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java index b547e3273..3b54fc52b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java @@ -62,6 +62,7 @@ class StartPushNotificationExperimentCommandTest { null, null, null, + null, pushNotificationExperimentSamples, null, null,