From bac268a21c73cd1cb3d819aa1bfd89cc416822d7 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Fri, 9 Oct 2020 20:58:10 -0400 Subject: [PATCH] Don't send a reply to clients until messages are safely in a non-volatile store. --- .../textsecuregcm/WhisperServerService.java | 2 +- .../textsecuregcm/push/MessageSender.java | 52 +-------------- .../textsecuregcm/storage/MessagesCache.java | 4 +- .../util/BlockingThreadPoolExecutor.java | 56 ----------------- .../textsecuregcm/push/MessageSenderTest.java | 27 ++++---- .../util/BlockingThreadPoolExecutorTest.java | 63 ------------------- 6 files changed, 17 insertions(+), 187 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/util/BlockingThreadPoolExecutor.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/tests/util/BlockingThreadPoolExecutorTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 9389328f1..e4170aec9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -328,7 +328,7 @@ public class WhisperServerService extends Application) ((BlockingThreadPoolExecutor)executor)::getSize); - } - - @VisibleForTesting - MessageSender(ApnFallbackManager apnFallbackManager, - ClientPresenceManager clientPresenceManager, - MessagesManager messagesManager, - GCMSender gcmSender, - APNSender apnSender, - int queueSize, - ExecutorService executor, - PushLatencyManager pushLatencyManager) { - this.apnFallbackManager = apnFallbackManager; this.clientPresenceManager = clientPresenceManager; this.messagesManager = messagesManager; this.gcmSender = gcmSender; this.apnSender = apnSender; - this.queueSize = queueSize; - this.executor = executor; this.pushLatencyManager = pushLatencyManager; } @@ -105,15 +69,6 @@ public class MessageSender implements Managed { throw new NotPushRegisteredException("No delivery possible!"); } - if (queueSize > 0) { - executor.execute(() -> sendSynchronousMessage(account, device, message, online)); - } else { - sendSynchronousMessage(account, device, message, online); - } - } - - @VisibleForTesting - void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) { final String channel; if (device.getGcmId() != null) { @@ -193,10 +148,7 @@ public class MessageSender implements Managed { } @Override - public void stop() throws Exception { - executor.shutdown(); - executor.awaitTermination(5, TimeUnit.MINUTES); - + public void stop() { apnSender.stop(); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index e6b2d559b..887f8ee33 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -158,8 +158,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice); redisCluster.useBinaryCluster(connection -> { - connection.async().rpush(ephemeralQueueKey, message.toByteArray()); - connection.async().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds()); + connection.sync().rpush(ephemeralQueueKey, message.toByteArray()); + connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds()); }); }); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/BlockingThreadPoolExecutor.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/BlockingThreadPoolExecutor.java deleted file mode 100644 index 5fd621127..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/BlockingThreadPoolExecutor.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.util; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import com.codahale.metrics.Timer; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static com.codahale.metrics.MetricRegistry.name; - -public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { - - private final Semaphore semaphore; - private final Timer acquirePermitTimer; - - public BlockingThreadPoolExecutor(String name, int threads, int bound) { - super(threads, threads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue()); - this.semaphore = new Semaphore(bound); - - final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - - this.acquirePermitTimer = metricRegistry.timer(name(getClass(), name, "acquirePermit")); - metricRegistry.gauge(name(getClass(), name, "permitsAvailable"), () -> semaphore::availablePermits); - } - - @Override - public void execute(Runnable task) { - try (final Timer.Context ignored = acquirePermitTimer.time()) { - semaphore.acquireUninterruptibly(); - } - - try { - super.execute(task); - } catch (Throwable t) { - semaphore.release(); - throw new RuntimeException(t); - } - } - - @Override - protected void afterExecute(Runnable r, Throwable t) { - semaphore.release(); - } - - public int getSize() { - return ((LinkedBlockingQueue)getQueue()).size(); - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java index a0c05bea0..e811b213b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java @@ -16,7 +16,6 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; import java.util.UUID; -import java.util.concurrent.ExecutorService; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -57,8 +56,6 @@ public class MessageSenderTest { messagesManager, gcmSender, apnSender, - 0, - mock(ExecutorService.class), mock(PushLatencyManager.class)); when(account.getUuid()).thenReturn(ACCOUNT_UUID); @@ -66,11 +63,11 @@ public class MessageSenderTest { } @Test - public void testSendOnlineMessageClientPresent() { + public void testSendOnlineMessageClientPresent() throws Exception { when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); when(device.getGcmId()).thenReturn("gcm-id"); - messageSender.sendSynchronousMessage(account, device, message, true); + messageSender.sendMessage(account, device, message, true); verify(messagesManager).insertEphemeral(ACCOUNT_UUID, DEVICE_ID, message); verify(messagesManager, never()).insert(any(), anyLong(), any()); @@ -79,11 +76,11 @@ public class MessageSenderTest { } @Test - public void testSendOnlineMessageClientNotPresent() { + public void testSendOnlineMessageClientNotPresent() throws Exception { when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); when(device.getGcmId()).thenReturn("gcm-id"); - messageSender.sendSynchronousMessage(account, device, message, true); + messageSender.sendMessage(account, device, message, true); verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager, never()).insert(any(), anyLong(), any()); @@ -92,11 +89,11 @@ public class MessageSenderTest { } @Test - public void testSendMessageClientPresent() { + public void testSendMessageClientPresent() throws Exception { when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(true); when(device.getGcmId()).thenReturn("gcm-id"); - messageSender.sendSynchronousMessage(account, device, message, false); + messageSender.sendMessage(account, device, message, false); verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); @@ -105,11 +102,11 @@ public class MessageSenderTest { } @Test - public void testSendMessageGcmClientNotPresent() { + public void testSendMessageGcmClientNotPresent() throws Exception { when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); when(device.getGcmId()).thenReturn("gcm-id"); - messageSender.sendSynchronousMessage(account, device, message, false); + messageSender.sendMessage(account, device, message, false); verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); @@ -118,11 +115,11 @@ public class MessageSenderTest { } @Test - public void testSendMessageApnClientNotPresent() { + public void testSendMessageApnClientNotPresent() throws Exception { when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); when(device.getApnId()).thenReturn("apn-id"); - messageSender.sendSynchronousMessage(account, device, message, false); + messageSender.sendMessage(account, device, message, false); verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); @@ -131,11 +128,11 @@ public class MessageSenderTest { } @Test - public void testSendMessageFetchClientNotPresent() { + public void testSendMessageFetchClientNotPresent() throws Exception { when(clientPresenceManager.isPresent(ACCOUNT_UUID, DEVICE_ID)).thenReturn(false); when(device.getFetchesMessages()).thenReturn(true); - messageSender.sendSynchronousMessage(account, device, message, false); + messageSender.sendMessage(account, device, message, false); verify(messagesManager, never()).insertEphemeral(any(), anyLong(), any()); verify(messagesManager).insert(ACCOUNT_UUID, DEVICE_ID, message); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/BlockingThreadPoolExecutorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/BlockingThreadPoolExecutorTest.java deleted file mode 100644 index cc58e2624..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/BlockingThreadPoolExecutorTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.tests.util; - -import org.junit.Test; -import org.whispersystems.textsecuregcm.util.BlockingThreadPoolExecutor; -import org.whispersystems.textsecuregcm.util.Util; - -import static org.junit.Assert.assertTrue; - -public class BlockingThreadPoolExecutorTest { - - @Test - public void testBlocking() { - BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor("test", 1, 3); - long start = System.currentTimeMillis(); - - executor.execute(new Runnable() { - @Override - public void run() { - Util.sleep(1000); - } - }); - - assertTrue(System.currentTimeMillis() - start < 500); - start = System.currentTimeMillis(); - - executor.execute(new Runnable() { - @Override - public void run() { - Util.sleep(1000); - } - }); - - assertTrue(System.currentTimeMillis() - start < 500); - - start = System.currentTimeMillis(); - - executor.execute(new Runnable() { - @Override - public void run() { - Util.sleep(1000); - } - }); - - assertTrue(System.currentTimeMillis() - start < 500); - - start = System.currentTimeMillis(); - - executor.execute(new Runnable() { - @Override - public void run() { - Util.sleep(1000); - } - }); - - assertTrue(System.currentTimeMillis() - start > 500); - } - -}