Move messages cache stale discard to a separate scheduler
This commit is contained in:
parent
99ad211c01
commit
df8fb5cab7
|
@ -329,7 +329,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
config.getAppConfig().getConfigurationName(),
|
config.getAppConfig().getConfigurationName(),
|
||||||
DynamicConfiguration.class);
|
DynamicConfiguration.class);
|
||||||
|
|
||||||
BlockingQueue<Runnable> messageDeletionQueue = new ArrayBlockingQueue<>(10_000);
|
BlockingQueue<Runnable> messageDeletionQueue = new LinkedBlockingQueue<>();
|
||||||
Metrics.gaugeCollectionSize(name(getClass(), "messageDeletionQueueSize"), Collections.emptyList(),
|
Metrics.gaugeCollectionSize(name(getClass(), "messageDeletionQueueSize"), Collections.emptyList(),
|
||||||
messageDeletionQueue);
|
messageDeletionQueue);
|
||||||
ExecutorService messageDeletionAsyncExecutor = environment.lifecycle()
|
ExecutorService messageDeletionAsyncExecutor = environment.lifecycle()
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.whispersystems.textsecuregcm.util.Pair;
|
||||||
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.scheduler.Scheduler;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
|
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
|
||||||
|
@ -62,6 +63,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
private final ExecutorService notificationExecutorService;
|
private final ExecutorService notificationExecutorService;
|
||||||
private final ExecutorService messageDeletionExecutorService;
|
private final ExecutorService messageDeletionExecutorService;
|
||||||
|
// messageDeletionExecutorService wrapped into a reactor Scheduler
|
||||||
|
private final Scheduler messageDeletionScheduler;
|
||||||
|
|
||||||
private final ClusterLuaScript insertScript;
|
private final ClusterLuaScript insertScript;
|
||||||
private final ClusterLuaScript removeByGuidScript;
|
private final ClusterLuaScript removeByGuidScript;
|
||||||
|
@ -108,6 +111,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
this.notificationExecutorService = notificationExecutorService;
|
this.notificationExecutorService = notificationExecutorService;
|
||||||
this.messageDeletionExecutorService = messageDeletionExecutorService;
|
this.messageDeletionExecutorService = messageDeletionExecutorService;
|
||||||
|
this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion");
|
||||||
|
|
||||||
this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
|
this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
|
||||||
this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua",
|
this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua",
|
||||||
|
@ -236,7 +240,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
staleEphemeralMessages
|
staleEphemeralMessages
|
||||||
.map(e -> UUID.fromString(e.getServerGuid()))
|
.map(e -> UUID.fromString(e.getServerGuid()))
|
||||||
.buffer(PAGE_SIZE)
|
.buffer(PAGE_SIZE)
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
.subscribeOn(messageDeletionScheduler)
|
||||||
.subscribe(staleEphemeralMessageGuids ->
|
.subscribe(staleEphemeralMessageGuids ->
|
||||||
remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids)
|
remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids)
|
||||||
.thenAccept(removedMessages -> staleEphemeralMessagesCounter.increment(removedMessages.size())),
|
.thenAccept(removedMessages -> staleEphemeralMessagesCounter.increment(removedMessages.size())),
|
||||||
|
|
|
@ -560,7 +560,7 @@ class MessagesCacheTest {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
messagesCache = new MessagesCache(mockCluster, mockCluster, Clock.systemUTC(), mock(ExecutorService.class),
|
messagesCache = new MessagesCache(mockCluster, mockCluster, Clock.systemUTC(), mock(ExecutorService.class),
|
||||||
mock(ExecutorService.class));
|
Executors.newSingleThreadExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
|
|
Loading…
Reference in New Issue