From 2ab42f3dd621d0181d2ddbb6a99f1cf0f7cdf5c7 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 19 Aug 2020 10:54:33 -0400 Subject: [PATCH] Refine and expand clustered message cache metrics. --- .../storage/RedisClusterMessagesCache.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java index bebda380e..e577d2364 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java @@ -7,7 +7,9 @@ import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos; @@ -50,16 +52,20 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter messageListenersByQueueName = new HashMap<>(); private final Map queueNamesByMessageListener = new IdentityHashMap<>(); + private final Timer insertTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "insert")); + private final Timer getMessagesTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "get")); + private final Timer clearQueueTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "clear")); + private final Counter pubSubMessageCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "pubSubMessage")); + private final Counter newMessageNotificationCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "newMessageNotification")); + private final Counter queuePersistedNotificationCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "queuePersisted")); + static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); private static final String QUEUE_KEYSPACE_PATTERN = "__keyspace@0__:user_queue::*"; private static final String PERSISTING_KEYSPACE_PATTERN = "__keyspace@0__:user_queue_persisting::*"; - private static final String INSERT_TIMER_NAME = name(RedisClusterMessagesCache.class, "insert"); private static final String REMOVE_TIMER_NAME = name(RedisClusterMessagesCache.class, "remove"); - private static final String GET_TIMER_NAME = name(RedisClusterMessagesCache.class, "get"); - private static final String CLEAR_TIMER_NAME = name(RedisClusterMessagesCache.class, "clear"); private static final String REMOVE_METHOD_TAG = "method"; private static final String REMOVE_METHOD_ID = "id"; @@ -105,7 +111,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter + return (long)insertTimer.record(() -> insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), getMessageQueueMetadataKey(destinationUuid, destinationDevice), getQueueIndexKey(destinationUuid, destinationDevice)), @@ -119,7 +125,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter + return (long)insertTimer.record(() -> insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), getMessageQueueMetadataKey(destinationUuid, destinationDevice), getQueueIndexKey(destinationUuid, destinationDevice)), @@ -191,7 +197,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter get(final String destination, final UUID destinationUuid, final long destinationDevice, final int limit) { - return Metrics.timer(GET_TIMER_NAME).record(() -> { + return getMessagesTimer.record(() -> { final List queueItems = (List)getItemsScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), getPersistInProgressKey(destinationUuid, destinationDevice)), List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8))); @@ -223,7 +229,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) { - return Metrics.timer(GET_TIMER_NAME).record(() -> { + return getMessagesTimer.record(() -> { final List queueItems = (List)getItemsScript.executeBinary(List.of(getMessageQueueKey(accountUuid, destinationDevice), getPersistInProgressKey(accountUuid, destinationDevice)), List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8))); @@ -261,7 +267,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter + clearQueueTimer.record(() -> removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId), getMessageQueueMetadataKey(destinationUuid, deviceId), getQueueIndexKey(destinationUuid, deviceId)), @@ -308,9 +314,13 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable)); } else if (PERSISTING_KEYSPACE_PATTERN.equals(pattern) && "del".equals(message)) { + queuePersistedNotificationCounter.increment(); notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted)); } }