Enable metrics on messages fluxes

This commit is contained in:
Chris Eager 2022-11-02 10:45:11 -05:00 committed by Chris Eager
parent 6426e6cc49
commit c6a79ca176
3 changed files with 14 additions and 2 deletions

View File

@ -44,6 +44,7 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
@ -92,6 +93,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
@VisibleForTesting
static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10);
private static final String GET_FLUX_NAME = MetricsUtil.name(MessagesCache.class, "get");
private static final int PAGE_SIZE = 100;
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
@ -220,7 +222,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
discardStaleEphemeralMessages(destinationUuid, destinationDevice, staleEphemeralMessages);
return messagesToPublish;
return messagesToPublish.name(GET_FLUX_NAME)
.metrics();
}
private static boolean isStaleEphemeralMessage(final MessageProtos.Envelope message,

View File

@ -23,6 +23,7 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.publisher.Flux;
@ -30,6 +31,7 @@ import reactor.core.publisher.Flux;
public class MessagesManager {
private static final int RESULT_SET_CHUNK_SIZE = 100;
final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = MetricsUtil.name(MessagesManager.class, "getMessagesForDevice");
private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class);
@ -91,7 +93,9 @@ public class MessagesManager {
cachedMessagesOnly ? Flux.empty() : messagesDynamoDb.load(destinationUuid, destinationDevice, limit);
final Publisher<Envelope> cachePublisher = messagesCache.get(destinationUuid, destinationDevice);
return Flux.concat(dynamoPublisher, cachePublisher);
return Flux.concat(dynamoPublisher, cachePublisher)
.name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME)
.metrics();
}
public void clear(UUID destinationUuid) {

View File

@ -41,6 +41,7 @@ import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
@ -83,6 +84,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
"clientNonSuccessResponse");
private static final String CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME = name(WebSocketConnection.class,
"messageAvailableAfterClientClosed");
private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class,
"websocketConnection");
private static final String STATUS_CODE_TAG = "status";
private static final String STATUS_MESSAGE_TAG = "message";
private static final String REACTIVE_TAG = "reactive";
@ -423,6 +426,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly);
final Disposable subscription = Flux.from(messages)
.name(SEND_MESSAGES_FLUX_NAME)
.metrics()
.limitRate(MESSAGE_PUBLISHER_LIMIT_RATE)
.flatMapSequential(envelope ->
Mono.fromFuture(sendMessage(envelope).orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)))