Publish "messages persisted" events when unlocking queues after a persistence run

This commit is contained in:
Jon Chambers 2024-11-07 17:19:55 -05:00 committed by Jon Chambers
parent 084607f359
commit 562b495a18
13 changed files with 63 additions and 78 deletions

View File

@ -58,11 +58,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
.build() .build()
.toByteArray(); .toByteArray();
private final byte[] MESSAGES_PERSISTED_EVENT_BYTES = ClientEvent.newBuilder()
.setMessagesPersisted(MessagesPersistedEvent.getDefaultInstance())
.build()
.toByteArray();
@Nullable @Nullable
private FaultTolerantPubSubClusterConnection<byte[], byte[]> pubSubConnection; private FaultTolerantPubSubClusterConnection<byte[], byte[]> pubSubConnection;
@ -224,25 +219,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
}); });
} }
/**
* Publishes an event notifying a specific device that messages have been persisted from short-term to long-term
* storage.
*
* @param accountIdentifier the account identifier for which messages have been persisted
* @param deviceId the ID of the device within the target account
*
* @return a future that completes when the event has been published
*/
public CompletionStage<Void> handleMessagesPersisted(final UUID accountIdentifier, final byte deviceId) {
if (pubSubConnection == null) {
throw new IllegalStateException("Presence manager not started");
}
return pubSubConnection.withPubSubConnection(connection ->
connection.async().spublish(getClientEventChannel(accountIdentifier, deviceId), MESSAGES_PERSISTED_EVENT_BYTES))
.thenRun(Util.NOOP);
}
/** /**
* Tests whether a client with the given account/device is connected to this presence manager instance. * Tests whether a client with the given account/device is connected to this presence manager instance.
* *

View File

@ -22,7 +22,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
import software.amazon.awssdk.services.dynamodb.model.ItemCollectionSizeLimitExceededException; import software.amazon.awssdk.services.dynamodb.model.ItemCollectionSizeLimitExceededException;
@ -31,7 +30,6 @@ public class MessagePersister implements Managed {
private final MessagesCache messagesCache; private final MessagesCache messagesCache;
private final MessagesManager messagesManager; private final MessagesManager messagesManager;
private final AccountsManager accountsManager; private final AccountsManager accountsManager;
private final PubSubClientEventManager pubSubClientEventManager;
private final Duration persistDelay; private final Duration persistDelay;
@ -63,9 +61,9 @@ public class MessagePersister implements Managed {
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, public MessagePersister(final MessagesCache messagesCache,
final MessagesManager messagesManager,
final AccountsManager accountsManager, final AccountsManager accountsManager,
final PubSubClientEventManager pubSubClientEventManager,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager, final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final Duration persistDelay, final Duration persistDelay,
final int dedicatedProcessWorkerThreadCount final int dedicatedProcessWorkerThreadCount
@ -74,7 +72,6 @@ public class MessagePersister implements Managed {
this.messagesCache = messagesCache; this.messagesCache = messagesCache;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.accountsManager = accountsManager; this.accountsManager = accountsManager;
this.pubSubClientEventManager = pubSubClientEventManager;
this.persistDelay = persistDelay; this.persistDelay = persistDelay;
this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount]; this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount];
@ -211,7 +208,6 @@ public class MessagePersister implements Managed {
maybeUnlink(account, deviceId); // may throw, in which case we'll retry later by the usual mechanism maybeUnlink(account, deviceId); // may throw, in which case we'll retry later by the usual mechanism
} finally { } finally {
messagesCache.unlockQueueForPersistence(accountUuid, deviceId); messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
pubSubClientEventManager.handleMessagesPersisted(accountUuid, deviceId);
sample.stop(persistQueueTimer); sample.stop(persistQueueTimer);
} }

View File

@ -125,6 +125,7 @@ public class MessagesCache {
private final MessagesCacheRemoveQueueScript removeQueueScript; private final MessagesCacheRemoveQueueScript removeQueueScript;
private final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript; private final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript;
private final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript; private final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript;
private final MessagesCacheUnlockQueueScript unlockQueueScript;
private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert")); private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"));
private final Timer insertSharedMrmPayloadTimer = Metrics.timer(name(MessagesCache.class, "insertSharedMrmPayload")); private final Timer insertSharedMrmPayloadTimer = Metrics.timer(name(MessagesCache.class, "insertSharedMrmPayload"));
@ -176,7 +177,8 @@ public class MessagesCache {
new MessagesCacheRemoveByGuidScript(redisCluster), new MessagesCacheRemoveByGuidScript(redisCluster),
new MessagesCacheRemoveQueueScript(redisCluster), new MessagesCacheRemoveQueueScript(redisCluster),
new MessagesCacheGetQueuesToPersistScript(redisCluster), new MessagesCacheGetQueuesToPersistScript(redisCluster),
new MessagesCacheRemoveRecipientViewFromMrmDataScript(redisCluster) new MessagesCacheRemoveRecipientViewFromMrmDataScript(redisCluster),
new MessagesCacheUnlockQueueScript(redisCluster)
); );
} }
@ -190,8 +192,8 @@ public class MessagesCache {
final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript, final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript,
final MessagesCacheRemoveQueueScript removeQueueScript, final MessagesCacheRemoveQueueScript removeQueueScript,
final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript, final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript,
final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript) final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript,
throws IOException { final MessagesCacheUnlockQueueScript unlockQueueScript) throws IOException {
this.redisCluster = redisCluster; this.redisCluster = redisCluster;
this.clock = clock; this.clock = clock;
@ -209,6 +211,7 @@ public class MessagesCache {
this.removeQueueScript = removeQueueScript; this.removeQueueScript = removeQueueScript;
this.getQueuesToPersistScript = getQueuesToPersistScript; this.getQueuesToPersistScript = getQueuesToPersistScript;
this.removeRecipientViewFromMrmDataScript = removeRecipientViewFromMrmDataScript; this.removeRecipientViewFromMrmDataScript = removeRecipientViewFromMrmDataScript;
this.unlockQueueScript = unlockQueueScript;
} }
public boolean insert(final UUID messageGuid, public boolean insert(final UUID messageGuid,
@ -599,8 +602,7 @@ public class MessagesCache {
} }
void unlockQueueForPersistence(final UUID accountUuid, final byte deviceId) { void unlockQueueForPersistence(final UUID accountUuid, final byte deviceId) {
redisCluster.useBinaryCluster( unlockQueueScript.execute(accountUuid, deviceId);
connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
} }
static byte[] getMessageQueueKey(final UUID accountUuid, final byte deviceId) { static byte[] getMessageQueueKey(final UUID accountUuid, final byte deviceId) {

View File

@ -0,0 +1,43 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import io.lettuce.core.ScriptOutputType;
import org.whispersystems.textsecuregcm.push.ClientEvent;
import org.whispersystems.textsecuregcm.push.MessagesPersistedEvent;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
/**
* Unlocks a message queue for persistence/message retrieval.
*/
class MessagesCacheUnlockQueueScript {
private final ClusterLuaScript unlockQueueScript;
private final List<byte[]> MESSAGES_PERSISTED_EVENT_ARGS = List.of(ClientEvent.newBuilder()
.setMessagesPersisted(MessagesPersistedEvent.getDefaultInstance())
.build()
.toByteArray()); // eventPayload
MessagesCacheUnlockQueueScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException {
this.unlockQueueScript =
ClusterLuaScript.fromResource(redisCluster, "lua/unlock_queue.lua", ScriptOutputType.STATUS);
}
void execute(final UUID accountIdentifier, final byte deviceId) {
final List<byte[]> keys = List.of(
MessagesCache.getPersistInProgressKey(accountIdentifier, deviceId), // persistInProgressKey
PubSubClientEventManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey
);
unlockQueueScript.executeBinary(keys, MESSAGES_PERSISTED_EVENT_ARGS);
}
}

View File

@ -76,7 +76,6 @@ record CommandDependencies(
ReportMessageManager reportMessageManager, ReportMessageManager reportMessageManager,
MessagesCache messagesCache, MessagesCache messagesCache,
MessagesManager messagesManager, MessagesManager messagesManager,
PubSubClientEventManager pubSubClientEventManager,
KeysManager keysManager, KeysManager keysManager,
APNSender apnSender, APNSender apnSender,
FcmSender fcmSender, FcmSender fcmSender,
@ -269,7 +268,6 @@ record CommandDependencies(
reportMessageManager, reportMessageManager,
messagesCache, messagesCache,
messagesManager, messagesManager,
pubSubClientEventManager,
keys, keys,
apnSender, apnSender,
fcmSender, fcmSender,

View File

@ -63,7 +63,6 @@ public class MessagePersisterServiceCommand extends ServerCommand<WhisperServerC
final MessagePersister messagePersister = new MessagePersister(deps.messagesCache(), final MessagePersister messagePersister = new MessagePersister(deps.messagesCache(),
deps.messagesManager(), deps.messagesManager(),
deps.accountsManager(), deps.accountsManager(),
deps.pubSubClientEventManager(),
deps.dynamicConfigurationManager(), deps.dynamicConfigurationManager(),
Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()), Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()),
namespace.getInt(WORKER_COUNT)); namespace.getInt(WORKER_COUNT));

View File

@ -0,0 +1,9 @@
-- Unlocks a message queue when a persist-to-DynamoDB run has finished and publishes an event notifying listeners that
-- messages have been persisted
local persistInProgressKey = KEYS[1] -- simple string key whose presence indicates a lock
local eventChannelKey = KEYS[2] -- the channel on which to publish the "messages persisted" event
local eventPayload = ARGV[1] -- [bytes] a protobuf payload for a "message persisted" pub/sub event
redis.call("DEL", persistInProgressKey)
redis.call("SPUBLISH", eventChannelKey, eventPayload)

View File

@ -20,10 +20,8 @@ import io.lettuce.core.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands;
import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
@ -140,30 +138,6 @@ class PubSubClientEventManagerTest {
assertTrue(firstListenerConnectedElsewhere.get()); assertTrue(firstListenerConnectedElsewhere.get());
} }
@ParameterizedTest
@ValueSource(booleans = {true, false})
void handleMessagesPersisted(final boolean messagesPersistedRemotely) throws InterruptedException {
final UUID accountIdentifier = UUID.randomUUID();
final byte deviceId = Device.PRIMARY_ID;
final CountDownLatch messagesPersistedLatch = new CountDownLatch(1);
localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() {
@Override
public void handleMessagesPersisted() {
messagesPersistedLatch.countDown();
}
}).toCompletableFuture().join();
final PubSubClientEventManager persistingPresenceManager =
messagesPersistedRemotely ? remotePresenceManager : localPresenceManager;
persistingPresenceManager.handleMessagesPersisted(accountIdentifier, deviceId).toCompletableFuture().join();
assertTrue(messagesPersistedLatch.await(2, TimeUnit.SECONDS),
"Message persistence event not received within time limit");
}
@Test @Test
void isLocallyPresent() { void isLocallyPresent() {
final UUID accountIdentifier = UUID.randomUUID(); final UUID accountIdentifier = UUID.randomUUID();

View File

@ -89,7 +89,7 @@ class MessagePersisterIntegrationTest {
pubSubClientEventManager.start(); pubSubClientEventManager.start();
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
pubSubClientEventManager, dynamicConfigurationManager, PERSIST_DELAY, 1); dynamicConfigurationManager, PERSIST_DELAY, 1);
account = mock(Account.class); account = mock(Account.class);

View File

@ -46,7 +46,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
@ -67,7 +66,6 @@ class MessagePersisterTest {
private MessagePersister messagePersister; private MessagePersister messagePersister;
private AccountsManager accountsManager; private AccountsManager accountsManager;
private MessagesManager messagesManager; private MessagesManager messagesManager;
private PubSubClientEventManager pubSubClientEventManager;
private Account destinationAccount; private Account destinationAccount;
private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID(); private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID();
@ -102,8 +100,7 @@ class MessagePersisterTest {
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager); messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager);
pubSubClientEventManager = mock(PubSubClientEventManager.class); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, pubSubClientEventManager,
dynamicConfigurationManager, PERSIST_DELAY, 1); dynamicConfigurationManager, PERSIST_DELAY, 1);
when(messagesManager.clear(any(UUID.class), anyByte())).thenReturn(CompletableFuture.completedFuture(null)); when(messagesManager.clear(any(UUID.class), anyByte())).thenReturn(CompletableFuture.completedFuture(null));
@ -157,8 +154,6 @@ class MessagePersisterTest {
verify(messagesDynamoDb, atLeastOnce()).store(messagesCaptor.capture(), eq(DESTINATION_ACCOUNT_UUID), verify(messagesDynamoDb, atLeastOnce()).store(messagesCaptor.capture(), eq(DESTINATION_ACCOUNT_UUID),
eq(DESTINATION_DEVICE)); eq(DESTINATION_DEVICE));
assertEquals(messageCount, messagesCaptor.getAllValues().stream().mapToInt(List::size).sum()); assertEquals(messageCount, messagesCaptor.getAllValues().stream().mapToInt(List::size).sum());
verify(pubSubClientEventManager).handleMessagesPersisted(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID);
} }
@Test @Test
@ -228,8 +223,6 @@ class MessagePersisterTest {
assertEquals(List.of(queueName), assertEquals(List.of(queueName),
messagesCache.getQueuesToPersist(SlotHash.getSlot(queueName), messagesCache.getQueuesToPersist(SlotHash.getSlot(queueName),
Instant.now().plus(messagePersister.getPersistDelay()), 1)); Instant.now().plus(messagePersister.getPersistDelay()), 1));
verify(pubSubClientEventManager).handleMessagesPersisted(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID);
} }
@Test @Test
@ -248,8 +241,6 @@ class MessagePersisterTest {
assertTimeoutPreemptively(Duration.ofSeconds(1), () -> assertTimeoutPreemptively(Duration.ofSeconds(1), () ->
assertThrows(MessagePersistenceException.class, assertThrows(MessagePersistenceException.class,
() -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE))); () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE)));
verify(pubSubClientEventManager).handleMessagesPersisted(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID);
} }
@Test @Test

View File

@ -73,7 +73,6 @@ class FinishPushNotificationExperimentCommandTest {
null, null,
null, null,
null, null,
null,
pushNotificationExperimentSamples, pushNotificationExperimentSamples,
null, null,
null, null,

View File

@ -66,7 +66,6 @@ class NotifyIdleDevicesCommandTest {
null, null,
null, null,
null, null,
null,
null); null);
this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler; this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler;

View File

@ -62,7 +62,6 @@ class StartPushNotificationExperimentCommandTest {
null, null,
null, null,
null, null,
null,
pushNotificationExperimentSamples, pushNotificationExperimentSamples,
null, null,
null, null,