Refine and expand clustered message cache metrics.

This commit is contained in:
Jon Chambers 2020-08-19 10:54:33 -04:00 committed by Jon Chambers
parent af34b43a8d
commit 2ab42f3dd6
1 changed files with 18 additions and 8 deletions

View File

@ -7,7 +7,9 @@ import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
@ -50,16 +52,20 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
private final Timer insertTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "insert"));
private final Timer getMessagesTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "get"));
private final Timer clearQueueTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "clear"));
private final Counter pubSubMessageCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "pubSubMessage"));
private final Counter newMessageNotificationCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "newMessageNotification"));
private final Counter queuePersistedNotificationCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "queuePersisted"));
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
private static final String QUEUE_KEYSPACE_PATTERN = "__keyspace@0__:user_queue::*";
private static final String PERSISTING_KEYSPACE_PATTERN = "__keyspace@0__:user_queue_persisting::*";
private static final String INSERT_TIMER_NAME = name(RedisClusterMessagesCache.class, "insert");
private static final String REMOVE_TIMER_NAME = name(RedisClusterMessagesCache.class, "remove");
private static final String GET_TIMER_NAME = name(RedisClusterMessagesCache.class, "get");
private static final String CLEAR_TIMER_NAME = name(RedisClusterMessagesCache.class, "clear");
private static final String REMOVE_METHOD_TAG = "method";
private static final String REMOVE_METHOD_ID = "id";
@ -105,7 +111,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
final String sender = message.hasSource() ? (message.getSource() + "::" + message.getTimestamp()) : "nil";
return (long)Metrics.timer(INSERT_TIMER_NAME).record(() ->
return (long)insertTimer.record(() ->
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
getQueueIndexKey(destinationUuid, destinationDevice)),
@ -119,7 +125,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
final String sender = message.hasSource() ? (message.getSource() + "::" + message.getTimestamp()) : "nil";
return (long)Metrics.timer(INSERT_TIMER_NAME).record(() ->
return (long)insertTimer.record(() ->
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
getQueueIndexKey(destinationUuid, destinationDevice)),
@ -191,7 +197,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
@Override
@SuppressWarnings("unchecked")
public List<OutgoingMessageEntity> get(final String destination, final UUID destinationUuid, final long destinationDevice, final int limit) {
return Metrics.timer(GET_TIMER_NAME).record(() -> {
return getMessagesTimer.record(() -> {
final List<byte[]> queueItems = (List<byte[]>)getItemsScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
getPersistInProgressKey(destinationUuid, destinationDevice)),
List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8)));
@ -223,7 +229,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
@SuppressWarnings("unchecked")
@VisibleForTesting
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) {
return Metrics.timer(GET_TIMER_NAME).record(() -> {
return getMessagesTimer.record(() -> {
final List<byte[]> queueItems = (List<byte[]>)getItemsScript.executeBinary(List.of(getMessageQueueKey(accountUuid, destinationDevice),
getPersistInProgressKey(accountUuid, destinationDevice)),
List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8)));
@ -261,7 +267,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
@Override
public void clear(final String destination, final UUID destinationUuid, final long deviceId) {
Metrics.timer(CLEAR_TIMER_NAME).record(() ->
clearQueueTimer.record(() ->
removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId),
getMessageQueueMetadataKey(destinationUuid, deviceId),
getQueueIndexKey(destinationUuid, deviceId)),
@ -308,9 +314,13 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
@Override
public void message(final RedisClusterNode node, final String pattern, final String channel, final String message) {
pubSubMessageCounter.increment();
if (QUEUE_KEYSPACE_PATTERN.equals(pattern) && "zadd".equals(message)) {
newMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable));
} else if (PERSISTING_KEYSPACE_PATTERN.equals(pattern) && "del".equals(message)) {
queuePersistedNotificationCounter.increment();
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted));
}
}