diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 3633986db..961d55c1a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -528,7 +528,6 @@ public class MessagesCache { }); } - @VisibleForTesting List getMessagesToPersist(final UUID accountUuid, final byte destinationDevice, final int limit) { @@ -537,7 +536,7 @@ public class MessagesCache { final List messages = redisCluster.withBinaryCluster(connection -> connection.sync().zrange(getMessageQueueKey(accountUuid, destinationDevice), 0, limit)); - return Flux.fromIterable(messages) + final Flux allMessages = Flux.fromIterable(messages) .mapNotNull(message -> { try { return MessageProtos.Envelope.parseFrom(message); @@ -555,7 +554,22 @@ public class MessagesCache { } return messageMono; - }) + }); + + final Flux messagesToPersist = allMessages + .filter(Predicate.not(envelope -> + envelope.getEphemeral() || isStaleMrmMessage(envelope))); + + final Flux ephemeralMessages = allMessages + .filter(MessageProtos.Envelope::getEphemeral); + discardStaleMessages(accountUuid, destinationDevice, ephemeralMessages, staleEphemeralMessagesCounter, "ephemeral"); + + final Flux staleMrmMessages = allMessages.filter(MessagesCache::isStaleMrmMessage) + // clearing the sharedMrmKey prevents unnecessary calls to update the shared MRM data + .map(envelope -> envelope.toBuilder().clearSharedMrmKey().build()); + discardStaleMessages(accountUuid, destinationDevice, staleMrmMessages, staleMrmMessagesCounter, "mrm"); + + return messagesToPersist .collectList() .doOnTerminate(() -> sample.stop(getMessagesTimer)) .block(Duration.ofSeconds(5)); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index c272fc895..2b36a56e6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -267,11 +267,7 @@ public class MessagesManager { final Device destinationDevice, final List messages) { - final List nonEphemeralMessages = messages.stream() - .filter(envelope -> !envelope.getEphemeral()) - .collect(Collectors.toList()); - - messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDevice); + messagesDynamoDb.store(messages, destinationUuid, destinationDevice); final List messageGuids = messages.stream().map(message -> UUID.fromString(message.getServerGuid())) .collect(Collectors.toList()); @@ -279,7 +275,7 @@ public class MessagesManager { try { messagesRemovedFromCache = messagesCache.remove(destinationUuid, destinationDevice.getId(), messageGuids) .get(30, TimeUnit.SECONDS).size(); - PERSIST_MESSAGE_COUNTER.increment(nonEphemeralMessages.size()); + PERSIST_MESSAGE_COUNTER.increment(messages.size()); } catch (InterruptedException | ExecutionException | TimeoutException e) { logger.warn("Failed to remove messages from cache", e);