Skip fetching MRM content for stale ephemeral messages

This commit is contained in:
Chris Eager 2024-10-18 11:34:44 -05:00 committed by Chris Eager
parent 155f3d6231
commit 9573d9e385
2 changed files with 14 additions and 5 deletions

View File

@ -336,7 +336,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
final long earliestAllowableEphemeralTimestamp =
clock.millis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis();
final Flux<MessageProtos.Envelope> allMessages = getAllMessages(destinationUuid, destinationDevice)
final Flux<MessageProtos.Envelope> allMessages = getAllMessages(destinationUuid, destinationDevice,
earliestAllowableEphemeralTimestamp)
.publish()
// We expect exactly two subscribers to this base flux:
// 1. the websocket that delivers messages to clients
@ -375,7 +376,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
@VisibleForTesting
Flux<MessageProtos.Envelope> getAllMessages(final UUID destinationUuid, final byte destinationDevice) {
Flux<MessageProtos.Envelope> getAllMessages(final UUID destinationUuid, final byte destinationDevice,
final long earliestAllowableEphemeralTimestamp) {
// fetch messages by page
return getNextMessagePage(destinationUuid, destinationDevice, -1)
@ -401,7 +403,14 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
final Mono<MessageProtos.Envelope> messageMono;
if (message.hasSharedMrmKey()) {
final Mono<?> experimentMono = maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice);
final Mono<?> experimentMono;
if (isStaleEphemeralMessage(message, earliestAllowableEphemeralTimestamp)) {
// skip fetching content for message that will be discarded
experimentMono = Mono.empty();
} else {
experimentMono = maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice);
}
// mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content
// To avoid races, wait for the experiment to run, but ignore any errors

View File

@ -330,7 +330,7 @@ class MessagesCacheTest {
.get(5, TimeUnit.SECONDS);
final List<MessageProtos.Envelope> messages = messagesCache.getAllMessages(DESTINATION_UUID,
DESTINATION_DEVICE_ID)
DESTINATION_DEVICE_ID, 0)
.collectList()
.toFuture().get(5, TimeUnit.SECONDS);
@ -741,7 +741,7 @@ class MessagesCacheTest {
.thenReturn(Flux.from(emptyFinalPagePublisher))
.thenReturn(Flux.empty());
final Flux<?> allMessages = messagesCache.getAllMessages(UUID.randomUUID(), Device.PRIMARY_ID);
final Flux<?> allMessages = messagesCache.getAllMessages(UUID.randomUUID(), Device.PRIMARY_ID, 0);
// Why initialValue = 3?
// 1. messagesCache.getAllMessages() above produces the first call