From e08c5a412eab4ed3822d1e8b67b5c43154dd6788 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Tue, 24 Aug 2021 18:24:42 -0500 Subject: [PATCH] Insert ephemeral messages in the standard cache queue --- .../textsecuregcm/push/MessageSender.java | 13 +- .../storage/MessageAvailabilityListener.java | 7 +- .../textsecuregcm/storage/MessagesCache.java | 23 +--- .../storage/MessagesManager.java | 5 +- .../textsecuregcm/push/MessageSenderTest.java | 116 ++++++++++-------- .../storage/MessagesCacheTest.java | 52 -------- 6 files changed, 75 insertions(+), 141 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java index 47f49930b..57ba089b6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java @@ -4,9 +4,14 @@ */ package org.whispersystems.textsecuregcm.push; +import static com.codahale.metrics.MetricRegistry.name; +import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; + import io.dropwizard.lifecycle.Managed; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; +import java.util.List; +import java.util.Optional; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.push.ApnMessage.Type; import org.whispersystems.textsecuregcm.redis.RedisOperation; @@ -15,12 +20,6 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.util.Util; -import java.util.List; -import java.util.Optional; - -import static com.codahale.metrics.MetricRegistry.name; -import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; - /** * A MessageSender sends Signal messages to destination devices. Messages may be "normal" user-to-user messages, * ephemeral ("online") messages like typing indicators, or delivery receipts. @@ -88,7 +87,7 @@ public class MessageSender implements Managed { clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId()); if (clientPresent) { - messagesManager.insertEphemeral(account.getUuid(), device.getId(), message); + messagesManager.insert(account.getUuid(), device.getId(), message.toBuilder().setEphemeral(true).build()); } } else { messagesManager.insert(account.getUuid(), device.getId(), message); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java index 2fcf1b2e2..e8535924a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java @@ -12,9 +12,10 @@ package org.whispersystems.textsecuregcm.storage; */ public interface MessageAvailabilityListener { - void handleNewMessagesAvailable(); + void handleNewMessagesAvailable(); - void handleNewEphemeralMessageAvailable(); + @Deprecated + void handleNewEphemeralMessageAvailable(); - void handleMessagesPersisted(); + void handleMessagesPersisted(); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index ad6a99ca1..ac2c21674 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -63,7 +63,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final Map queueNamesByMessageListener = new IdentityHashMap<>(); private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "false"); - private final Timer insertEphemeralTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "true"); private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist")); private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); @@ -144,27 +143,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp guid.toString().getBytes(StandardCharsets.UTF_8)))); } - public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, - final MessageProtos.Envelope message) { - - final MessageProtos.Envelope messageWithGuid; - - if (!message.hasServerGuid()) { - messageWithGuid = message.toBuilder().setServerGuid(UUID.randomUUID().toString()).build(); - } else { - messageWithGuid = message; - } - - insertEphemeralTimer.record(() -> { - final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice); - - insertCluster.useBinaryCluster(connection -> { - connection.sync().rpush(ephemeralQueueKey, messageWithGuid.toByteArray()); - connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds()); - }); - }); - } - public Optional remove(final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) { return remove(destinationUuid, destinationDevice, List.of(messageGuid)).stream().findFirst(); @@ -247,6 +225,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp }); } + @Deprecated public Optional takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) { return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 5cba99590..fa1f09116 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -59,10 +59,7 @@ public class MessagesManager { } } - public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, final Envelope message) { - messagesCache.insertEphemeral(destinationUuid, destinationDevice, message); - } - + @Deprecated public Optional takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) { return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java index e811b213b..7e286b410 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java @@ -1,30 +1,34 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ package org.whispersystems.textsecuregcm.push; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + import com.google.protobuf.ByteString; +import java.util.UUID; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; -import java.util.UUID; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - public class MessageSenderTest { private Account account; @@ -64,80 +68,86 @@ public class MessageSenderTest { @Test public void testSendOnlineMessageClientPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); - when(device.getGcmId()).thenReturn("gcm-id"); + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); + when(device.getGcmId()).thenReturn("gcm-id"); - messageSender.sendMessage(account, device, message, true); + messageSender.sendMessage(account, device, message, true); - verify(messagesManager).insertEphemeral(ACCOUNT_UUID, DEVICE_ID, message); - verify(messagesManager, never()).insert(any(), anyLong(), any()); - verifyZeroInteractions(gcmSender); - verifyZeroInteractions(apnSender); + ArgumentCaptor envelopeArgumentCaptor = ArgumentCaptor.forClass( + MessageProtos.Envelope.class); + + verify(messagesManager).insert(any(), anyLong(), envelopeArgumentCaptor.capture()); + + assertTrue(envelopeArgumentCaptor.getValue().getEphemeral()); + + verifyNoInteractions(gcmSender); + verifyNoInteractions(apnSender); } @Test public void testSendOnlineMessageClientNotPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); - when(device.getGcmId()).thenReturn("gcm-id"); + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); + when(device.getGcmId()).thenReturn("gcm-id"); - messageSender.sendMessage(account, device, message, true); + messageSender.sendMessage(account, device, message, true); - verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); - verify(messagesManager, never()).insert(any(), anyLong(), any()); - verifyZeroInteractions(gcmSender); - verifyZeroInteractions(apnSender); + verify(messagesManager, never()).insert(any(), anyLong(), any()); + verifyNoInteractions(gcmSender); + verifyNoInteractions(apnSender); } @Test public void testSendMessageClientPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); - when(device.getGcmId()).thenReturn("gcm-id"); + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); + when(device.getGcmId()).thenReturn("gcm-id"); - messageSender.sendMessage(account, device, message, false); + messageSender.sendMessage(account, device, message, false); - verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); - verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); - verifyZeroInteractions(gcmSender); - verifyZeroInteractions(apnSender); + final ArgumentCaptor envelopeArgumentCaptor = ArgumentCaptor.forClass( + MessageProtos.Envelope.class); + + verify(messagesManager).insert(eq(ACCOUNT_UUID), eq(DEVICE_ID), envelopeArgumentCaptor.capture()); + + assertFalse(envelopeArgumentCaptor.getValue().getEphemeral()); + assertEquals(message, envelopeArgumentCaptor.getValue()); + verifyNoInteractions(gcmSender); + verifyNoInteractions(apnSender); } @Test public void testSendMessageGcmClientNotPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); - when(device.getGcmId()).thenReturn("gcm-id"); + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); + when(device.getGcmId()).thenReturn("gcm-id"); - messageSender.sendMessage(account, device, message, false); + messageSender.sendMessage(account, device, message, false); - verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); - verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); - verify(gcmSender).sendMessage(any()); - verifyZeroInteractions(apnSender); + verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); + verify(gcmSender).sendMessage(any()); + verifyNoInteractions(apnSender); } @Test public void testSendMessageApnClientNotPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); - when(device.getApnId()).thenReturn("apn-id"); + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); + when(device.getApnId()).thenReturn("apn-id"); - messageSender.sendMessage(account, device, message, false); + messageSender.sendMessage(account, device, message, false); - verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); - verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); - verifyZeroInteractions(gcmSender); - verify(apnSender).sendMessage(any()); + verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); + verifyNoInteractions(gcmSender); + verify(apnSender).sendMessage(any()); } @Test public void testSendMessageFetchClientNotPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); - when(device.getFetchesMessages()).thenReturn(true); + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); + when(device.getFetchesMessages()).thenReturn(true); - messageSender.sendMessage(account, device, message, false); + messageSender.sendMessage(account, device, message, false); - verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); - verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); - verifyZeroInteractions(gcmSender); - verifyZeroInteractions(apnSender); + verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); + verifyNoInteractions(gcmSender); + verifyNoInteractions(apnSender); } private MessageProtos.Envelope generateRandomMessage() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index b9f1ec3bb..c1ab7b553 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -337,56 +337,4 @@ public class MessagesCacheTest extends AbstractRedisClusterTest { assertTrue(notified.get()); } - @Test(timeout = 5_000L) - public void testInsertAndNotifyEphemeralMessage() throws InterruptedException { - final AtomicBoolean notified = new AtomicBoolean(false); - final MessageProtos.Envelope message = generateRandomMessage(UUID.randomUUID(), true); - - final MessageAvailabilityListener listener = new MessageAvailabilityListener() { - @Override - public void handleNewMessagesAvailable() { - } - - @Override - public void handleNewEphemeralMessageAvailable() { - synchronized (notified) { - notified.set(true); - notified.notifyAll(); - } - } - - @Override - public void handleMessagesPersisted() { - } - }; - - messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); - messagesCache.insertEphemeral(DESTINATION_UUID, DESTINATION_DEVICE_ID, message); - - synchronized (notified) { - while (!notified.get()) { - notified.wait(); - } - } - - assertTrue(notified.get()); - } - - @Test - public void testTakeEphemeralMessage() { - final long currentTime = System.currentTimeMillis(); - final UUID messageGuid = UUID.randomUUID(); - final MessageProtos.Envelope message = generateRandomMessage(messageGuid, true, currentTime); - - assertEquals(Optional.empty(), messagesCache.takeEphemeralMessage(DESTINATION_UUID, DESTINATION_DEVICE_ID, currentTime)); - - messagesCache.insertEphemeral(DESTINATION_UUID, DESTINATION_DEVICE_ID, message); - - assertEquals(Optional.of(message), messagesCache.takeEphemeralMessage(DESTINATION_UUID, DESTINATION_DEVICE_ID, currentTime)); - assertEquals(Optional.empty(), messagesCache.takeEphemeralMessage(DESTINATION_UUID, DESTINATION_DEVICE_ID, currentTime)); - - messagesCache.insertEphemeral(DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(UUID.randomUUID(), true, 0)); - - assertEquals(Optional.empty(), messagesCache.takeEphemeralMessage(DESTINATION_UUID, DESTINATION_DEVICE_ID, currentTime)); - } }