From 1dcc491feccfa28dc51a3de611347e0449d4e9a8 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Fri, 28 Aug 2020 17:32:39 -0400 Subject: [PATCH] Move cache-mirroring operations to the calling thread. --- .../textsecuregcm/WhisperServerService.java | 3 +-- .../textsecuregcm/storage/MessagesManager.java | 17 +++++++---------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 3466dbfd4..f97b3dd7b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -290,7 +290,6 @@ public class WhisperServerService extends Application(1_000)).build(); - ExecutorService messageCacheClusterExperimentExecutor = environment.lifecycle().executorService("messages_cache_experiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build(); ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build(); ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor); @@ -304,7 +303,7 @@ public class WhisperServerService extends Application clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId), experimentExecutor); + insertExperiment.compareSupplierResult(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId)); } public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) { @@ -63,7 +60,7 @@ public class MessagesManager { if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) { final List messagesFromCache = this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()); - getMessagesExperiment.compareSupplierResultAsync(messagesFromCache, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()), experimentExecutor); + getMessagesExperiment.compareSupplierResult(messagesFromCache, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size())); messages.addAll(messagesFromCache); } @@ -86,7 +83,7 @@ public class MessagesManager { public Optional delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp) { Optional removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp); - removeBySenderExperiment.compareSupplierResultAsync(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp), experimentExecutor); + removeBySenderExperiment.compareSupplierResult(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp)); if (!removed.isPresent()) { removed = this.messages.remove(destination, destinationDevice, source, timestamp); @@ -100,7 +97,7 @@ public class MessagesManager { public Optional delete(String destination, UUID destinationUuid, long deviceId, UUID guid) { Optional removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid); - removeByUuidExperiment.compareSupplierResultAsync(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid), experimentExecutor); + removeByUuidExperiment.compareSupplierResult(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid)); if (!removed.isPresent()) { removed = this.messages.remove(destination, guid); @@ -115,7 +112,7 @@ public class MessagesManager { public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) { if (cached) { final Optional maybeRemovedMessage = this.messagesCache.remove(destination, destinationUuid, deviceId, id); - removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor); + removeByIdExperiment.compareSupplierResult(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id)); cacheHitByIdMeter.mark(); } else { this.messages.remove(destination, id); @@ -127,7 +124,7 @@ public class MessagesManager { messages.store(messageGuid, envelope, destination, deviceId); final Optional maybeRemovedMessage = messagesCache.remove(destination, destinationUuid, deviceId, id); - removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor); + removeByIdExperiment.compareSupplierResult(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id)); } public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {