From bbbab4b8a49b301c085f4bde28bf54696f326c14 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 2 Nov 2022 21:30:53 -0500 Subject: [PATCH] Add more detailed queue processing rate metrics --- .../websocket/WebSocketConnection.java | 95 +++++++++++++++---- 1 file changed, 78 insertions(+), 17 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 2b6333c3b..9f347f68d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -77,6 +77,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, "initialQueueLength"); private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue"); + private static final String DRAIN_RATE_DISTRIBUTION_NAME = name(WebSocketConnection.class, "drainRate"); + private static final String SERVER_DRAIN_RATE_DISTRIBUTION_NAME = name(WebSocketConnection.class, "serverDrainRate"); private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain"); private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name(WebSocketConnection.class, "queueDrainRetry"); private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement"); @@ -89,6 +91,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_MESSAGE_TAG = "message"; private static final String REACTIVE_TAG = "reactive"; + private static final String QUEUE_LENGTH_BUCKET_TAG = "queueLengthBucket"; private static final long 
SLOW_DRAIN_THRESHOLD = 10_000; @@ -120,6 +123,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false); private final LongAdder sentMessageCounter = new LongAdder(); + private final LongAdder queueServerProcessingNanosCounter = new LongAdder(); private final AtomicLong queueDrainStartTime = new AtomicLong(); private final AtomicInteger consecutiveRetries = new AtomicInteger(); private final AtomicReference> retryFuture = new AtomicReference<>(); @@ -233,6 +237,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac sendFailuresMeter.mark(); } }).thenCompose(response -> { + final long responseProcessingStartNanos = System.nanoTime(); final CompletableFuture result; if (isSuccessResponse(response)) { @@ -244,22 +249,26 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac sendDeliveryReceiptFor(message); } } else { - final List tags = new ArrayList<>( - List.of( - Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())), - UserAgentTagUtil.getPlatformTag(client.getUserAgent()), - Tag.of(REACTIVE_TAG, String.valueOf(useReactive)) - )); + final List tags = new ArrayList<>( + List.of( + Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())), + UserAgentTagUtil.getPlatformTag(client.getUserAgent()), + Tag.of(REACTIVE_TAG, String.valueOf(useReactive)) + )); - // TODO Remove this once we've identified the cause of message rejections from desktop clients - if (StringUtils.isNotBlank(response.getMessage())) { - tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage())); - } - - Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment(); - - result = CompletableFuture.completedFuture(null); + // TODO Remove this once we've identified the cause of message rejections from desktop clients + if 
(StringUtils.isNotBlank(response.getMessage())) { + tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage())); } + Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment(); + + result = CompletableFuture.completedFuture(null); + } + + result.thenRun(() -> { + final long responseProcessingNanos = System.nanoTime() - responseProcessingStartNanos; + queueServerProcessingNanosCounter.add(responseProcessingNanos); + }); return result; }); @@ -330,9 +339,31 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac ); final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get(); - Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum()); + final long serverDrainDurationNanos = queueServerProcessingNanosCounter.sum(); + final long sentMessageCount = sentMessageCounter.sum(); + + Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCount); Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS); + final Tags drainRateDistributionTags = Tags.of(tags) + .and(QUEUE_LENGTH_BUCKET_TAG, getQueueLengthBucketTag(sentMessageCount)); + + if (drainDuration != 0 && sentMessageCount != 0) { + // Record a bucketed distribution of the queue drain rate, using the total elapsed time since the drain started + final double drainTimeInSeconds = drainDuration / 1e3; + final double drainRatePerSecond = sentMessageCount / drainTimeInSeconds; + Metrics.summary(DRAIN_RATE_DISTRIBUTION_NAME, drainRateDistributionTags).record(drainRatePerSecond); + } + + if (serverDrainDurationNanos != 0 && sentMessageCount != 0) { + // Record a bucketed distribution of the portion of the drain time dedicated to purely server-side processing. + // Because of asynchronous I/O and variable remote operations, this is an approximation of the overhead. 
+ final double serverProcessingTimeInSeconds = (double) serverDrainDurationNanos / 1e9; + final double serverProcessingRatePerSecond = sentMessageCount / serverProcessingTimeInSeconds; + Metrics.summary(SERVER_DRAIN_RATE_DISTRIBUTION_NAME, drainRateDistributionTags) + .record(serverProcessingRatePerSecond); + } + if (drainDuration > SLOW_DRAIN_THRESHOLD) { Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment(); } @@ -373,6 +404,28 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac }); } + private static String getQueueLengthBucketTag(long sentMessageCount) { + // Per Datadog: https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/ + // - Tags must start with a letter. + // - May contain alphanumerics, underscores, minuses, colons, periods, and slashes. Other characters are converted to underscores. + // - A trailing underscore is removed, whether it originated from a converted character or was in the original tag value. 
+ if (sentMessageCount < 100) { + return "b_0-100"; + } else if (sentMessageCount < 1000) { + return "b_100-1000"; + } else if (sentMessageCount < 2500) { + return "b_1000-2500"; + } else if (sentMessageCount < 5000) { + return "b_2500-5000"; + } else if (sentMessageCount < 10_000) { + return "b_5000-10000"; + } else if (sentMessageCount < 15_000) { + return "b_10000-15000"; + } else { + return "b_15000.."; + } + } + private void processStoredMessages_reactive() { assert useReactive; @@ -440,15 +493,23 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac } private CompletableFuture sendMessage(Envelope envelope) { + final long sendMessageStartNanos = System.nanoTime(); final UUID messageGuid = UUID.fromString(envelope.getServerGuid()); + final CompletableFuture result; if (envelope.getStory() && !client.shouldDeliverStories()) { messagesManager.delete(auth.getAccount().getUuid(), device.getId(), messageGuid, envelope.getServerTimestamp()); - return CompletableFuture.completedFuture(null); + result = CompletableFuture.completedFuture(null); } else { - return sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp())); + result = sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp())); } + + // This is likely a trivial amount of time, since we are just enqueuing outgoing messages to send asynchronously. + final long sendMessageDurationNanos = System.nanoTime() - sendMessageStartNanos; + queueServerProcessingNanosCounter.add(sendMessageDurationNanos); + + return result; } @Override