From e35e34d2e0ff9277945cc260bf8cf255023ce9a4 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 6 Aug 2020 16:08:50 -0400 Subject: [PATCH] Move operation-mirroring logic to MessagesManager. --- .../textsecuregcm/WhisperServerService.java | 9 ++-- .../textsecuregcm/storage/MessagesCache.java | 40 +++-------------- .../storage/MessagesManager.java | 44 +++++++++++++------ .../storage/MessagesCacheTest.java | 2 +- 4 files changed, 43 insertions(+), 52 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index cebeaa162..6121319a6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -30,7 +30,6 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.wavefront.sdk.common.WavefrontSender; import io.dropwizard.Application; import io.dropwizard.auth.AuthFilter; import io.dropwizard.auth.PolymorphicAuthDynamicFeature; @@ -171,6 +170,8 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import static com.codahale.metrics.MetricRegistry.name; @@ -333,6 +334,8 @@ public class WhisperServerService extends Application(1_000)).build(); + DirectoryManager directory = new DirectoryManager(directoryClient); DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration()); PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster); @@ -341,9 +344,9 @@ public class WhisperServerService extends Application(1_000)); - - private final Experiment insertExperiment = new Experiment("MessagesCache", "insert"); - private final Experiment removeByIdExperiment = new Experiment("MessagesCache", "removeById"); - private final Experiment removeBySenderExperiment = new Experiment("MessagesCache", "removeBySender"); - private final Experiment removeByUuidExperiment = new Experiment("MessagesCache", "removeByUuid"); - private final Experiment getMessagesExperiment = new Experiment("MessagesCache", "getMessages"); - - public MessagesCache(ReplicatedJedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes, RedisClusterMessagesCache clusterMessagesCache) throws IOException { + public MessagesCache(ReplicatedJedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes) throws IOException { this.jedisPool = jedisPool; this.database = database; this.accountsManager = accountsManager; @@ -84,8 +75,6 @@ public class MessagesCache implements Managed, UserMessagesCache { this.insertOperation = new InsertOperation(jedisPool); this.removeOperation = new RemoveOperation(jedisPool); this.getOperation = new GetOperation(jedisPool); - - this.clusterMessagesCache = clusterMessagesCache; } @Override @@ -95,10 +84,7 @@ public class MessagesCache implements Managed, UserMessagesCache { Timer.Context timer = insertTimer.time(); try { - final long messageId = insertOperation.insert(guid, destination, destinationDevice, System.currentTimeMillis(), messageWithGuid); - insertExperiment.compareSupplierResultAsync(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId), experimentExecutor); - - return messageId; + return insertOperation.insert(guid, destination, destinationDevice, System.currentTimeMillis(), messageWithGuid); } finally { timer.stop(); } @@ -120,11 +106,7 @@ public class MessagesCache implements Managed, UserMessagesCache { logger.warn("Failed to parse envelope", e); } - final Optional maybeRemovedMessage = Optional.ofNullable(removedMessageEntity); - - removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, id), experimentExecutor); - - return maybeRemovedMessage; + return Optional.ofNullable(removedMessageEntity); } @Override @@ -144,11 +126,7 @@ public class MessagesCache implements Managed, UserMessagesCache { timer.stop(); } - final Optional maybeRemovedMessage = Optional.ofNullable(removedMessageEntity); - - removeBySenderExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, sender, timestamp), experimentExecutor); - - return maybeRemovedMessage; + return Optional.ofNullable(removedMessageEntity); } @Override @@ -168,11 +146,7 @@ public class MessagesCache implements Managed, UserMessagesCache { timer.stop(); } - final Optional maybeRemovedMessage = Optional.ofNullable(removedMessageEntity); - - removeByUuidExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, guid), experimentExecutor); - - return maybeRemovedMessage; + return Optional.ofNullable(removedMessageEntity); } @Override @@ -194,8 +168,6 @@ public class MessagesCache implements Managed, UserMessagesCache { } } - getMessagesExperiment.compareSupplierResultAsync(results, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, limit), experimentExecutor); - return results; } finally { timer.stop(); @@ -241,8 +213,6 @@ public class MessagesCache implements Managed, UserMessagesCache { public void stop() throws Exception { messagePersister.shutdown(); logger.info("Message persister shut down..."); - - this.experimentExecutor.shutdown(); } private static class Key { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 493c8d5ea..c27ae5426 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -4,11 +4,10 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; +import org.whispersystems.textsecuregcm.experiment.Experiment; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.util.Constants; @@ -16,6 +15,7 @@ import org.whispersystems.textsecuregcm.util.Constants; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutorService; import static com.codahale.metrics.MetricRegistry.name; @@ -29,19 +29,31 @@ public class MessagesManager { private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" )); private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid")); - private final Messages messages; - private final MessagesCache messagesCache; - private final PushLatencyManager pushLatencyManager; + private final Messages messages; + private final MessagesCache messagesCache; + private final RedisClusterMessagesCache clusterMessagesCache; + private final PushLatencyManager pushLatencyManager; - public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) { - this.messages = messages; - this.messagesCache = messagesCache; - this.pushLatencyManager = pushLatencyManager; + private final ExecutorService experimentExecutor; + private final Experiment insertExperiment = new Experiment("MessagesCache", "insert"); + private final Experiment removeByIdExperiment = new Experiment("MessagesCache", "removeById"); + private final Experiment removeBySenderExperiment = new Experiment("MessagesCache", "removeBySender"); + private final Experiment removeByUuidExperiment = new Experiment("MessagesCache", "removeByUuid"); + private final Experiment getMessagesExperiment = new Experiment("MessagesCache", "getMessages"); + + public MessagesManager(Messages messages, MessagesCache messagesCache, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager, final ExecutorService experimentExecutor) { + this.messages = messages; + this.messagesCache = messagesCache; + this.clusterMessagesCache = clusterMessagesCache; + this.pushLatencyManager = pushLatencyManager; + this.experimentExecutor = experimentExecutor; } public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) { - UUID guid = UUID.randomUUID(); - messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message); + final UUID guid = UUID.randomUUID(); + final long messageId = messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message); + + insertExperiment.compareSupplierResultAsync(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId), experimentExecutor); } public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) { @@ -50,7 +62,10 @@ public class MessagesManager { List messages = this.messages.load(destination, destinationDevice); if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) { - messages.addAll(this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size())); + final List messagesFromCache = this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()); + getMessagesExperiment.compareSupplierResultAsync(messagesFromCache, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()), experimentExecutor); + + messages.addAll(messagesFromCache); } return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE); @@ -69,6 +84,7 @@ public class MessagesManager { public Optional delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp) { Optional removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp); + removeBySenderExperiment.compareSupplierResultAsync(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp), experimentExecutor); if (!removed.isPresent()) { removed = this.messages.remove(destination, destinationDevice, source, timestamp); @@ -82,6 +98,7 @@ public class MessagesManager { public Optional delete(String destination, UUID destinationUuid, long deviceId, UUID guid) { Optional removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid); + removeByUuidExperiment.compareSupplierResultAsync(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid), experimentExecutor); if (!removed.isPresent()) { removed = this.messages.remove(destination, guid); @@ -95,7 +112,8 @@ public class MessagesManager { public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) { if (cached) { - this.messagesCache.remove(destination, destinationUuid, deviceId, id); + final Optional maybeRemovedMessage = this.messagesCache.remove(destination, destinationUuid, deviceId, id); + removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor); cacheHitByIdMeter.mark(); } else { this.messages.remove(destination, id); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 6a0c50ba8..23b813019 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -26,7 +26,7 @@ public class MessagesCacheTest extends AbstractMessagesCacheTest { final RedisClientFactory clientFactory = new RedisClientFactory("message-cache-test", redisUrl, List.of(redisUrl), new CircuitBreakerConfiguration()); final ReplicatedJedisPool jedisPool = clientFactory.getRedisClientPool(); - messagesCache = new MessagesCache(jedisPool, mock(Messages.class), mock(AccountsManager.class), 60, mock(RedisClusterMessagesCache.class)); + messagesCache = new MessagesCache(jedisPool, mock(Messages.class), mock(AccountsManager.class), 60); } @After