Filter stale ephemeral messages from cache

This commit is contained in:
Chris Eager 2021-08-26 11:57:15 -05:00 committed by Chris Eager
parent 3e5087e60b
commit d1d6e5c652
1 changed files with 20 additions and 2 deletions

View File

@ -67,7 +67,10 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"));
private final Counter queuePersistedNotificationCounter = Metrics.counter(name(MessagesCache.class, "queuePersisted"));
private final Counter queuePersistedNotificationCounter = Metrics.counter(
name(MessagesCache.class, "queuePersisted"));
private final Counter staleEphemeralMessagesCounter = Metrics.counter(
name(MessagesCache.class, "staleEphemeralMessages"));
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
@ -178,7 +181,10 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
getPersistInProgressKey(destinationUuid, destinationDevice)),
List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8)));
final List<OutgoingMessageEntity> messageEntities;
final long earliestAllowableEphemeralTimestamp = System.currentTimeMillis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis();
final List<OutgoingMessageEntity> messageEntities;
final List<UUID> staleEphemeralMessageGuids = new ArrayList<>();
if (queueItems.size() % 2 == 0) {
messageEntities = new ArrayList<>(queueItems.size() / 2);
@ -186,6 +192,11 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
for (int i = 0; i < queueItems.size() - 1; i += 2) {
try {
final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(queueItems.get(i));
if (message.getEphemeral() && message.getTimestamp() < earliestAllowableEphemeralTimestamp) {
staleEphemeralMessageGuids.add(UUID.fromString(message.getServerGuid()));
continue;
}
final long id = Long.parseLong(new String(queueItems.get(i + 1), StandardCharsets.UTF_8));
messageEntities.add(constructEntityFromEnvelope(id, message));
@ -198,6 +209,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
messageEntities = Collections.emptyList();
}
try {
remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids);
staleEphemeralMessagesCounter.increment(staleEphemeralMessageGuids.size());
} catch (final Throwable e) {
logger.warn("Could not remove stale ephemeral messages from cache", e);
}
return messageEntities;
});
}