diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index e320cf31e..720cc3fb1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -336,7 +336,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp final long earliestAllowableEphemeralTimestamp = clock.millis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); - final Flux allMessages = getAllMessages(destinationUuid, destinationDevice) + final Flux allMessages = getAllMessages(destinationUuid, destinationDevice, + earliestAllowableEphemeralTimestamp) .publish() // We expect exactly two subscribers to this base flux: // 1. the websocket that delivers messages to clients @@ -375,7 +376,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } @VisibleForTesting - Flux getAllMessages(final UUID destinationUuid, final byte destinationDevice) { + Flux getAllMessages(final UUID destinationUuid, final byte destinationDevice, + final long earliestAllowableEphemeralTimestamp) { // fetch messages by page return getNextMessagePage(destinationUuid, destinationDevice, -1) @@ -401,7 +403,14 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp final Mono messageMono; if (message.hasSharedMrmKey()) { - final Mono experimentMono = maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice); + + final Mono experimentMono; + if (isStaleEphemeralMessage(message, earliestAllowableEphemeralTimestamp)) { + // skip fetching content for message that will be discarded + experimentMono = Mono.empty(); + } else { + experimentMono = maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice); + } // mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content // To avoid races, wait for the experiment to run, but ignore any errors diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 3eb5beab4..9af62e4e7 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -330,7 +330,7 @@ class MessagesCacheTest { .get(5, TimeUnit.SECONDS); final List messages = messagesCache.getAllMessages(DESTINATION_UUID, - DESTINATION_DEVICE_ID) + DESTINATION_DEVICE_ID, 0) .collectList() .toFuture().get(5, TimeUnit.SECONDS); @@ -741,7 +741,7 @@ class MessagesCacheTest { .thenReturn(Flux.from(emptyFinalPagePublisher)) .thenReturn(Flux.empty()); - final Flux allMessages = messagesCache.getAllMessages(UUID.randomUUID(), Device.PRIMARY_ID); + final Flux allMessages = messagesCache.getAllMessages(UUID.randomUUID(), Device.PRIMARY_ID, 0); // Why initialValue = 3? // 1. messagesCache.getAllMessages() above produces the first call