From 69ed0edb7420efd6e0e466a719cd44cbaf973964 Mon Sep 17 00:00:00 2001 From: Chris Eager <79161849+eager-signal@users.noreply.github.com> Date: Tue, 8 Nov 2022 09:35:39 -0600 Subject: [PATCH] Revert "Add more detailed queue processing rate metrics" This reverts commit bbbab4b8a49b301c085f4bde28bf54696f326c14. --- .../websocket/WebSocketConnection.java | 95 ++++--------------- 1 file changed, 17 insertions(+), 78 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index af36618ed..147f62cff 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -75,8 +75,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, "initialQueueLength"); private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue"); - private static final String DRAIN_RATE_DISTRIBUTION_NAME = name(WebSocketConnection.class, "drainRate"); - private static final String SERVER_DRAIN_RATE_DISTRIBUTION_NAME = name(WebSocketConnection.class, "serverDrainRate"); private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain"); private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name(WebSocketConnection.class, "queueDrainRetry"); private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement"); @@ -89,7 +87,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_MESSAGE_TAG = "message"; private static final String REACTIVE_TAG = "reactive"; - private static final String QUEUE_LENGTH_BUCKET_TAG = "queueLengthBucket"; private static final long SLOW_DRAIN_THRESHOLD = 10_000; @@ -121,7 +118,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false); private final LongAdder sentMessageCounter = new LongAdder(); - private final LongAdder queueServerProcessingNanosCounter = new LongAdder(); private final AtomicLong queueDrainStartTime = new AtomicLong(); private final AtomicInteger consecutiveRetries = new AtomicInteger(); private final AtomicReference> retryFuture = new AtomicReference<>(); @@ -235,7 +231,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac sendFailuresMeter.mark(); } }).thenCompose(response -> { - final long responseProcessingStartNanos = System.nanoTime(); final CompletableFuture result; if (isSuccessResponse(response)) { @@ -247,26 +242,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac sendDeliveryReceiptFor(message); } } else { - final List tags = new ArrayList<>( - List.of( - Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())), - UserAgentTagUtil.getPlatformTag(client.getUserAgent()), - Tag.of(REACTIVE_TAG, String.valueOf(useReactive)) - )); + final List tags = new ArrayList<>( + List.of( + Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())), + UserAgentTagUtil.getPlatformTag(client.getUserAgent()), + Tag.of(REACTIVE_TAG, String.valueOf(useReactive)) + )); - // TODO Remove this once we've identified the cause of message rejections from desktop clients - if (StringUtils.isNotBlank(response.getMessage())) { - tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage())); + // TODO Remove this once we've identified the cause of message rejections from desktop clients + if (StringUtils.isNotBlank(response.getMessage())) { + tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage())); + } + + Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment(); + + result = CompletableFuture.completedFuture(null); } - Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment(); - - result = CompletableFuture.completedFuture(null); - } - - result.thenRun(() -> { - final long responseProcessingNanos = System.nanoTime() - responseProcessingStartNanos; - queueServerProcessingNanosCounter.add(responseProcessingNanos); - }); return result; }); @@ -335,31 +326,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac ); final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get(); - final long serverDrainDurationNanos = queueServerProcessingNanosCounter.sum(); - final long sentMessageCount = sentMessageCounter.sum(); - - Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCount); + Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum()); Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS); - final Tags drainRateDistributionTags = Tags.of(tags) - .and(QUEUE_LENGTH_BUCKET_TAG, getQueueLengthBucketTag(sentMessageCount)); - - if (drainDuration != 0 && sentMessageCount != 0) { - // Record a bucketed distribution of the queue drain rate using total start - final double drainTimeInSeconds = drainDuration / 1e3; - final double drainRatePerSecond = sentMessageCount / drainTimeInSeconds; - Metrics.summary(DRAIN_RATE_DISTRIBUTION_NAME, drainRateDistributionTags).record(drainRatePerSecond); - } - - if (serverDrainDurationNanos != 0 && sentMessageCount != 0) { - // Record a bucketed distribution of the portion of the drain time dedicated to purely server-side processing. - // Because of asynchronous I/O and variable remote operations, this is an approximation of the overhead. - final double serverProcessingTimeInSeconds = (double) serverDrainDurationNanos / 1e9; - final double serverProcessingRatePerSecond = sentMessageCount / serverProcessingTimeInSeconds; - Metrics.summary(SERVER_DRAIN_RATE_DISTRIBUTION_NAME, drainRateDistributionTags) - .record(serverProcessingRatePerSecond); - } - if (drainDuration > SLOW_DRAIN_THRESHOLD) { Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment(); } @@ -400,28 +369,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac }); } - private static String getQueueLengthBucketTag(long sentMessageCount) { - // Per Datadog: https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/ - // - Tags must start with a letter. - // - May contain alphanumerics, underscores, minuses, colons, periods, and slashes. Other characters are converted to underscores. - // - A trailing underscore is removed, whether if it originated from a converted character or if it was in the original tag value. - if (sentMessageCount < 100) { - return "b_0-100"; - } else if (sentMessageCount < 1000) { - return "b_100-1000"; - } else if (sentMessageCount < 2500) { - return "b_1000-2500"; - } else if (sentMessageCount < 5000) { - return "b_2500-5000"; - } else if (sentMessageCount < 10_000) { - return "b_5000-10000"; - } else if (sentMessageCount < 15_000) { - return "b_10000-15000"; - } else { - return "b_15000.."; - } - } - private void processStoredMessages_reactive() { assert useReactive; @@ -489,23 +436,15 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac } private CompletableFuture sendMessage(Envelope envelope) { - final long sendMessageStartNanos = System.nanoTime(); final UUID messageGuid = UUID.fromString(envelope.getServerGuid()); - final CompletableFuture result; if (envelope.getStory() && !client.shouldDeliverStories()) { messagesManager.delete(auth.getAccount().getUuid(), device.getId(), messageGuid, envelope.getServerTimestamp()); - result = CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(null); } else { - result = sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp())); + return sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp())); } - - // This is likely a trivial amount of time, since we are just enqueuing outgoing messages to send asynchronously. - final long sendMessageDurationNanos = System.nanoTime() - sendMessageStartNanos; - queueServerProcessingNanosCounter.add(sendMessageDurationNanos); - - return result; } @Override