From e0178fa0ea4fb993e9185845bd8ac6567ada52ed Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 2 Nov 2022 10:51:44 -0500 Subject: [PATCH] Move additional handling of `MessagesManager#delete` to executor --- .../textsecuregcm/WhisperServerService.java | 8 +++++--- .../textsecuregcm/storage/MessagesDynamoDb.java | 5 +++++ .../textsecuregcm/storage/MessagesManager.java | 10 +++++++--- .../textsecuregcm/websocket/WebSocketConnection.java | 2 +- .../textsecuregcm/workers/AssignUsernameCommand.java | 2 +- .../textsecuregcm/workers/DeleteUserCommand.java | 4 ++-- .../workers/SetUserDiscoverabilityCommand.java | 4 ++-- .../storage/MessagePersisterIntegrationTest.java | 3 ++- .../textsecuregcm/storage/MessagesManagerTest.java | 3 ++- .../websocket/WebSocketConnectionIntegrationTest.java | 6 +++--- 10 files changed, 30 insertions(+), 17 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 978b63d1f..102212f40 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -469,9 +469,11 @@ public class WhisperServerService extends Application messages, final UUID destinationAccountUuid, final long destinationDeviceId) { @@ -176,6 +180,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { return null; }) .map(Optional::ofNullable) + .subscribeOn(messageDeletionScheduler) .last(Optional.empty()) // if the flux is empty, last() will throw without a default .toFuture(); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 2beae6949..18422b557 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -15,6 +15,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -44,14 +45,17 @@ public class MessagesManager { private final MessagesDynamoDb messagesDynamoDb; private final MessagesCache messagesCache; private final ReportMessageManager reportMessageManager; + private final ExecutorService messageDeletionExecutor; public MessagesManager( final MessagesDynamoDb messagesDynamoDb, final MessagesCache messagesCache, - final ReportMessageManager reportMessageManager) { + final ReportMessageManager reportMessageManager, + final ExecutorService messageDeletionExecutor) { this.messagesDynamoDb = messagesDynamoDb; this.messagesCache = messagesCache; this.reportMessageManager = reportMessageManager; + this.messageDeletionExecutor = messageDeletionExecutor; } public void insert(UUID destinationUuid, long destinationDevice, Envelope message) { @@ -111,7 +115,7 @@ public class MessagesManager { public CompletableFuture> delete(UUID destinationUuid, long destinationDeviceId, UUID guid, @Nullable Long serverTimestamp) { return messagesCache.remove(destinationUuid, destinationDeviceId, guid) - .thenCompose(removed -> { + .thenComposeAsync(removed -> { if (removed.isPresent()) { cacheHitByGuidMeter.mark(); @@ -126,7 +130,7 @@ public class MessagesManager { return messagesDynamoDb.deleteMessage(destinationUuid, destinationDeviceId, guid, serverTimestamp); } - }); + }, messageDeletionExecutor); } /** diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index b15b4e7e4..43d1535a1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -127,7 +127,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private final Random random = new Random(); private final boolean useReactive; - private Scheduler reactiveScheduler; + private final Scheduler reactiveScheduler; private enum StoredMessageState { EMPTY, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index 3fdc3b598..8bf20d7ad 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -186,7 +186,7 @@ public class AssignUsernameCommand extends EnvironmentCommand new Pair<>(account, device)), device, webSocketClient, @@ -207,7 +207,7 @@ class WebSocketConnectionIntegrationTest { void testProcessStoredMessagesClientClosed(final boolean useReactive) { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager), + new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, @@ -273,7 +273,7 @@ class WebSocketConnectionIntegrationTest { void testProcessStoredMessagesSendFutureTimeout(final boolean useReactive) { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager), + new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient,