From 084607f3598beaed5f68db7ce3b924c4609cd3de Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 7 Nov 2024 16:21:44 -0500 Subject: [PATCH] Retire the explicit "handle new message available" system in favor of implicit presence-on-insert values --- .../textsecuregcm/WhisperServerService.java | 3 +- .../textsecuregcm/push/MessageSender.java | 63 +++++-------------- .../push/PubSubClientEventManager.java | 25 -------- .../textsecuregcm/storage/MessagesCache.java | 4 +- .../storage/MessagesManager.java | 6 +- .../textsecuregcm/push/MessageSenderTest.java | 14 ++--- .../push/PubSubClientEventManagerTest.java | 39 ------------ 7 files changed, 28 insertions(+), 126 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 59a0d6c0c..5acff8721 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -654,8 +654,7 @@ public class WhisperServerService extends Application sendMessage(final Account account, final Device device, final Envelope message, final boolean online) { - messagesManager.insert(account.getUuid(), + public void sendMessage(final Account account, final Device device, final Envelope message, final boolean online) { + final boolean destinationPresent = messagesManager.insert(account.getUuid(), device.getId(), online ? message.toBuilder().setEphemeral(true).build() : message); - return pubSubClientEventManager.handleNewMessageAvailable(account.getIdentifier(IdentityType.ACI), device.getId()) - .exceptionally(throwable -> { - // It's unlikely that the message insert (synchronous) would succeed and sending a "new message available" - // event would fail since both things happen in the same cluster, but just in case, we should "fail open" and - // act as if the client wasn't present if this happens. This is a conservative measure that biases toward - // sending more push notifications, though again, it shouldn't happen often. - CLIENT_PRESENCE_ERROR.increment(); - return false; - }) - .thenApply(clientPresent -> { - if (!clientPresent && !online) { - try { - pushNotificationManager.sendNewMessageNotification(account, device.getId(), message.getUrgent()); - } catch (final NotPushRegisteredException ignored) { - } - } + if (!destinationPresent && !online) { + try { + pushNotificationManager.sendNewMessageNotification(account, device.getId(), message.getUrgent()); + } catch (final NotPushRegisteredException ignored) { + } + } - return clientPresent; - }) - .whenComplete((clientPresent, throwable) -> Metrics.counter(SEND_COUNTER_NAME, - CHANNEL_TAG_NAME, getDeliveryChannelName(device), - EPHEMERAL_TAG_NAME, String.valueOf(online), - CLIENT_ONLINE_TAG_NAME, String.valueOf(clientPresent), - URGENT_TAG_NAME, String.valueOf(message.getUrgent()), - STORY_TAG_NAME, String.valueOf(message.getStory()), - SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceServiceId())) - .increment()) - .thenRun(Util.NOOP) - .toCompletableFuture(); + Metrics.counter(SEND_COUNTER_NAME, + CHANNEL_TAG_NAME, getDeliveryChannelName(device), + EPHEMERAL_TAG_NAME, String.valueOf(online), + CLIENT_ONLINE_TAG_NAME, String.valueOf(destinationPresent), + URGENT_TAG_NAME, String.valueOf(message.getUrgent()), + STORY_TAG_NAME, String.valueOf(message.getStory()), + SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceServiceId())) + .increment(); } @VisibleForTesting diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index 0c8515624..200d39630 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -68,11 +68,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter listenersByAccountAndDeviceIdentifier; - private static final byte[] NEW_MESSAGE_EVENT_BYTES = ClientEvent.newBuilder() - .setNewMessageAvailable(NewMessageAvailableEvent.getDefaultInstance()) - .build() - .toByteArray(); - private static final byte[] DISCONNECT_REQUESTED_EVENT_BYTES = ClientEvent.newBuilder() .setDisconnectRequested(DisconnectRequested.getDefaultInstance()) .build() @@ -229,26 +224,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter handleNewMessageAvailable(final UUID accountIdentifier, final byte deviceId) { - if (pubSubConnection == null) { - throw new IllegalStateException("Presence manager not started"); - } - - return pubSubConnection.withPubSubConnection(connection -> - connection.async().spublish(getClientEventChannel(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES)) - .thenApply(listeners -> listeners > 0); - } - /** * Publishes an event notifying a specific device that messages have been persisted from short-term to long-term * storage. diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 75eb6346b..32d705dd5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -211,13 +211,13 @@ public class MessagesCache { this.removeRecipientViewFromMrmDataScript = removeRecipientViewFromMrmDataScript; } - public void insert(final UUID messageGuid, + public boolean insert(final UUID messageGuid, final UUID destinationAccountIdentifier, final byte destinationDeviceId, final MessageProtos.Envelope message) { final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(messageGuid.toString()).build(); - insertTimer.record(() -> insertScript.execute(destinationAccountIdentifier, destinationDeviceId, messageWithGuid)); + return insertTimer.record(() -> insertScript.execute(destinationAccountIdentifier, destinationDeviceId, messageWithGuid)); } public byte[] insertSharedMultiRecipientMessagePayload( diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index a3f5381c8..0f219a91d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -59,14 +59,16 @@ public class MessagesManager { this.messageDeletionExecutor = messageDeletionExecutor; } - public void insert(UUID destinationUuid, byte destinationDevice, Envelope message) { + public boolean insert(UUID destinationUuid, byte destinationDevice, Envelope message) { final UUID messageGuid = UUID.randomUUID(); - messagesCache.insert(messageGuid, destinationUuid, destinationDevice, message); + final boolean destinationPresent = messagesCache.insert(messageGuid, destinationUuid, destinationDevice, message); if (message.hasSourceServiceId() && !destinationUuid.toString().equals(message.getSourceServiceId())) { reportMessageManager.store(message.getSourceServiceId(), messageGuid); } + + return destinationPresent; } public CompletableFuture mayHavePersistedMessages(final UUID destinationUuid, final Device destinationDevice) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java index 2ea3cb604..bbca79b13 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java @@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyByte; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -20,7 +21,6 @@ import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -35,21 +35,16 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager; class MessageSenderTest { - private PubSubClientEventManager pubSubClientEventManager; private MessagesManager messagesManager; private PushNotificationManager pushNotificationManager; private MessageSender messageSender; @BeforeEach void setUp() { - pubSubClientEventManager = mock(PubSubClientEventManager.class); messagesManager = mock(MessagesManager.class); pushNotificationManager = mock(PushNotificationManager.class); - when(pubSubClientEventManager.handleNewMessageAvailable(any(), anyByte())) - .thenReturn(CompletableFuture.completedFuture(true)); - - messageSender = new MessageSender(pubSubClientEventManager, messagesManager, pushNotificationManager); + messageSender = new MessageSender(messagesManager, pushNotificationManager); } @CartesianTest @@ -77,10 +72,9 @@ class MessageSenderTest { .when(pushNotificationManager).sendNewMessageNotification(any(), anyByte(), anyBoolean()); } - when(pubSubClientEventManager.handleNewMessageAvailable(accountIdentifier, deviceId)) - .thenReturn(CompletableFuture.completedFuture(clientPresent)); + when(messagesManager.insert(eq(accountIdentifier), eq(deviceId), any())).thenReturn(clientPresent); - assertDoesNotThrow(() -> messageSender.sendMessage(account, device, message, onlineMessage).join()); + assertDoesNotThrow(() -> messageSender.sendMessage(account, device, message, onlineMessage)); final MessageProtos.Envelope expectedMessage = onlineMessage ? message.toBuilder().setEphemeral(true).build() diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java index 74a6e5c6a..4359c7022 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java @@ -140,30 +140,6 @@ class PubSubClientEventManagerTest { assertTrue(firstListenerConnectedElsewhere.get()); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void handleNewMessageAvailable(final boolean messageAvailableRemotely) throws InterruptedException { - final UUID accountIdentifier = UUID.randomUUID(); - final byte deviceId = Device.PRIMARY_ID; - - final CountDownLatch messageReceivedLatch = new CountDownLatch(1); - - localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() { - @Override - public void handleNewMessageAvailable() { - messageReceivedLatch.countDown(); - } - }).toCompletableFuture().join(); - - final PubSubClientEventManager messagePresenceManager = - messageAvailableRemotely ? remotePresenceManager : localPresenceManager; - - assertTrue(messagePresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join()); - - assertTrue(messageReceivedLatch.await(2, TimeUnit.SECONDS), - "Message not received within time limit"); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) void handleMessagesPersisted(final boolean messagesPersistedRemotely) throws InterruptedException { @@ -188,21 +164,6 @@ class PubSubClientEventManagerTest { "Message persistence event not received within time limit"); } - @Test - void handleClientDisconnected() { - final UUID accountIdentifier = UUID.randomUUID(); - final byte deviceId = Device.PRIMARY_ID; - - localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter()) - .toCompletableFuture().join(); - - assertTrue(localPresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join()); - - localPresenceManager.handleClientDisconnected(accountIdentifier, deviceId).toCompletableFuture().join(); - - assertFalse(localPresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join()); - } - @Test void isLocallyPresent() { final UUID accountIdentifier = UUID.randomUUID();