From 9d19fc9ecc0f1bd7ba6db1373c284a576c60fb41 Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Wed, 6 Nov 2024 12:10:44 -0500 Subject: [PATCH] Shift authority to the new pub/sub client presence system --- .../textsecuregcm/WhisperServerService.java | 8 +- .../controllers/KeepAliveController.java | 9 +- .../textsecuregcm/push/MessageSender.java | 98 +++++----- .../push/PubSubClientEventManager.java | 41 ++--- .../websocket/WebSocketConnection.java | 22 +-- .../workers/CommandDependencies.java | 11 +- .../textsecuregcm/push/MessageSenderTest.java | 172 +++++++----------- .../push/PubSubClientEventManagerTest.java | 14 +- 8 files changed, 157 insertions(+), 218 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 95847a042..62464ac2d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -619,7 +619,7 @@ public class WhisperServerService extends Application spamFilters = ServiceLoader.load(SpamFilter.class) @@ -1159,7 +1159,7 @@ public class WhisperServerService extends Application { - if (!clientPresenceManager.isLocallyPresent(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId())) { + if (!pubSubClientEventManager.isLocallyPresent(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId())) { final Duration age = Duration.between(context.getClient().getCreated(), Instant.now()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java index a0ad876b7..246a0b456 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java @@ -7,12 +7,16 @@ package org.whispersystems.textsecuregcm.push; import static com.codahale.metrics.MetricRegistry.name; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; +import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import org.whispersystems.textsecuregcm.identity.IdentityType; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; -import java.util.Objects; +import org.whispersystems.textsecuregcm.util.Util; +import java.util.concurrent.CompletableFuture; /** * A MessageSender sends Signal messages to destination devices. Messages may be "normal" user-to-user messages, @@ -29,7 +33,6 @@ import java.util.Objects; */ public class MessageSender { - private final ClientPresenceManager clientPresenceManager; private final PubSubClientEventManager pubSubClientEventManager; private final MessagesManager messagesManager; private final PushNotificationManager pushNotificationManager; @@ -38,71 +41,68 @@ public class MessageSender { private static final String CHANNEL_TAG_NAME = "channel"; private static final String EPHEMERAL_TAG_NAME = "ephemeral"; private static final String CLIENT_ONLINE_TAG_NAME = "clientOnline"; - private static final String PUB_SUB_CLIENT_ONLINE_TAG_NAME = "pubSubClientOnline"; private static final String URGENT_TAG_NAME = "urgent"; private static final String STORY_TAG_NAME = "story"; private static final String SEALED_SENDER_TAG_NAME = "sealedSender"; - public MessageSender(final ClientPresenceManager clientPresenceManager, - final PubSubClientEventManager pubSubClientEventManager, + private static final Counter CLIENT_PRESENCE_ERROR = + Metrics.counter(MetricsUtil.name(MessageSender.class, "clientPresenceError")); + + public MessageSender(final PubSubClientEventManager pubSubClientEventManager, final MessagesManager messagesManager, final PushNotificationManager pushNotificationManager) { - this.clientPresenceManager = clientPresenceManager; this.pubSubClientEventManager = pubSubClientEventManager; this.messagesManager = messagesManager; this.pushNotificationManager = pushNotificationManager; } - public void sendMessage(final Account account, final Device device, final Envelope message, final boolean online) { + public CompletableFuture sendMessage(final Account account, final Device device, final Envelope message, final boolean online) { + messagesManager.insert(account.getUuid(), + device.getId(), + online ? message.toBuilder().setEphemeral(true).build() : message); - final String channel; + return pubSubClientEventManager.handleNewMessageAvailable(account.getIdentifier(IdentityType.ACI), device.getId()) + .exceptionally(throwable -> { + // It's unlikely that the message insert (synchronous) would succeed and sending a "new message available" + // event would fail since both things happen in the same cluster, but just in case, we should "fail open" and + // act as if the client wasn't present if this happens. This is a conservative measure that biases toward + // sending more push notifications, though again, it shouldn't happen often. + CLIENT_PRESENCE_ERROR.increment(); + return false; + }) + .thenApply(clientPresent -> { + if (!clientPresent && !online) { + try { + pushNotificationManager.sendNewMessageNotification(account, device.getId(), message.getUrgent()); + } catch (final NotPushRegisteredException ignored) { + } + } - if (device.getGcmId() != null) { - channel = "gcm"; - } else if (device.getApnId() != null) { - channel = "apn"; - } else if (device.getFetchesMessages()) { - channel = "websocket"; - } else { - channel = "none"; - } - - final boolean clientPresent; - - if (online) { - clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId()); - - if (clientPresent) { - messagesManager.insert(account.getUuid(), device.getId(), message.toBuilder().setEphemeral(true).build()); - } else { - messagesManager.removeRecipientViewFromMrmData(device.getId(), message); - } - } else { - messagesManager.insert(account.getUuid(), device.getId(), message); - - // We check for client presence after inserting the message to take a conservative view of notifications. If the - // client wasn't present at the time of insertion but is now, they'll retrieve the message. If they were present - // but disconnected before the message was delivered, we should send a notification. - clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId()); - - if (!clientPresent) { - try { - pushNotificationManager.sendNewMessageNotification(account, device.getId(), message.getUrgent()); - } catch (final NotPushRegisteredException ignored) { - } - } - } - - pubSubClientEventManager.handleNewMessageAvailable(account.getIdentifier(IdentityType.ACI), device.getId()) - .whenComplete((present, throwable) -> Metrics.counter(SEND_COUNTER_NAME, - CHANNEL_TAG_NAME, channel, + return clientPresent; + }) + .whenComplete((clientPresent, throwable) -> Metrics.counter(SEND_COUNTER_NAME, + CHANNEL_TAG_NAME, getDeliveryChannelName(device), EPHEMERAL_TAG_NAME, String.valueOf(online), CLIENT_ONLINE_TAG_NAME, String.valueOf(clientPresent), - PUB_SUB_CLIENT_ONLINE_TAG_NAME, String.valueOf(Objects.requireNonNullElse(present, false)), URGENT_TAG_NAME, String.valueOf(message.getUrgent()), STORY_TAG_NAME, String.valueOf(message.getStory()), SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceServiceId())) - .increment()); + .increment()) + .thenRun(Util.NOOP) + .toCompletableFuture(); + } + + @VisibleForTesting + static String getDeliveryChannelName(final Device device) { + if (device.getGcmId() != null) { + return "gcm"; + } else if (device.getApnId() != null) { + return "apn"; + } else if (device.getFetchesMessages()) { + return "websocket"; + } else { + return "none"; + } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index 082f71c47..1d55f60a3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -15,9 +15,21 @@ import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; @@ -25,14 +37,6 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; import org.whispersystems.textsecuregcm.util.UUIDUtil; import org.whispersystems.textsecuregcm.util.Util; -import javax.annotation.Nullable; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; /** * The pub/sub-based client presence manager uses the Redis 7 sharded pub/sub system to notify connected clients that @@ -54,9 +58,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter pubSubConnection; @@ -90,12 +91,10 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter()); @@ -140,10 +139,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter displacedListener = new AtomicReference<>(); @@ -201,10 +196,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter> unsubscribeFuture = new AtomicReference<>(); // Note that we're relying on some specific implementation details of `ConcurrentHashMap#compute(...)`. In @@ -248,10 +239,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES)) .thenApply(listeners -> listeners > 0); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index b87281b94..1d907e4da 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -513,6 +513,17 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac ); Metrics.counter(DISPLACEMENT_COUNTER_NAME, tags).increment(); + } + + @Override + public void handleConnectionDisplaced(final boolean connectedElsewhere) { + final Tags tags = Tags.of( + UserAgentTagUtil.getPlatformTag(client.getUserAgent()), + Tag.of("connectedElsewhere", String.valueOf(connectedElsewhere)), + Tag.of(PRESENCE_MANAGER_TAG, "pubsub") + ); + + Metrics.counter(DISPLACEMENT_COUNTER_NAME, tags).increment(); final int code; final String message; @@ -534,17 +545,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac } } - @Override - public void handleConnectionDisplaced(final boolean connectedElsewhere) { - final Tags tags = Tags.of( - UserAgentTagUtil.getPlatformTag(client.getUserAgent()), - Tag.of("connectedElsewhere", String.valueOf(connectedElsewhere)), - Tag.of(PRESENCE_MANAGER_TAG, "pubsub") - ); - - Metrics.counter(DISPLACEMENT_COUNTER_NAME, tags).increment(); - } - private record StoredMessageInfo(UUID guid, long serverTimestamp) { } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index c86d68d17..7dc53f5c5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -31,18 +31,17 @@ import org.whispersystems.textsecuregcm.backup.Cdn3RemoteStorageManager; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.controllers.SecureStorageController; import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controller; -import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.experiment.PushNotificationExperimentSamples; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.metrics.MicrometerAwsSdkMetricPublisher; import org.whispersystems.textsecuregcm.push.APNSender; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; -import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.FcmSender; +import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; +import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; import org.whispersystems.textsecuregcm.storage.AccountLockManager; @@ -218,9 +217,7 @@ record CommandDependencies( storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration()); ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor); - ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager( - dynamicConfigurationManager); - PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor, experimentEnrollmentManager); + PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor); MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), dynamicConfigurationManager); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java index 344505e47..2ea3cb604 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java @@ -7,55 +7,41 @@ package org.whispersystems.textsecuregcm.push; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyByte; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.ArgumentCaptor; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junitpioneer.jupiter.cartesian.CartesianTest; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; class MessageSenderTest { - private Account account; - private Device device; - private MessageProtos.Envelope message; - - private ClientPresenceManager clientPresenceManager; private PubSubClientEventManager pubSubClientEventManager; private MessagesManager messagesManager; private PushNotificationManager pushNotificationManager; private MessageSender messageSender; - private static final UUID ACCOUNT_UUID = UUID.randomUUID(); - private static final byte DEVICE_ID = 1; - @BeforeEach void setUp() { - - account = mock(Account.class); - device = mock(Device.class); - message = generateRandomMessage(); - - clientPresenceManager = mock(ClientPresenceManager.class); pubSubClientEventManager = mock(PubSubClientEventManager.class); messagesManager = mock(MessagesManager.class); pushNotificationManager = mock(PushNotificationManager.class); @@ -63,109 +49,85 @@ class MessageSenderTest { when(pubSubClientEventManager.handleNewMessageAvailable(any(), anyByte())) .thenReturn(CompletableFuture.completedFuture(true)); - messageSender = new MessageSender(clientPresenceManager, pubSubClientEventManager, messagesManager, pushNotificationManager); - - when(account.getUuid()).thenReturn(ACCOUNT_UUID); - when(device.getId()).thenReturn(DEVICE_ID); + messageSender = new MessageSender(pubSubClientEventManager, messagesManager, pushNotificationManager); } - @Test - void testSendOnlineMessageClientPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); - when(device.getGcmId()).thenReturn("gcm-id"); + @CartesianTest + void sendMessage(@CartesianTest.Values(booleans = {true, false}) final boolean clientPresent, + @CartesianTest.Values(booleans = {true, false}) final boolean onlineMessage, + @CartesianTest.Values(booleans = {true, false}) final boolean hasPushToken) throws NotPushRegisteredException { - messageSender.sendMessage(account, device, message, true); + final boolean expectPushNotificationAttempt = !clientPresent && !onlineMessage; - ArgumentCaptor envelopeArgumentCaptor = ArgumentCaptor.forClass( - MessageProtos.Envelope.class); + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = Device.PRIMARY_ID; - verify(messagesManager).insert(any(), anyByte(), envelopeArgumentCaptor.capture()); - verify(messagesManager, never()).removeRecipientViewFromMrmData(anyByte(), any(MessageProtos.Envelope.class)); + final Account account = mock(Account.class); + final Device device = mock(Device.class); + final MessageProtos.Envelope message = generateRandomMessage(); - assertTrue(envelopeArgumentCaptor.getValue().getEphemeral()); + when(account.getUuid()).thenReturn(accountIdentifier); + when(account.getIdentifier(IdentityType.ACI)).thenReturn(accountIdentifier); + when(device.getId()).thenReturn(deviceId); - verifyNoInteractions(pushNotificationManager); + if (hasPushToken) { + when(device.getApnId()).thenReturn("apns-token"); + } else { + doThrow(NotPushRegisteredException.class) + .when(pushNotificationManager).sendNewMessageNotification(any(), anyByte(), anyBoolean()); + } + + when(pubSubClientEventManager.handleNewMessageAvailable(accountIdentifier, deviceId)) + .thenReturn(CompletableFuture.completedFuture(clientPresent)); + + assertDoesNotThrow(() -> messageSender.sendMessage(account, device, message, onlineMessage).join()); + + final MessageProtos.Envelope expectedMessage = onlineMessage + ? message.toBuilder().setEphemeral(true).build() + : message.toBuilder().build(); + + verify(messagesManager).insert(accountIdentifier, deviceId, expectedMessage); + + if (expectPushNotificationAttempt) { + verify(pushNotificationManager).sendNewMessageNotification(account, deviceId, expectedMessage.getUrgent()); + } else { + verifyNoInteractions(pushNotificationManager); + } } @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testSendOnlineMessageClientNotPresent(final boolean hasSharedMrmKey) throws Exception { + @MethodSource + void getDeliveryChannelName(final Device device, final String expectedChannelName) { + assertEquals(expectedChannelName, MessageSender.getDeliveryChannelName(device)); + } - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); - when(device.getGcmId()).thenReturn("gcm-id"); + private static List getDeliveryChannelName() { + final List arguments = new ArrayList<>(); - if (hasSharedMrmKey) { - messageSender.sendMessage(account, device, - message.toBuilder().setSharedMrmKey(ByteString.copyFromUtf8("sharedMrmKey")).build(), true); - } else { - messageSender.sendMessage(account, device, message, true); + { + final Device apnDevice = mock(Device.class); + when(apnDevice.getApnId()).thenReturn("apns-token"); + + arguments.add(Arguments.of(apnDevice, "apn")); } - verify(messagesManager, never()).insert(any(), anyByte(), any()); - verify(messagesManager).removeRecipientViewFromMrmData(anyByte(), any(MessageProtos.Envelope.class)); + { + final Device fcmDevice = mock(Device.class); + when(fcmDevice.getGcmId()).thenReturn("fcm-token"); - verifyNoInteractions(pushNotificationManager); - } + arguments.add(Arguments.of(fcmDevice, "gcm")); + } - @Test - void testSendMessageClientPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); - when(device.getGcmId()).thenReturn("gcm-id"); + { + final Device fetchesMessagesDevice = mock(Device.class); + when(fetchesMessagesDevice.getFetchesMessages()).thenReturn(true); - messageSender.sendMessage(account, device, message, false); + arguments.add(Arguments.of(fetchesMessagesDevice, "websocket")); + } - final ArgumentCaptor envelopeArgumentCaptor = ArgumentCaptor.forClass( - MessageProtos.Envelope.class); + arguments.add(Arguments.of(mock(Device.class), "none")); - verify(messagesManager).insert(eq(ACCOUNT_UUID), eq(DEVICE_ID), envelopeArgumentCaptor.capture()); - - assertFalse(envelopeArgumentCaptor.getValue().getEphemeral()); - assertEquals(message, envelopeArgumentCaptor.getValue()); - verifyNoInteractions(pushNotificationManager); - } - - @Test - void testSendMessageGcmClientNotPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); - when(device.getGcmId()).thenReturn("gcm-id"); - - messageSender.sendMessage(account, device, message, false); - - verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); - verify(pushNotificationManager).sendNewMessageNotification(account, device.getId(), message.getUrgent()); - } - - @Test - void testSendMessageApnClientNotPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); - when(device.getApnId()).thenReturn("apn-id"); - - messageSender.sendMessage(account, device, message, false); - - verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); - verify(pushNotificationManager).sendNewMessageNotification(account, device.getId(), message.getUrgent()); - } - - @Test - void testSendMessageFetchClientNotPresent() throws Exception { - when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); - when(device.getFetchesMessages()).thenReturn(true); - - doThrow(NotPushRegisteredException.class) - .when(pushNotificationManager).sendNewMessageNotification(account, DEVICE_ID, message.getUrgent()); - - assertDoesNotThrow(() -> messageSender.sendMessage(account, device, message, false)); - verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); - } - - @Test - void testSendMessageNoChannel() { - when(device.getGcmId()).thenReturn(null); - when(device.getApnId()).thenReturn(null); - when(device.getFetchesMessages()).thenReturn(false); - - assertDoesNotThrow(() -> messageSender.sendMessage(account, device, message, false)); - verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); + return arguments; } private MessageProtos.Envelope generateRandomMessage() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java index 7293b3424..7c2d330a9 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java @@ -33,7 +33,6 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Device; @@ -69,11 +68,8 @@ class PubSubClientEventManagerTest { @BeforeEach void setUp() { - final ExperimentEnrollmentManager experimentEnrollmentManager = mock(ExperimentEnrollmentManager.class); - when(experimentEnrollmentManager.isEnrolled(any(UUID.class), any())).thenReturn(true); - - localPresenceManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutor, experimentEnrollmentManager); - remotePresenceManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutor, experimentEnrollmentManager); + localPresenceManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutor); + remotePresenceManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutor); localPresenceManager.start(); remotePresenceManager.start(); @@ -264,9 +260,6 @@ class PubSubClientEventManagerTest { @Test void resubscribe() { - final ExperimentEnrollmentManager experimentEnrollmentManager = mock(ExperimentEnrollmentManager.class); - when(experimentEnrollmentManager.isEnrolled(any(UUID.class), any())).thenReturn(true); - @SuppressWarnings("unchecked") final RedisClusterPubSubCommands pubSubCommands = mock(RedisClusterPubSubCommands.class); @@ -280,8 +273,7 @@ class PubSubClientEventManagerTest { .binaryPubSubAsyncCommands(pubSubAsyncCommands) .build(); - final PubSubClientEventManager presenceManager = - new PubSubClientEventManager(clusterClient, Runnable::run, experimentEnrollmentManager); + final PubSubClientEventManager presenceManager = new PubSubClientEventManager(clusterClient, Runnable::run); presenceManager.start();