From c6a79ca17685477e286a8d7966ce83d2a69fcf1a Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 2 Nov 2022 10:45:11 -0500 Subject: [PATCH] Enable metrics on messages fluxes --- .../whispersystems/textsecuregcm/storage/MessagesCache.java | 5 ++++- .../textsecuregcm/storage/MessagesManager.java | 6 +++++- .../textsecuregcm/websocket/WebSocketConnection.java | 5 +++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 1fb2f87fb..b8bad3e5f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -44,6 +44,7 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; @@ -92,6 +93,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp @VisibleForTesting static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10); + private static final String GET_FLUX_NAME = MetricsUtil.name(MessagesCache.class, "get"); private static final int PAGE_SIZE = 100; private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); @@ -220,7 +222,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp discardStaleEphemeralMessages(destinationUuid, destinationDevice, staleEphemeralMessages); - return messagesToPublish; + return messagesToPublish.name(GET_FLUX_NAME) + .metrics(); } private static boolean isStaleEphemeralMessage(final MessageProtos.Envelope message, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index a20579ba5..2beae6949 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -23,6 +23,7 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; import reactor.core.publisher.Flux; @@ -30,6 +31,7 @@ import reactor.core.publisher.Flux; public class MessagesManager { private static final int RESULT_SET_CHUNK_SIZE = 100; + final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = MetricsUtil.name(MessagesManager.class, "getMessagesForDevice"); private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class); @@ -91,7 +93,9 @@ public class MessagesManager { cachedMessagesOnly ? Flux.empty() : messagesDynamoDb.load(destinationUuid, destinationDevice, limit); final Publisher cachePublisher = messagesCache.get(destinationUuid, destinationDevice); - return Flux.concat(dynamoPublisher, cachePublisher); + return Flux.concat(dynamoPublisher, cachePublisher) + .name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME) + .metrics(); } public void clear(UUID destinationUuid) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 1a9a73996..b15b4e7e4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -41,6 +41,7 @@ import org.whispersystems.textsecuregcm.controllers.MessageController; import org.whispersystems.textsecuregcm.controllers.NoSuchUserException; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -83,6 +84,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac "clientNonSuccessResponse"); private static final String CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME = name(WebSocketConnection.class, "messageAvailableAfterClientClosed"); + private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class, + "websocketConnection"); private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_MESSAGE_TAG = "message"; private static final String REACTIVE_TAG = "reactive"; @@ -423,6 +426,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly); final Disposable subscription = Flux.from(messages) + .name(SEND_MESSAGES_FLUX_NAME) + .metrics() .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .flatMapSequential(envelope -> Mono.fromFuture(sendMessage(envelope).orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)))