diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 4cbfe55e8..ed172225f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -67,7 +67,10 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification")); - private final Counter queuePersistedNotificationCounter = Metrics.counter(name(MessagesCache.class, "queuePersisted")); + private final Counter queuePersistedNotificationCounter = Metrics.counter( + name(MessagesCache.class, "queuePersisted")); + private final Counter staleEphemeralMessagesCounter = Metrics.counter( + name(MessagesCache.class, "staleEphemeralMessages")); static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); @@ -178,7 +181,10 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp getPersistInProgressKey(destinationUuid, destinationDevice)), List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8))); - final List messageEntities; + final long earliestAllowableEphemeralTimestamp = System.currentTimeMillis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); + + final List messageEntities; + final List staleEphemeralMessageGuids = new ArrayList<>(); if (queueItems.size() % 2 == 0) { messageEntities = new ArrayList<>(queueItems.size() / 2); @@ -186,6 +192,11 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp for (int i = 0; i < queueItems.size() - 1; i += 2) { try { final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(queueItems.get(i)); + if (message.getEphemeral() && message.getTimestamp() < earliestAllowableEphemeralTimestamp) { + staleEphemeralMessageGuids.add(UUID.fromString(message.getServerGuid())); + continue; + } + final long id = Long.parseLong(new String(queueItems.get(i + 1), StandardCharsets.UTF_8)); messageEntities.add(constructEntityFromEnvelope(id, message)); @@ -198,6 +209,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp messageEntities = Collections.emptyList(); } + try { + remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids); + staleEphemeralMessagesCounter.increment(staleEphemeralMessageGuids.size()); + } catch (final Throwable e) { + logger.warn("Could not remove stale ephemeral messages from cache", e); + } + return messageEntities; }); }