From 5c04f2634a4a4c7fa2f80ff17e5929fafd05588b Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 1 Sep 2020 17:06:13 -0400 Subject: [PATCH] Use a dedicated executor service for dispatching keyspace notifications. --- .../textsecuregcm/WhisperServerService.java | 11 +++++------ .../textsecuregcm/push/ClientPresenceManager.java | 15 +++++++++------ .../push/ClientPresenceManagerTest.java | 2 +- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f1976ce31..b903cc650 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -281,11 +281,10 @@ public class WhisperServerService extends Application(1_000)).build(); + ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build(); + ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(10_000)).build(); - ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor); + ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor); DirectoryManager directory = new DirectoryManager(directoryClient); DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration()); PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster); @@ -293,11 +292,11 @@ public class WhisperServerService extends Application pruneMissingPeersFuture; @@ -67,11 +69,12 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter displacementListenersByPresenceKey::size); @@ -240,7 +243,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { + keyspaceNotificationExecutorService.execute(() -> { displacePresence(channel.substring("__keyspace@0__:".length())); remoteDisplacementMeter.mark(); }); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java index 55822d68f..3bf7c2dcf 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java @@ -36,7 +36,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { }); presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor(); - clientPresenceManager = new ClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService); + clientPresenceManager = new ClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService, presenceRenewalExecutorService); } @Override