From a96c0ec7a3db50a2b28e7c3db0feb1d8fe9a0b0d Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Mon, 16 Dec 2024 17:12:24 -0500 Subject: [PATCH] Enqueue async operations from a dedicated thread --- .../textsecuregcm/WhisperServerService.java | 4 ++- .../push/WebSocketConnectionEventManager.java | 26 +++++++++++++------ .../workers/CommandDependencies.java | 4 ++- .../WebSocketConnectionEventManagerTest.java | 12 +++++++-- .../MessagePersisterIntegrationTest.java | 9 ++++++- 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 4b611f7bc..cace17d10 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -487,6 +487,8 @@ public class WhisperServerService extends Application pubSubConnection; @@ -102,13 +108,15 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter()); @@ -169,8 +177,9 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter { - subscribeFuture.set(pubSubConnection.withPubSubConnection(connection -> - connection.async().ssubscribe(eventChannel))); + subscribeFuture.set(CompletableFuture.supplyAsync(() -> pubSubConnection.withPubSubConnection(connection -> + connection.async().ssubscribe(eventChannel)), asyncOperationQueueingExecutor) + .thenCompose(Function.identity())); if (existingListener != null) { displacedListener.set(existingListener); @@ -223,9 +232,10 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter { - unsubscribeFuture.set(pubSubConnection.withPubSubConnection(connection -> - connection.async().sunsubscribe(getClientEventChannel(accountIdentifier, deviceId))) - .thenRun(Util.NOOP)); + unsubscribeFuture.set(CompletableFuture.supplyAsync(() -> pubSubConnection.withPubSubConnection(connection -> + connection.async().sunsubscribe(getClientEventChannel(accountIdentifier, deviceId))) + .thenRun(Util.NOOP), asyncOperationQueueingExecutor) + .thenCompose(Function.identity())); return null; }); @@ -295,9 +305,9 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter { if (existingListener == null && pubSubConnection != null) { // Enqueue, but do not block on, an "unsubscribe" operation - pubSubConnection.usePubSubConnection(connection -> + asyncOperationQueueingExecutor.execute(() -> pubSubConnection.usePubSubConnection(connection -> connection.async().sunsubscribe(getClientEventChannel(accountAndDeviceIdentifier.accountIdentifier(), - accountAndDeviceIdentifier.deviceId()))); + accountAndDeviceIdentifier.deviceId())))); } // Make no change to the existing listener whether present or absent diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 307a4f722..34657f680 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -143,6 +143,8 @@ record CommandDependencies( .maxThreads(16).minThreads(16).build(); ExecutorService clientEventExecutor = environment.lifecycle() .virtualExecutorService(name(name, "clientEvent-%d")); + ExecutorService asyncOperationQueueingExecutor = environment.lifecycle() + .executorService(name(name, "asyncOperationQueueing-%d")).minThreads(1).maxThreads(1).build(); ExecutorService disconnectionRequestListenerExecutor = environment.lifecycle() .virtualExecutorService(name(name, "disconnectionRequest-%d")); @@ -283,7 +285,7 @@ record CommandDependencies( Clock.systemUTC()); WebSocketConnectionEventManager webSocketConnectionEventManager = - new WebSocketConnectionEventManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor); + new WebSocketConnectionEventManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor); environment.lifecycle().manage(apnSender); environment.lifecycle().manage(disconnectionRequestManager); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java index 917cbb85a..b669fb126 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java @@ -51,6 +51,7 @@ class WebSocketConnectionEventManagerTest { private WebSocketConnectionEventManager remoteEventManager; private static ExecutorService webSocketConnectionEventExecutor; + private static ExecutorService asyncOperationQueueingExecutor; @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); @@ -73,6 +74,7 @@ class WebSocketConnectionEventManagerTest { @BeforeAll static void setUpBeforeAll() { webSocketConnectionEventExecutor = Executors.newVirtualThreadPerTaskExecutor(); + asyncOperationQueueingExecutor = Executors.newSingleThreadExecutor(); } @BeforeEach @@ -80,12 +82,14 @@ class WebSocketConnectionEventManagerTest { localEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class), mock(PushNotificationManager.class), REDIS_CLUSTER_EXTENSION.getRedisCluster(), - webSocketConnectionEventExecutor); + webSocketConnectionEventExecutor, + asyncOperationQueueingExecutor); remoteEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class), mock(PushNotificationManager.class), REDIS_CLUSTER_EXTENSION.getRedisCluster(), - webSocketConnectionEventExecutor); + webSocketConnectionEventExecutor, + asyncOperationQueueingExecutor); localEventManager.start(); remoteEventManager.start(); @@ -100,6 +104,7 @@ class WebSocketConnectionEventManagerTest { @AfterAll static void tearDownAfterAll() { webSocketConnectionEventExecutor.shutdown(); + asyncOperationQueueingExecutor.shutdown(); } @ParameterizedTest @@ -242,6 +247,7 @@ class WebSocketConnectionEventManagerTest { mock(AccountsManager.class), mock(PushNotificationManager.class), clusterClient, + Runnable::run, Runnable::run); eventManager.start(); @@ -309,6 +315,7 @@ class WebSocketConnectionEventManagerTest { mock(AccountsManager.class), mock(PushNotificationManager.class), clusterClient, + Runnable::run, Runnable::run); eventManager.start(); @@ -366,6 +373,7 @@ class WebSocketConnectionEventManagerTest { accountsManager, pushNotificationManager, clusterClient, + Runnable::run, Runnable::run); eventManager.start(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index 79c8cfc95..610203367 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -54,6 +54,7 @@ class MessagePersisterIntegrationTest { private Scheduler messageDeliveryScheduler; private ExecutorService messageDeletionExecutorService; private ExecutorService websocketConnectionEventExecutor; + private ExecutorService asyncOperationQueueingExecutor; private MessagesCache messagesCache; private MessagesManager messagesManager; private WebSocketConnectionEventManager webSocketConnectionEventManager; @@ -86,10 +87,12 @@ class MessagePersisterIntegrationTest { messageDeletionExecutorService); websocketConnectionEventExecutor = Executors.newVirtualThreadPerTaskExecutor(); + asyncOperationQueueingExecutor = Executors.newSingleThreadExecutor(); webSocketConnectionEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class), mock(PushNotificationManager.class), REDIS_CLUSTER_EXTENSION.getRedisCluster(), - websocketConnectionEventExecutor); + websocketConnectionEventExecutor, + asyncOperationQueueingExecutor); webSocketConnectionEventManager.start(); @@ -108,6 +111,7 @@ class MessagePersisterIntegrationTest { when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); } + @SuppressWarnings("ResultOfMethodCallIgnored") @AfterEach void tearDown() throws Exception { messageDeletionExecutorService.shutdown(); @@ -116,6 +120,9 @@ class MessagePersisterIntegrationTest { websocketConnectionEventExecutor.shutdown(); websocketConnectionEventExecutor.awaitTermination(15, TimeUnit.SECONDS); + asyncOperationQueueingExecutor.shutdown(); + asyncOperationQueueingExecutor.awaitTermination(15, TimeUnit.SECONDS); + messageDeliveryScheduler.dispose(); webSocketConnectionEventManager.stop();