Insert ephemeral messages in the standard cache queue

This commit is contained in:
Chris Eager 2021-08-24 18:24:42 -05:00 committed by Chris Eager
parent a7443a9ece
commit e08c5a412e
6 changed files with 75 additions and 141 deletions

View File

@ -4,9 +4,14 @@
*/ */
package org.whispersystems.textsecuregcm.push; package org.whispersystems.textsecuregcm.push;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import io.dropwizard.lifecycle.Managed; import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
import java.util.List;
import java.util.Optional;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.push.ApnMessage.Type; import org.whispersystems.textsecuregcm.push.ApnMessage.Type;
import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.redis.RedisOperation;
@ -15,12 +20,6 @@ import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
import java.util.List;
import java.util.Optional;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
/** /**
* A MessageSender sends Signal messages to destination devices. Messages may be "normal" user-to-user messages, * A MessageSender sends Signal messages to destination devices. Messages may be "normal" user-to-user messages,
* ephemeral ("online") messages like typing indicators, or delivery receipts. * ephemeral ("online") messages like typing indicators, or delivery receipts.
@ -88,7 +87,7 @@ public class MessageSender implements Managed {
clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId()); clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId());
if (clientPresent) { if (clientPresent) {
messagesManager.insertEphemeral(account.getUuid(), device.getId(), message); messagesManager.insert(account.getUuid(), device.getId(), message.toBuilder().setEphemeral(true).build());
} }
} else { } else {
messagesManager.insert(account.getUuid(), device.getId(), message); messagesManager.insert(account.getUuid(), device.getId(), message);

View File

@ -12,9 +12,10 @@ package org.whispersystems.textsecuregcm.storage;
*/ */
public interface MessageAvailabilityListener { public interface MessageAvailabilityListener {
void handleNewMessagesAvailable(); void handleNewMessagesAvailable();
void handleNewEphemeralMessageAvailable(); @Deprecated
void handleNewEphemeralMessageAvailable();
void handleMessagesPersisted(); void handleMessagesPersisted();
} }

View File

@ -63,7 +63,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>(); private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "false"); private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "false");
private final Timer insertEphemeralTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "true");
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist")); private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
@ -144,27 +143,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
guid.toString().getBytes(StandardCharsets.UTF_8)))); guid.toString().getBytes(StandardCharsets.UTF_8))));
} }
public void insertEphemeral(final UUID destinationUuid, final long destinationDevice,
final MessageProtos.Envelope message) {
final MessageProtos.Envelope messageWithGuid;
if (!message.hasServerGuid()) {
messageWithGuid = message.toBuilder().setServerGuid(UUID.randomUUID().toString()).build();
} else {
messageWithGuid = message;
}
insertEphemeralTimer.record(() -> {
final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice);
insertCluster.useBinaryCluster(connection -> {
connection.sync().rpush(ephemeralQueueKey, messageWithGuid.toByteArray());
connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds());
});
});
}
public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice,
final UUID messageGuid) { final UUID messageGuid) {
return remove(destinationUuid, destinationDevice, List.of(messageGuid)).stream().findFirst(); return remove(destinationUuid, destinationDevice, List.of(messageGuid)).stream().findFirst();
@ -247,6 +225,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}); });
} }
@Deprecated
public Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid, public Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid,
final long destinationDevice) { final long destinationDevice) {
return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis()); return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis());

View File

@ -59,10 +59,7 @@ public class MessagesManager {
} }
} }
public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, final Envelope message) { @Deprecated
messagesCache.insertEphemeral(destinationUuid, destinationDevice, message);
}
public Optional<Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) { public Optional<Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) {
return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice); return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice);
} }

View File

@ -1,30 +1,34 @@
/* /*
* Copyright 2013-2020 Signal Messenger, LLC * Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only * SPDX-License-Identifier: AGPL-3.0-only
*/ */
package org.whispersystems.textsecuregcm.push; package org.whispersystems.textsecuregcm.push;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import java.util.UUID;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MessagesManager;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class MessageSenderTest { public class MessageSenderTest {
private Account account; private Account account;
@ -64,80 +68,86 @@ public class MessageSenderTest {
@Test @Test
public void testSendOnlineMessageClientPresent() throws Exception { public void testSendOnlineMessageClientPresent() throws Exception {
when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true);
when(device.getGcmId()).thenReturn("gcm-id"); when(device.getGcmId()).thenReturn("gcm-id");
messageSender.sendMessage(account, device, message, true); messageSender.sendMessage(account, device, message, true);
verify(messagesManager).insertEphemeral(ACCOUNT_UUID, DEVICE_ID, message); ArgumentCaptor<MessageProtos.Envelope> envelopeArgumentCaptor = ArgumentCaptor.forClass(
verify(messagesManager, never()).insert(any(), anyLong(), any()); MessageProtos.Envelope.class);
verifyZeroInteractions(gcmSender);
verifyZeroInteractions(apnSender); verify(messagesManager).insert(any(), anyLong(), envelopeArgumentCaptor.capture());
assertTrue(envelopeArgumentCaptor.getValue().getEphemeral());
verifyNoInteractions(gcmSender);
verifyNoInteractions(apnSender);
} }
@Test @Test
public void testSendOnlineMessageClientNotPresent() throws Exception { public void testSendOnlineMessageClientNotPresent() throws Exception {
when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false);
when(device.getGcmId()).thenReturn("gcm-id"); when(device.getGcmId()).thenReturn("gcm-id");
messageSender.sendMessage(account, device, message, true); messageSender.sendMessage(account, device, message, true);
verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager, never()).insert(any(), anyLong(), any());
verify(messagesManager, never()).insert(any(), anyLong(), any()); verifyNoInteractions(gcmSender);
verifyZeroInteractions(gcmSender); verifyNoInteractions(apnSender);
verifyZeroInteractions(apnSender);
} }
@Test @Test
public void testSendMessageClientPresent() throws Exception { public void testSendMessageClientPresent() throws Exception {
when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true);
when(device.getGcmId()).thenReturn("gcm-id"); when(device.getGcmId()).thenReturn("gcm-id");
messageSender.sendMessage(account, device, message, false); messageSender.sendMessage(account, device, message, false);
verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); final ArgumentCaptor<MessageProtos.Envelope> envelopeArgumentCaptor = ArgumentCaptor.forClass(
verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); MessageProtos.Envelope.class);
verifyZeroInteractions(gcmSender);
verifyZeroInteractions(apnSender); verify(messagesManager).insert(eq(ACCOUNT_UUID), eq(DEVICE_ID), envelopeArgumentCaptor.capture());
assertFalse(envelopeArgumentCaptor.getValue().getEphemeral());
assertEquals(message, envelopeArgumentCaptor.getValue());
verifyNoInteractions(gcmSender);
verifyNoInteractions(apnSender);
} }
@Test @Test
public void testSendMessageGcmClientNotPresent() throws Exception { public void testSendMessageGcmClientNotPresent() throws Exception {
when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false);
when(device.getGcmId()).thenReturn("gcm-id"); when(device.getGcmId()).thenReturn("gcm-id");
messageSender.sendMessage(account, device, message, false); messageSender.sendMessage(account, device, message, false);
verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message);
verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); verify(gcmSender).sendMessage(any());
verify(gcmSender).sendMessage(any()); verifyNoInteractions(apnSender);
verifyZeroInteractions(apnSender);
} }
@Test @Test
public void testSendMessageApnClientNotPresent() throws Exception { public void testSendMessageApnClientNotPresent() throws Exception {
when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false);
when(device.getApnId()).thenReturn("apn-id"); when(device.getApnId()).thenReturn("apn-id");
messageSender.sendMessage(account, device, message, false); messageSender.sendMessage(account, device, message, false);
verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message);
verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); verifyNoInteractions(gcmSender);
verifyZeroInteractions(gcmSender); verify(apnSender).sendMessage(any());
verify(apnSender).sendMessage(any());
} }
@Test @Test
public void testSendMessageFetchClientNotPresent() throws Exception { public void testSendMessageFetchClientNotPresent() throws Exception {
when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false);
when(device.getFetchesMessages()).thenReturn(true); when(device.getFetchesMessages()).thenReturn(true);
messageSender.sendMessage(account, device, message, false); messageSender.sendMessage(account, device, message, false);
verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message);
verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); verifyNoInteractions(gcmSender);
verifyZeroInteractions(gcmSender); verifyNoInteractions(apnSender);
verifyZeroInteractions(apnSender);
} }
private MessageProtos.Envelope generateRandomMessage() { private MessageProtos.Envelope generateRandomMessage() {

View File

@ -337,56 +337,4 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
assertTrue(notified.get()); assertTrue(notified.get());
} }
@Test(timeout = 5_000L)
public void testInsertAndNotifyEphemeralMessage() throws InterruptedException {
final AtomicBoolean notified = new AtomicBoolean(false);
final MessageProtos.Envelope message = generateRandomMessage(UUID.randomUUID(), true);
final MessageAvailabilityListener listener = new MessageAvailabilityListener() {
@Override
public void handleNewMessagesAvailable() {
}
@Override
public void handleNewEphemeralMessageAvailable() {
synchronized (notified) {
notified.set(true);
notified.notifyAll();
}
}
@Override
public void handleMessagesPersisted() {
}
};
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
messagesCache.insertEphemeral(DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
synchronized (notified) {
while (!notified.get()) {
notified.wait();
}
}
assertTrue(notified.get());
}
@Test
public void testTakeEphemeralMessage() {
final long currentTime = System.currentTimeMillis();
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, true, currentTime);
assertEquals(Optional.empty(), messagesCache.takeEphemeralMessage(DESTINATION_UUID, DESTINATION_DEVICE_ID, currentTime));
messagesCache.insertEphemeral(DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
assertEquals(Optional.of(message), messagesCache.takeEphemeralMessage(DESTINATION_UUID, DESTINATION_DEVICE_ID, currentTime));
assertEquals(Optional.empty(), messagesCache.takeEphemeralMessage(DESTINATION_UUID, DESTINATION_DEVICE_ID, currentTime));
messagesCache.insertEphemeral(DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(UUID.randomUUID(), true, 0));
assertEquals(Optional.empty(), messagesCache.takeEphemeralMessage(DESTINATION_UUID, DESTINATION_DEVICE_ID, currentTime));
}
} }