From 81e8143a435c94cb4ae7151138548ebcf38be205 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 27 Aug 2020 13:33:44 -0400 Subject: [PATCH] Rely solely on the clustered message cache. --- .../textsecuregcm/WhisperServerService.java | 13 +- .../storage/MessagePersister.java | 221 ------------------ .../storage/MessagesManager.java | 34 +-- .../storage/RedisClusterMessagePersister.java | 9 +- .../storage/RedisClusterMessagesCache.java | 15 -- .../ClearMessagesCacheClusterCommand.java | 20 -- .../src/main/resources/lua/insert_item.lua | 10 +- .../RedisClusterMessagePersisterTest.java | 2 +- .../RedisClusterMessagesCacheTest.java | 11 - 9 files changed, 13 insertions(+), 322 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 54c186de2..2ed625e63 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -125,9 +125,7 @@ import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.FeatureFlags; import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager; import org.whispersystems.textsecuregcm.storage.Keys; -import org.whispersystems.textsecuregcm.storage.MessagePersister; import org.whispersystems.textsecuregcm.storage.Messages; -import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PendingAccounts; import org.whispersystems.textsecuregcm.storage.PendingAccountsManager; @@ -151,7 +149,6 @@ import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler; import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener; import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.CertificateCommand; -import org.whispersystems.textsecuregcm.workers.ClearMessagesCacheClusterCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.ScourMessageCacheCommand; import org.whispersystems.textsecuregcm.workers.VacuumCommand; @@ -186,7 +183,6 @@ public class WhisperServerService extends Application("accountdb", "accountsdb.xml") { @@ -275,12 +271,10 @@ public class WhisperServerService extends Application queuesToPersist = getQueuesToPersist(); - queueCountHistogram.update(queuesToPersist.size()); - - for (byte[] queue : queuesToPersist) { - Key key = Key.fromUserMessageQueue(queue); - - persistQueue(jedisPool, key); - notifyClients(accountsManager, pubSubManager, pushSender, key); - } - - if (queuesToPersist.isEmpty()) { - //noinspection BusyWait - Thread.sleep(10_000); - } - } catch (Throwable t) { - logger.error("Exception while persisting: ", t); - } - } else { - try { - Thread.sleep(10_000); - } catch (final InterruptedException ignored) { - } - } - } - - synchronized (this) { - finished = true; - notifyAll(); - } - } - - @Override - public synchronized void stop() { - running.set(false); - while (!finished) Util.wait(this); - - logger.info("Message persister shut down..."); - } - - private void persistQueue(ReplicatedJedisPool jedisPool, Key key) { - Timer.Context timer = persistQueueTimer.time(); - - int messagesPersistedCount = 0; - - UUID destinationUuid = accountsManager.get(key.getAddress()).map(Account::getUuid).orElse(null); - - try (Jedis jedis = jedisPool.getWriteResource()) { - while (true) { - jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes()); - - Set messages = jedis.zrangeWithScores(key.getUserMessageQueue(), 0, CHUNK_SIZE); - - for (Tuple message : messages) { - persistMessage(key, destinationUuid, (long)message.getScore(), message.getBinaryElement()); - messagesPersistedCount++; - } - - if (messages.size() < CHUNK_SIZE) { - jedis.del(key.getUserMessageQueuePersistInProgress()); - return; - } - } - } finally { - timer.stop(); - queueSizeHistogram.update(messagesPersistedCount); - } - } - - private void persistMessage(Key key, UUID destinationUuid, long score, byte[] message) { - try { - MessageProtos.Envelope envelope = MessageProtos.Envelope.parseFrom(message); - UUID guid = envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null; - - envelope = envelope.toBuilder().clearServerGuid().build(); - - messagesManager.persistMessage(key.getAddress(), destinationUuid, envelope, guid, key.getDeviceId()); - } catch (InvalidProtocolBufferException e) { - logger.error("Error parsing envelope", e); - } - } - - private List getQueuesToPersist() { - Timer.Context timer = getQueuesTimer.time(); - try { - long maxTime = System.currentTimeMillis() - delayTimeUnit.toMillis(delayTime); - List keys = Collections.singletonList(Key.getUserMessageQueueIndex()); - List args = Arrays.asList(String.valueOf(maxTime).getBytes(), String.valueOf(100).getBytes()); - - //noinspection unchecked - return (List)getQueuesScript.execute(keys, args); - } finally { - timer.stop(); - } - } - - private void notifyClients(AccountsManager accountsManager, PubSubManager pubSubManager, PushSender pushSender, Key key) { - Timer.Context timer = notifyTimer.time(); - - try { - boolean notified = pubSubManager.publish(new WebsocketAddress(key.getAddress(), key.getDeviceId()), - PubSubProtos.PubSubMessage.newBuilder() - .setType(PubSubProtos.PubSubMessage.Type.QUERY_DB) - .build()); - - if (!notified) { - Optional account = accountsManager.get(key.getAddress()); - - if (account.isPresent()) { - Optional device = account.get().getDevice(key.getDeviceId()); - - if (device.isPresent()) { - try { - pushSender.sendQueuedNotification(account.get(), device.get()); - } catch (NotPushRegisteredException e) { - logger.warn("After message persistence, no longer push registered!"); - } - } - } - } - } finally { - timer.stop(); - } - } - -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 590ab5119..32fb4fc7a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -7,7 +7,6 @@ import com.codahale.metrics.SharedMetricRegistries; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; -import org.whispersystems.textsecuregcm.experiment.Experiment; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.util.Constants; @@ -29,28 +28,17 @@ public class MessagesManager { private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid")); private final Messages messages; - private final MessagesCache messagesCache; private final RedisClusterMessagesCache clusterMessagesCache; private final PushLatencyManager pushLatencyManager; - private final Experiment insertExperiment = new Experiment("MessagesCache", "insert"); - private final Experiment removeByIdExperiment = new Experiment("MessagesCache", "removeById"); - private final Experiment removeBySenderExperiment = new Experiment("MessagesCache", "removeBySender"); - private final Experiment removeByUuidExperiment = new Experiment("MessagesCache", "removeByUuid"); - private final Experiment getMessagesExperiment = new Experiment("MessagesCache", "getMessages"); - - public MessagesManager(Messages messages, MessagesCache messagesCache, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager) { + public MessagesManager(Messages messages, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager) { this.messages = messages; - this.messagesCache = messagesCache; this.clusterMessagesCache = clusterMessagesCache; this.pushLatencyManager = pushLatencyManager; } public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) { - final UUID guid = UUID.randomUUID(); - final long messageId = messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message); - - insertExperiment.compareSupplierResult(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId)); + clusterMessagesCache.insert(UUID.randomUUID(), destination, destinationUuid, destinationDevice, message); } public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) { @@ -59,31 +47,25 @@ public class MessagesManager { List messages = this.messages.load(destination, destinationDevice); if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) { - final List messagesFromCache = this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()); - getMessagesExperiment.compareSupplierResult(messagesFromCache, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size())); - - messages.addAll(messagesFromCache); + messages.addAll(clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size())); } return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE); } public void clear(String destination, UUID destinationUuid) { - this.messagesCache.clear(destination, destinationUuid); this.clusterMessagesCache.clear(destination, destinationUuid); this.messages.clear(destination); } public void clear(String destination, UUID destinationUuid, long deviceId) { - this.messagesCache.clear(destination, destinationUuid, deviceId); this.clusterMessagesCache.clear(destination, destinationUuid, deviceId); this.messages.clear(destination, deviceId); } public Optional delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp) { - Optional removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp); - removeBySenderExperiment.compareSupplierResult(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp)); + Optional removed = clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp); if (!removed.isPresent()) { removed = this.messages.remove(destination, destinationDevice, source, timestamp); @@ -96,8 +78,7 @@ public class MessagesManager { } public Optional delete(String destination, UUID destinationUuid, long deviceId, UUID guid) { - Optional removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid); - removeByUuidExperiment.compareSupplierResult(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid)); + Optional removed = clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid); if (!removed.isPresent()) { removed = this.messages.remove(destination, guid); @@ -111,8 +92,7 @@ public class MessagesManager { public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) { if (cached) { - final Optional maybeRemovedMessage = this.messagesCache.remove(destination, destinationUuid, deviceId, id); - removeByIdExperiment.compareSupplierResult(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id)); + clusterMessagesCache.remove(destination, destinationUuid, deviceId, id); cacheHitByIdMeter.mark(); } else { this.messages.remove(destination, id); @@ -122,7 +102,7 @@ public class MessagesManager { public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) { messages.store(messageGuid, envelope, destination, deviceId); - delete(destination, destinationUuid, deviceId, messageGuid); + clusterMessagesCache.remove(destination, destinationUuid, deviceId, messageGuid); } public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java index eb88d6143..7a5cfdfce 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java @@ -30,7 +30,6 @@ public class RedisClusterMessagePersister implements Managed { private final PubSubManager pubSubManager; private final PushSender pushSender; private final AccountsManager accountsManager; - private final FeatureFlagsManager featureFlagsManager; private final Duration persistDelay; @@ -51,13 +50,12 @@ public class RedisClusterMessagePersister implements Managed { private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagePersister.class); - public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) { + public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final Duration persistDelay) { this.messagesCache = messagesCache; this.messagesManager = messagesManager; this.pubSubManager = pubSubManager; this.pushSender = pushSender; this.accountsManager = accountsManager; - this.featureFlagsManager = featureFlagsManager; this.persistDelay = persistDelay; } @@ -72,10 +70,7 @@ public class RedisClusterMessagePersister implements Managed { workerThread = new Thread(() -> { while (running) { - if (featureFlagsManager.isFeatureFlagActive(ENABLE_PERSISTENCE_FLAG)) { - persistNextQueues(Instant.now()); - } - + persistNextQueues(Instant.now()); Util.sleep(100); } }); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java index 45eefde4f..699d91307 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java @@ -138,21 +138,6 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter - insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), - getMessageQueueMetadataKey(destinationUuid, destinationDevice), - getQueueIndexKey(destinationUuid, destinationDevice)), - List.of(messageWithGuid.toByteArray(), - String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8), - sender.getBytes(StandardCharsets.UTF_8), - guid.toString().getBytes(StandardCharsets.UTF_8), - String.valueOf(messageId).getBytes(StandardCharsets.UTF_8)))); - } - @Override public Optional remove(final String destination, final UUID destinationUuid, final long destinationDevice, final long id) { try { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java deleted file mode 100644 index b57b0edaa..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.whispersystems.textsecuregcm.workers; - -import io.dropwizard.cli.ConfiguredCommand; -import io.dropwizard.setup.Bootstrap; -import net.sourceforge.argparse4j.inf.Namespace; -import org.whispersystems.textsecuregcm.WhisperServerConfiguration; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; - -public class ClearMessagesCacheClusterCommand extends ConfiguredCommand { - - public ClearMessagesCacheClusterCommand() { - super("clearmessagescluster", "remove all keys from messages cache cluster"); - } - - @Override - protected void run(final Bootstrap bootstrap, final Namespace namespace, final WhisperServerConfiguration config) { - final FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration()); - messagesCacheCluster.useCluster(connection -> connection.sync().masters().commands().flushallAsync()); - } -} diff --git a/service/src/main/resources/lua/insert_item.lua b/service/src/main/resources/lua/insert_item.lua index 70ef4d248..e7ab2f0d1 100644 --- a/service/src/main/resources/lua/insert_item.lua +++ b/service/src/main/resources/lua/insert_item.lua @@ -1,15 +1,7 @@ -- keys: queue_key [1], queue_metadata_key [2], queue_total_index [3] -- argv: message [1], current_time [2], sender (possibly null) [3], guid [4], messageId (possibly null) [5] -local messageId - -if ARGV[5] ~= nil then - -- TODO: Remove this branch (and ARGV[5]) once the migration to a clustered message cache is finished - messageId = tonumber(ARGV[5]) - redis.call("HSET", KEYS[2], "counter", messageId) -else - messageId = redis.call("HINCRBY", KEYS[2], "counter", 1) -end +local messageId = redis.call("HINCRBY", KEYS[2], "counter", 1) redis.call("ZADD", KEYS[1], "NX", messageId, ARGV[1]) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java index f0b1b7d3f..3f2cf032e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java @@ -64,7 +64,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { notificationExecutorService = Executors.newSingleThreadExecutor(); messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService); - messagePersister = new RedisClusterMessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, featureFlagsManager, PERSIST_DELAY); + messagePersister = new RedisClusterMessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY); doAnswer(invocation -> { final String destination = invocation.getArgument(0, String.class); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java index 0a7d1e4c3..f621cf717 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java @@ -54,17 +54,6 @@ public class RedisClusterMessagesCacheTest extends AbstractMessagesCacheTest { return messagesCache; } - @Test - @Parameters({"true", "false"}) - public void testInsertWithPrescribedId(final boolean sealedSender) { - final UUID firstMessageGuid = UUID.randomUUID(); - final UUID secondMessageGuid = UUID.randomUUID(); - final long messageId = 74; - - assertEquals(messageId, messagesCache.insert(firstMessageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(firstMessageGuid, sealedSender), messageId)); - assertEquals(messageId + 1, messagesCache.insert(secondMessageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(secondMessageGuid, sealedSender))); - } - @Test public void testClearNullUuid() { // We're happy as long as this doesn't throw an exception