From a0770db17919fd41629149e22f88c2ead98e8055 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Thu, 5 Sep 2024 09:27:43 -0500 Subject: [PATCH] Add timer to removeRecipientViewFromMrmData --- .../textsecuregcm/storage/MessagesCache.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index e8b660268..ac1114a1d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -148,6 +148,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist")); private final Timer removeByGuidTimer = Metrics.timer(name(MessagesCache.class, "removeByGuid")); + private final Timer removeRecipientViewTimer = Metrics.timer(name(MessagesCache.class, "removeRecipientView")); private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); private final Counter newMessageNotificationCounter = Metrics.counter( @@ -427,8 +428,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME); final byte[] key = mrmMessage.getSharedMrmKey().toByteArray(); - final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey( - new AciServiceIdentifier(destinationUuid), destinationDevice); + final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey(new AciServiceIdentifier(destinationUuid), + destinationDevice); final Mono mrmMessageMono = Mono.from(redisCluster.withBinaryClusterReactive( conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey) @@ -463,13 +464,22 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp */ private void removeRecipientViewFromMrmData(final List sharedMrmKeys, final UUID accountUuid, final byte deviceId) { + + final Timer.Sample sample = Timer.start(); Flux.fromIterable(sharedMrmKeys) .collectMultimap(SlotHash::getSlot) .flatMapMany(slotsAndKeys -> Flux.fromIterable(slotsAndKeys.values())) .flatMap( keys -> removeRecipientViewFromMrmDataScript.execute(keys, new AciServiceIdentifier(accountUuid), deviceId), REMOVE_MRM_RECIPIENT_VIEW_CONCURRENCY) - .subscribe(sharedMrmDataKeyRemovedCounter::increment, e -> logger.warn("Error removing recipient view", e)); + .doOnNext(sharedMrmDataKeyRemovedCounter::increment) + .onErrorResume(e -> { + logger.warn("Error removing recipient view", e); + return Mono.just(0L); + }) + .then() + .doOnTerminate(() -> sample.stop(removeRecipientViewTimer)) + .subscribe(); } private Mono, Long>> getNextMessagePage(final UUID destinationUuid,