Add more detailed queue processing rate metrics
This commit is contained in:
		
							parent
							
								
									f83080eb8d
								
							
						
					
					
						commit
						bbbab4b8a4
					
				| 
						 | 
					@ -77,6 +77,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
  private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class,
 | 
					  private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class,
 | 
				
			||||||
      "initialQueueLength");
 | 
					      "initialQueueLength");
 | 
				
			||||||
  private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue");
 | 
					  private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue");
 | 
				
			||||||
 | 
					  private static final String DRAIN_RATE_DISTRIBUTION_NAME = name(WebSocketConnection.class, "drainRate");
 | 
				
			||||||
 | 
					  private static final String SERVER_DRAIN_RATE_DISTRIBUTION_NAME = name(WebSocketConnection.class, "serverDrainRate");
 | 
				
			||||||
  private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain");
 | 
					  private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain");
 | 
				
			||||||
  private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name(WebSocketConnection.class, "queueDrainRetry");
 | 
					  private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name(WebSocketConnection.class, "queueDrainRetry");
 | 
				
			||||||
  private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
 | 
					  private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
 | 
				
			||||||
| 
						 | 
					@ -89,6 +91,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
  private static final String STATUS_CODE_TAG = "status";
 | 
					  private static final String STATUS_CODE_TAG = "status";
 | 
				
			||||||
  private static final String STATUS_MESSAGE_TAG = "message";
 | 
					  private static final String STATUS_MESSAGE_TAG = "message";
 | 
				
			||||||
  private static final String REACTIVE_TAG = "reactive";
 | 
					  private static final String REACTIVE_TAG = "reactive";
 | 
				
			||||||
 | 
					  private static final String QUEUE_LENGTH_BUCKET_TAG = "queueLengthBucket";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private static final long SLOW_DRAIN_THRESHOLD = 10_000;
 | 
					  private static final long SLOW_DRAIN_THRESHOLD = 10_000;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -120,6 +123,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
      StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
 | 
					      StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
 | 
				
			||||||
  private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
 | 
					  private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
 | 
				
			||||||
  private final LongAdder sentMessageCounter = new LongAdder();
 | 
					  private final LongAdder sentMessageCounter = new LongAdder();
 | 
				
			||||||
 | 
					  private final LongAdder queueServerProcessingNanosCounter = new LongAdder();
 | 
				
			||||||
  private final AtomicLong queueDrainStartTime = new AtomicLong();
 | 
					  private final AtomicLong queueDrainStartTime = new AtomicLong();
 | 
				
			||||||
  private final AtomicInteger consecutiveRetries = new AtomicInteger();
 | 
					  private final AtomicInteger consecutiveRetries = new AtomicInteger();
 | 
				
			||||||
  private final AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
 | 
					  private final AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
 | 
				
			||||||
| 
						 | 
					@ -233,6 +237,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
            sendFailuresMeter.mark();
 | 
					            sendFailuresMeter.mark();
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        }).thenCompose(response -> {
 | 
					        }).thenCompose(response -> {
 | 
				
			||||||
 | 
					          final long responseProcessingStartNanos = System.nanoTime();
 | 
				
			||||||
          final CompletableFuture<?> result;
 | 
					          final CompletableFuture<?> result;
 | 
				
			||||||
          if (isSuccessResponse(response)) {
 | 
					          if (isSuccessResponse(response)) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -255,12 +260,16 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
            if (StringUtils.isNotBlank(response.getMessage())) {
 | 
					            if (StringUtils.isNotBlank(response.getMessage())) {
 | 
				
			||||||
              tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage()));
 | 
					              tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage()));
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					 | 
				
			||||||
            Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment();
 | 
					            Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            result = CompletableFuture.completedFuture(null);
 | 
					            result = CompletableFuture.completedFuture(null);
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          result.thenRun(() -> {
 | 
				
			||||||
 | 
					            final long responseProcessingNanos = System.nanoTime() - responseProcessingStartNanos;
 | 
				
			||||||
 | 
					            queueServerProcessingNanosCounter.add(responseProcessingNanos);
 | 
				
			||||||
 | 
					          });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
          return result;
 | 
					          return result;
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
| 
						 | 
					@ -330,9 +339,31 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
          );
 | 
					          );
 | 
				
			||||||
          final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
 | 
					          final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
          Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum());
 | 
					          final long serverDrainDurationNanos = queueServerProcessingNanosCounter.sum();
 | 
				
			||||||
 | 
					          final long sentMessageCount = sentMessageCounter.sum();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCount);
 | 
				
			||||||
          Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS);
 | 
					          Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          final Tags drainRateDistributionTags = Tags.of(tags)
 | 
				
			||||||
 | 
					              .and(QUEUE_LENGTH_BUCKET_TAG, getQueueLengthBucketTag(sentMessageCount));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          if (drainDuration != 0 && sentMessageCount != 0) {
 | 
				
			||||||
 | 
					            // Record a bucketed distribution of the queue drain rate, using the total wall-clock time from drain start to finish
 | 
				
			||||||
 | 
					            final double drainTimeInSeconds = drainDuration / 1e3;
 | 
				
			||||||
 | 
					            final double drainRatePerSecond = sentMessageCount / drainTimeInSeconds;
 | 
				
			||||||
 | 
					            Metrics.summary(DRAIN_RATE_DISTRIBUTION_NAME, drainRateDistributionTags).record(drainRatePerSecond);
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          if (serverDrainDurationNanos != 0 && sentMessageCount != 0) {
 | 
				
			||||||
 | 
					            // Record a bucketed distribution of the drain rate measured against only the time spent in server-side processing.
 | 
				
			||||||
 | 
					            // Because of asynchronous I/O and variable remote operations, this is an approximation of the overhead.
 | 
				
			||||||
 | 
					            final double serverProcessingTimeInSeconds = (double) serverDrainDurationNanos / 1e9;
 | 
				
			||||||
 | 
					            final double serverProcessingRatePerSecond = sentMessageCount / serverProcessingTimeInSeconds;
 | 
				
			||||||
 | 
					            Metrics.summary(SERVER_DRAIN_RATE_DISTRIBUTION_NAME, drainRateDistributionTags)
 | 
				
			||||||
 | 
					                .record(serverProcessingRatePerSecond);
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
          if (drainDuration > SLOW_DRAIN_THRESHOLD) {
 | 
					          if (drainDuration > SLOW_DRAIN_THRESHOLD) {
 | 
				
			||||||
            Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment();
 | 
					            Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment();
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
| 
						 | 
					@ -373,6 +404,28 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private static String getQueueLengthBucketTag(long sentMessageCount) {
 | 
				
			||||||
 | 
					    // Per Datadog: https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/
 | 
				
			||||||
 | 
					    // - Tags must start with a letter.
 | 
				
			||||||
 | 
					    // - May contain alphanumerics, underscores, minuses, colons, periods, and slashes. Other characters are converted to underscores.
 | 
				
			||||||
 | 
					    // - A trailing underscore is removed, whether it originated from a converted character or was in the original tag value.
 | 
				
			||||||
 | 
					    if (sentMessageCount < 100) {
 | 
				
			||||||
 | 
					      return "b_0-100";
 | 
				
			||||||
 | 
					    } else if (sentMessageCount < 1000) {
 | 
				
			||||||
 | 
					      return "b_100-1000";
 | 
				
			||||||
 | 
					    } else if (sentMessageCount < 2500) {
 | 
				
			||||||
 | 
					      return "b_1000-2500";
 | 
				
			||||||
 | 
					    } else if (sentMessageCount < 5000) {
 | 
				
			||||||
 | 
					      return "b_2500-5000";
 | 
				
			||||||
 | 
					    } else if (sentMessageCount < 10_000) {
 | 
				
			||||||
 | 
					      return "b_5000-10000";
 | 
				
			||||||
 | 
					    } else if (sentMessageCount < 15_000) {
 | 
				
			||||||
 | 
					      return "b_10000-15000";
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      return "b_15000..";
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private void processStoredMessages_reactive() {
 | 
					  private void processStoredMessages_reactive() {
 | 
				
			||||||
    assert useReactive;
 | 
					    assert useReactive;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -440,15 +493,23 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private CompletableFuture<?> sendMessage(Envelope envelope) {
 | 
					  private CompletableFuture<?> sendMessage(Envelope envelope) {
 | 
				
			||||||
 | 
					    final long sendMessageStartNanos = System.nanoTime();
 | 
				
			||||||
    final UUID messageGuid = UUID.fromString(envelope.getServerGuid());
 | 
					    final UUID messageGuid = UUID.fromString(envelope.getServerGuid());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    final CompletableFuture<?> result;
 | 
				
			||||||
    if (envelope.getStory() && !client.shouldDeliverStories()) {
 | 
					    if (envelope.getStory() && !client.shouldDeliverStories()) {
 | 
				
			||||||
      messagesManager.delete(auth.getAccount().getUuid(), device.getId(), messageGuid, envelope.getServerTimestamp());
 | 
					      messagesManager.delete(auth.getAccount().getUuid(), device.getId(), messageGuid, envelope.getServerTimestamp());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      return CompletableFuture.completedFuture(null);
 | 
					      result = CompletableFuture.completedFuture(null);
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      return sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp()));
 | 
					      result = sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp()));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // This is likely a trivial amount of time, since we are just enqueuing outgoing messages to send asynchronously.
 | 
				
			||||||
 | 
					    final long sendMessageDurationNanos = System.nanoTime() - sendMessageStartNanos;
 | 
				
			||||||
 | 
					    queueServerProcessingNanosCounter.add(sendMessageDurationNanos);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return result;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @Override
 | 
					  @Override
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue