From ad17c6e40dc3c9211211d8dfbe08d573bf4e3ff1 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Thu, 5 Sep 2024 10:32:26 -0500 Subject: [PATCH] Wait for MRM experiment mono to complete before returning default message --- .../textsecuregcm/storage/MessagesCache.java | 13 ++++++++++--- .../textsecuregcm/storage/MessagesCacheTest.java | 15 +++++++++++---- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index b817e642f..387b2993e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -396,10 +396,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp final Mono messageMono; if (message.hasSharedMrmKey()) { - maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice); + final Mono experimentMono = maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice); // mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content - messageMono = Mono.just(message.toBuilder().clearSharedMrmKey().build()); + // To avoid races, wait for the experiment to run, but ignore any errors + messageMono = experimentMono + .onErrorComplete() + .then(Mono.just(message.toBuilder().clearSharedMrmKey().build())); } else { messageMono = Mono.just(message); } @@ -420,7 +423,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp * * @see DynamicMessagesConfiguration#mrmViewExperimentEnabled() */ - private void maybeRunMrmViewExperiment(final MessageProtos.Envelope mrmMessage, final UUID destinationUuid, + private Mono maybeRunMrmViewExperiment(final MessageProtos.Envelope mrmMessage, final UUID destinationUuid, final byte destinationDevice) { if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration() .mrmViewExperimentEnabled()) { @@ -456,6 +459,10 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp experiment.compareFutureResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), mrmMessageMono.toFuture()); + + return mrmMessageMono; + } else { + return Mono.empty(); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index cf9dbb10f..189bac16f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -563,15 +563,22 @@ class MessagesCacheTest { }); } - @Test - void testMultiRecipientMessage() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testMultiRecipientMessage(final boolean sharedMrmKeyPresent) throws Exception { final UUID destinationUuid = UUID.randomUUID(); final byte deviceId = 1; final UUID mrmGuid = UUID.randomUUID(); final SealedSenderMultiRecipientMessage mrm = generateRandomMrmMessage( new AciServiceIdentifier(destinationUuid), deviceId); - final byte[] sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrmGuid, mrm); + + final byte[] sharedMrmDataKey; + if (sharedMrmKeyPresent) { + sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrmGuid, mrm); + } else { + sharedMrmDataKey = new byte[]{1}; + } final UUID guid = UUID.randomUUID(); final MessageProtos.Envelope message = generateRandomMessage(guid, true) @@ -585,7 +592,7 @@ class MessagesCacheTest { .build(); messagesCache.insert(guid, destinationUuid, deviceId, message); - assertEquals(1L, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster() + assertEquals(sharedMrmKeyPresent ? 1 : 0, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster() .withBinaryCluster(conn -> conn.sync().exists(MessagesCache.getSharedMrmKey(mrmGuid)))); final List messages = get(destinationUuid, deviceId, 1);