Discard stale messages from MessagesCache#getMessagesToPersist

This commit is contained in:
Chris Eager 2025-01-31 16:35:13 -06:00 committed by Chris Eager
parent 3f9863c441
commit 6d30a45017
2 changed files with 19 additions and 9 deletions

View File

@ -528,7 +528,6 @@ public class MessagesCache {
});
}
@VisibleForTesting
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final byte destinationDevice,
final int limit) {
@ -537,7 +536,7 @@ public class MessagesCache {
final List<byte[]> messages = redisCluster.withBinaryCluster(connection ->
connection.sync().zrange(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
return Flux.fromIterable(messages)
final Flux<MessageProtos.Envelope> allMessages = Flux.fromIterable(messages)
.mapNotNull(message -> {
try {
return MessageProtos.Envelope.parseFrom(message);
@ -555,7 +554,22 @@ public class MessagesCache {
}
return messageMono;
})
});
final Flux<MessageProtos.Envelope> messagesToPersist = allMessages
.filter(Predicate.not(envelope ->
envelope.getEphemeral() || isStaleMrmMessage(envelope)));
final Flux<MessageProtos.Envelope> ephemeralMessages = allMessages
.filter(MessageProtos.Envelope::getEphemeral);
discardStaleMessages(accountUuid, destinationDevice, ephemeralMessages, staleEphemeralMessagesCounter, "ephemeral");
final Flux<MessageProtos.Envelope> staleMrmMessages = allMessages.filter(MessagesCache::isStaleMrmMessage)
// clearing the sharedMrmKey prevents unnecessary calls to update the shared MRM data
.map(envelope -> envelope.toBuilder().clearSharedMrmKey().build());
discardStaleMessages(accountUuid, destinationDevice, staleMrmMessages, staleMrmMessagesCounter, "mrm");
return messagesToPersist
.collectList()
.doOnTerminate(() -> sample.stop(getMessagesTimer))
.block(Duration.ofSeconds(5));

View File

@ -267,11 +267,7 @@ public class MessagesManager {
final Device destinationDevice,
final List<Envelope> messages) {
final List<Envelope> nonEphemeralMessages = messages.stream()
.filter(envelope -> !envelope.getEphemeral())
.collect(Collectors.toList());
messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDevice);
messagesDynamoDb.store(messages, destinationUuid, destinationDevice);
final List<UUID> messageGuids = messages.stream().map(message -> UUID.fromString(message.getServerGuid()))
.collect(Collectors.toList());
@ -279,7 +275,7 @@ public class MessagesManager {
try {
messagesRemovedFromCache = messagesCache.remove(destinationUuid, destinationDevice.getId(), messageGuids)
.get(30, TimeUnit.SECONDS).size();
PERSIST_MESSAGE_COUNTER.increment(nonEphemeralMessages.size());
PERSIST_MESSAGE_COUNTER.increment(messages.size());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.warn("Failed to remove messages from cache", e);