From 74b3daa70ac3bd1a6ac32ddfa2d57806eb22c825 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Fri, 18 Sep 2020 15:55:53 -0400 Subject: [PATCH] Collapse WebsocketSender into PushSender. --- .../textsecuregcm/WhisperServerService.java | 4 +- .../textsecuregcm/push/PushSender.java | 138 ++++++++++------ .../textsecuregcm/push/WebsocketSender.java | 97 ----------- .../textsecuregcm/push/PushSenderTest.java | 150 ++++++++++++++++++ .../websocket/WebSocketConnectionTest.java | 5 - 5 files changed, 239 insertions(+), 155 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/push/PushSenderTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index cd606870f..956193706 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -106,7 +106,6 @@ import org.whispersystems.textsecuregcm.push.GCMSender; import org.whispersystems.textsecuregcm.push.ProvisioningManager; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; -import org.whispersystems.textsecuregcm.push.WebsocketSender; import org.whispersystems.textsecuregcm.recaptcha.RecaptchaClient; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; @@ -313,7 +312,6 @@ public class WhisperServerService extends Application) executor::getSize); + (Gauge) ((BlockingThreadPoolExecutor)executor)::getSize); + } + + @VisibleForTesting + PushSender(ApnFallbackManager apnFallbackManager, + ClientPresenceManager clientPresenceManager, + MessagesManager messagesManager, + GCMSender gcmSender, + APNSender apnSender, + int queueSize, + ExecutorService executor, + PushLatencyManager pushLatencyManager) { + + this.apnFallbackManager = apnFallbackManager; + this.clientPresenceManager = clientPresenceManager; + this.messagesManager = messagesManager; + this.gcmSender = gcmSender; + this.apnSender = apnSender; + this.queueSize = queueSize; + this.executor = executor; + this.pushLatencyManager = pushLatencyManager; } public void sendMessage(final Account account, final Device device, final Envelope message, boolean online) @@ -80,19 +111,44 @@ public class PushSender implements Managed { } } - private void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) { - if (device.getGcmId() != null) sendGcmMessage(account, device, message, online); - else if (device.getApnId() != null) sendApnMessage(account, device, message, online); - else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message, online); - else throw new AssertionError(); - } + @VisibleForTesting + void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) { + final String channel; - private void sendGcmMessage(Account account, Device device, Envelope message, boolean online) { - final boolean delivered = webSocketSender.sendMessage(account, device, message, WebsocketSender.Type.GCM, online); - - if (!delivered && !online) { - sendGcmNotification(account, device); + if (device.getGcmId() != null) { + channel = "gcm"; + } else if (device.getApnId() != null) { + channel = "apn"; + } else if (device.getFetchesMessages()) { + channel = "websocket"; + } else { + throw new AssertionError(); } + + final boolean clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId()); + + if (online) { + if (clientPresent) { + messagesManager.insertEphemeral(account.getUuid(), device.getId(), message); + } + } else { + messagesManager.insert(account.getUuid(), device.getId(), message); + + if (!clientPresent) { + if (!Util.isEmpty(device.getGcmId())) { + sendGcmNotification(account, device); + } else if (!Util.isEmpty(device.getApnId()) || !Util.isEmpty(device.getVoipApnId())) { + sendApnNotification(account, device); + } + } + } + + final List tags = List.of( + Tag.of(CHANNEL_TAG_NAME, channel), + Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(online)), + Tag.of(CLIENT_ONLINE_TAG_NAME, String.valueOf(clientPresent))); + + Metrics.counter(SEND_COUNTER_NAME, tags).increment(); } private void sendGcmNotification(Account account, Device device) { @@ -104,21 +160,9 @@ public class PushSender implements Managed { RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId())); } - private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) { - final boolean delivered = webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.APN, online); - - if (!delivered && outgoingMessage.getType() != Envelope.Type.RECEIPT && !online) { - sendApnNotification(account, device, false); - } - } - - private void sendApnNotification(Account account, Device device, boolean newOnly) { + private void sendApnNotification(Account account, Device device) { ApnMessage apnMessage; - if (newOnly && RedisOperation.unchecked(() -> apnFallbackManager.isScheduled(account, device))) { - return; - } - if (!Util.isEmpty(device.getVoipApnId())) { apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), device.getId(), true, Optional.empty()); RedisOperation.unchecked(() -> apnFallbackManager.schedule(account, device)); @@ -131,13 +175,8 @@ public class PushSender implements Managed { RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId())); } - private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online) - { - webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.WEB, online); - } - @Override - public void start() throws Exception { + public void start() { apnSender.start(); gcmSender.start(); } @@ -150,5 +189,4 @@ public class PushSender implements Managed { apnSender.stop(); gcmSender.stop(); } - } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java deleted file mode 100644 index be558574e..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (C) 2014 Open WhisperSystems - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.whispersystems.textsecuregcm.push; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.Metrics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.MessagesManager; -import org.whispersystems.textsecuregcm.util.Constants; - -import static com.codahale.metrics.MetricRegistry.name; -import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; - -public class WebsocketSender { - - public enum Type { - APN, - GCM, - WEB - } - - @SuppressWarnings("unused") - private static final Logger logger = LoggerFactory.getLogger(WebsocketSender.class); - - private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - - private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" )); - private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" )); - - private final Meter apnOnlineMeter = metricRegistry.meter(name(getClass(), "apn_online" )); - private final Meter apnOfflineMeter = metricRegistry.meter(name(getClass(), "apn_offline")); - - private final Meter gcmOnlineMeter = metricRegistry.meter(name(getClass(), "gcm_online" )); - private final Meter gcmOfflineMeter = metricRegistry.meter(name(getClass(), "gcm_offline")); - - private final Counter ephemeralOnlineCounter = Metrics.counter(name(getClass(), "ephemeral"), "online", "true"); - private final Counter ephemeralOfflineCounter = Metrics.counter(name(getClass(), "ephemeral"), "offline", "true"); - - private final MessagesManager messagesManager; - private final ClientPresenceManager clientPresenceManager; - - public WebsocketSender(MessagesManager messagesManager, ClientPresenceManager clientPresenceManager) { - this.messagesManager = messagesManager; - this.clientPresenceManager = clientPresenceManager; - } - - public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) { - final boolean clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId()); - - if (online) { - if (clientPresent) { - ephemeralOnlineCounter.increment(); - messagesManager.insertEphemeral(account.getUuid(), device.getId(), message); - return true; - } else { - ephemeralOfflineCounter.increment(); - return false; - } - } else { - messagesManager.insert(account.getUuid(), device.getId(), message); - - if (clientPresent) { - if (channel == Type.APN) apnOnlineMeter.mark(); - else if (channel == Type.GCM) gcmOnlineMeter.mark(); - else websocketOnlineMeter.mark(); - - return true; - } else { - if (channel == Type.APN) apnOfflineMeter.mark(); - else if (channel == Type.GCM) gcmOfflineMeter.mark(); - else websocketOfflineMeter.mark(); - - return false; - } - } - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushSenderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushSenderTest.java new file mode 100644 index 000000000..1a8d4f58e --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushSenderTest.java @@ -0,0 +1,150 @@ +package org.whispersystems.textsecuregcm.push; + +import com.google.protobuf.ByteString; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Before; +import org.junit.Test; +import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; + +import java.util.UUID; +import java.util.concurrent.ExecutorService; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class PushSenderTest { + + private Account account; + private Device device; + private MessageProtos.Envelope message; + + private ClientPresenceManager clientPresenceManager; + private MessagesManager messagesManager; + private GCMSender gcmSender; + private APNSender apnSender; + private PushSender pushSender; + + private static final UUID ACCOUNT_UUID = UUID.randomUUID(); + private static final long DEVICE_ID = 1L; + + @Before + public void setUp() { + + account = mock(Account.class); + device = mock(Device.class); + message = generateRandomMessage(); + + clientPresenceManager = mock(ClientPresenceManager.class); + messagesManager = mock(MessagesManager.class); + gcmSender = mock(GCMSender.class); + apnSender = mock(APNSender.class); + pushSender = new PushSender(mock(ApnFallbackManager.class), + clientPresenceManager, + messagesManager, + gcmSender, + apnSender, + 0, + mock(ExecutorService.class), + mock(PushLatencyManager.class)); + + when(account.getUuid()).thenReturn(ACCOUNT_UUID); + when(device.getId()).thenReturn(DEVICE_ID); + } + + @Test + public void testSendOnlineMessageClientPresent() { + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); + when(device.getGcmId()).thenReturn("gcm-id"); + + pushSender.sendSynchronousMessage(account, device, message, true); + + verify(messagesManager).insertEphemeral(ACCOUNT_UUID, DEVICE_ID, message); + verify(messagesManager, never()).insert(any(), anyLong(), any()); + verifyZeroInteractions(gcmSender); + verifyZeroInteractions(apnSender); + } + + @Test + public void testSendOnlineMessageClientNotPresent() { + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); + when(device.getGcmId()).thenReturn("gcm-id"); + + pushSender.sendSynchronousMessage(account, device, message, true); + + verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); + verify(messagesManager, never()).insert(any(), anyLong(), any()); + verifyZeroInteractions(gcmSender); + verifyZeroInteractions(apnSender); + } + + @Test + public void testSendMessageClientPresent() { + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); + when(device.getGcmId()).thenReturn("gcm-id"); + + pushSender.sendSynchronousMessage(account, device, message, false); + + verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); + verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); + verifyZeroInteractions(gcmSender); + verifyZeroInteractions(apnSender); + } + + @Test + public void testSendMessageGcmClientNotPresent() { + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); + when(device.getGcmId()).thenReturn("gcm-id"); + + pushSender.sendSynchronousMessage(account, device, message, false); + + verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); + verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); + verify(gcmSender).sendMessage(any()); + verifyZeroInteractions(apnSender); + } + + @Test + public void testSendMessageApnClientNotPresent() { + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); + when(device.getApnId()).thenReturn("apn-id"); + + pushSender.sendSynchronousMessage(account, device, message, false); + + verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); + verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); + verifyZeroInteractions(gcmSender); + verify(apnSender).sendMessage(any()); + } + + @Test + public void testSendMessageFetchClientNotPresent() { + when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); + when(device.getFetchesMessages()).thenReturn(true); + + pushSender.sendSynchronousMessage(account, device, message, false); + + verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); + verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); + verifyZeroInteractions(gcmSender); + verifyZeroInteractions(apnSender); + } + + private MessageProtos.Envelope generateRandomMessage() { + return MessageProtos.Envelope.newBuilder() + .setTimestamp(System.currentTimeMillis()) + .setServerTimestamp(System.currentTimeMillis()) + .setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256))) + .setType(MessageProtos.Envelope.Type.CIPHERTEXT) + .setServerGuid(UUID.randomUUID().toString()) + .build(); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 733b343d8..d0728d37f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -13,7 +13,6 @@ import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; -import org.whispersystems.textsecuregcm.push.WebsocketSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -48,10 +47,8 @@ import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; @@ -245,7 +242,6 @@ public class WebSocketConnectionTest { @Test public void testPendingSend() throws Exception { MessagesManager storedMessages = mock(MessagesManager.class); - WebsocketSender websocketSender = mock(WebsocketSender.class); final Envelope firstMessage = Envelope.newBuilder() .setLegacyMessage(ByteString.copyFrom("first".getBytes())) @@ -331,7 +327,6 @@ public class WebSocketConnectionTest { futures.get(0).completeExceptionally(new IOException()); verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender2"), eq(secondMessage.getTimestamp())); - verifyNoMoreInteractions(websocketSender); connection.stop(); verify(client).close(anyInt(), anyString());