diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 101314622..43aa6f181 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -329,7 +329,7 @@ public class WhisperServerService extends Application messageDeletionQueue = new ArrayBlockingQueue<>(10_000); + BlockingQueue messageDeletionQueue = new LinkedBlockingQueue<>(); Metrics.gaugeCollectionSize(name(getClass(), "messageDeletionQueueSize"), Collections.emptyList(), messageDeletionQueue); ExecutorService messageDeletionAsyncExecutor = environment.lifecycle() diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index b8bad3e5f..986aa3b4c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -52,6 +52,7 @@ import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class MessagesCache extends RedisClusterPubSubAdapter implements Managed { @@ -62,6 +63,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final ExecutorService notificationExecutorService; private final ExecutorService messageDeletionExecutorService; + // messageDeletionExecutorService wrapped into a reactor Scheduler + private final Scheduler messageDeletionScheduler; private final ClusterLuaScript insertScript; private final ClusterLuaScript removeByGuidScript; @@ -108,6 +111,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp this.notificationExecutorService = notificationExecutorService; this.messageDeletionExecutorService = messageDeletionExecutorService; + this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion"); this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua", @@ -236,7 +240,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp staleEphemeralMessages .map(e -> UUID.fromString(e.getServerGuid())) .buffer(PAGE_SIZE) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(messageDeletionScheduler) .subscribe(staleEphemeralMessageGuids -> remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids) .thenAccept(removedMessages -> staleEphemeralMessagesCounter.increment(removedMessages.size())), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 1a981d513..493f1b829 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -560,7 +560,7 @@ class MessagesCacheTest { .build(); messagesCache = new MessagesCache(mockCluster, mockCluster, Clock.systemUTC(), mock(ExecutorService.class), - mock(ExecutorService.class)); + Executors.newSingleThreadExecutor()); } @AfterEach