Revert "Add more detailed queue processing rate metrics"

This reverts commit bbbab4b8a4.
This commit is contained in:
Chris Eager 2022-11-08 09:35:39 -06:00 committed by GitHub
parent ad5925908e
commit 69ed0edb74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 78 deletions

View File

@ -75,8 +75,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class,
"initialQueueLength");
private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue");
private static final String DRAIN_RATE_DISTRIBUTION_NAME = name(WebSocketConnection.class, "drainRate");
private static final String SERVER_DRAIN_RATE_DISTRIBUTION_NAME = name(WebSocketConnection.class, "serverDrainRate");
private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain");
private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name(WebSocketConnection.class, "queueDrainRetry");
private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
@ -89,7 +87,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private static final String STATUS_CODE_TAG = "status";
private static final String STATUS_MESSAGE_TAG = "message";
private static final String REACTIVE_TAG = "reactive";
private static final String QUEUE_LENGTH_BUCKET_TAG = "queueLengthBucket";
private static final long SLOW_DRAIN_THRESHOLD = 10_000;
@ -121,7 +118,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
private final LongAdder sentMessageCounter = new LongAdder();
private final LongAdder queueServerProcessingNanosCounter = new LongAdder();
private final AtomicLong queueDrainStartTime = new AtomicLong();
private final AtomicInteger consecutiveRetries = new AtomicInteger();
private final AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
@ -235,7 +231,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
sendFailuresMeter.mark();
}
}).thenCompose(response -> {
final long responseProcessingStartNanos = System.nanoTime();
final CompletableFuture<?> result;
if (isSuccessResponse(response)) {
@ -247,26 +242,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
sendDeliveryReceiptFor(message);
}
} else {
final List<Tag> tags = new ArrayList<>(
List.of(
Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())),
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
Tag.of(REACTIVE_TAG, String.valueOf(useReactive))
));
final List<Tag> tags = new ArrayList<>(
List.of(
Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())),
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
Tag.of(REACTIVE_TAG, String.valueOf(useReactive))
));
// TODO Remove this once we've identified the cause of message rejections from desktop clients
if (StringUtils.isNotBlank(response.getMessage())) {
tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage()));
// TODO Remove this once we've identified the cause of message rejections from desktop clients
if (StringUtils.isNotBlank(response.getMessage())) {
tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage()));
}
Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment();
result = CompletableFuture.completedFuture(null);
}
Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment();
result = CompletableFuture.completedFuture(null);
}
result.thenRun(() -> {
final long responseProcessingNanos = System.nanoTime() - responseProcessingStartNanos;
queueServerProcessingNanosCounter.add(responseProcessingNanos);
});
return result;
});
@ -335,31 +326,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
);
final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
final long serverDrainDurationNanos = queueServerProcessingNanosCounter.sum();
final long sentMessageCount = sentMessageCounter.sum();
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCount);
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum());
Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS);
final Tags drainRateDistributionTags = Tags.of(tags)
.and(QUEUE_LENGTH_BUCKET_TAG, getQueueLengthBucketTag(sentMessageCount));
if (drainDuration != 0 && sentMessageCount != 0) {
// Record a bucketed distribution of the queue drain rate using total start
final double drainTimeInSeconds = drainDuration / 1e3;
final double drainRatePerSecond = sentMessageCount / drainTimeInSeconds;
Metrics.summary(DRAIN_RATE_DISTRIBUTION_NAME, drainRateDistributionTags).record(drainRatePerSecond);
}
if (serverDrainDurationNanos != 0 && sentMessageCount != 0) {
// Record a bucketed distribution of the portion of the drain time dedicated to purely server-side processing.
// Because of asynchronous I/O and variable remote operations, this is an approximation of the overhead.
final double serverProcessingTimeInSeconds = (double) serverDrainDurationNanos / 1e9;
final double serverProcessingRatePerSecond = sentMessageCount / serverProcessingTimeInSeconds;
Metrics.summary(SERVER_DRAIN_RATE_DISTRIBUTION_NAME, drainRateDistributionTags)
.record(serverProcessingRatePerSecond);
}
if (drainDuration > SLOW_DRAIN_THRESHOLD) {
Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment();
}
@ -400,28 +369,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
});
}
private static String getQueueLengthBucketTag(long sentMessageCount) {
// Per Datadog: https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/
// - Tags must start with a letter.
// - May contain alphanumerics, underscores, minuses, colons, periods, and slashes. Other characters are converted to underscores.
// - A trailing underscore is removed, whether if it originated from a converted character or if it was in the original tag value.
if (sentMessageCount < 100) {
return "b_0-100";
} else if (sentMessageCount < 1000) {
return "b_100-1000";
} else if (sentMessageCount < 2500) {
return "b_1000-2500";
} else if (sentMessageCount < 5000) {
return "b_2500-5000";
} else if (sentMessageCount < 10_000) {
return "b_5000-10000";
} else if (sentMessageCount < 15_000) {
return "b_10000-15000";
} else {
return "b_15000..";
}
}
private void processStoredMessages_reactive() {
assert useReactive;
@ -489,23 +436,15 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
}
private CompletableFuture<?> sendMessage(Envelope envelope) {
final long sendMessageStartNanos = System.nanoTime();
final UUID messageGuid = UUID.fromString(envelope.getServerGuid());
final CompletableFuture<?> result;
if (envelope.getStory() && !client.shouldDeliverStories()) {
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), messageGuid, envelope.getServerTimestamp());
result = CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(null);
} else {
result = sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp()));
return sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp()));
}
// This is likely a trivial amount of time, since we are just enqueuing outgoing messages to send asynchronously.
final long sendMessageDurationNanos = System.nanoTime() - sendMessageStartNanos;
queueServerProcessingNanosCounter.add(sendMessageDurationNanos);
return result;
}
@Override