Measure individual message timeouts
This commit is contained in:
		
							parent
							
								
									69ed0edb74
								
							
						
					
					
						commit
						5bec89ecc8
					
				| 
						 | 
					@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
 | 
				
			||||||
import java.util.concurrent.ScheduledFuture;
 | 
					import java.util.concurrent.ScheduledFuture;
 | 
				
			||||||
import java.util.concurrent.Semaphore;
 | 
					import java.util.concurrent.Semaphore;
 | 
				
			||||||
import java.util.concurrent.TimeUnit;
 | 
					import java.util.concurrent.TimeUnit;
 | 
				
			||||||
 | 
					import java.util.concurrent.TimeoutException;
 | 
				
			||||||
import java.util.concurrent.atomic.AtomicBoolean;
 | 
					import java.util.concurrent.atomic.AtomicBoolean;
 | 
				
			||||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
					import java.util.concurrent.atomic.AtomicInteger;
 | 
				
			||||||
import java.util.concurrent.atomic.AtomicLong;
 | 
					import java.util.concurrent.atomic.AtomicLong;
 | 
				
			||||||
| 
						 | 
					@ -84,8 +85,11 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
      "messageAvailableAfterClientClosed");
 | 
					      "messageAvailableAfterClientClosed");
 | 
				
			||||||
  private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class,
 | 
					  private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class,
 | 
				
			||||||
      "sendMessages");
 | 
					      "sendMessages");
 | 
				
			||||||
 | 
					  private static final String SEND_MESSAGE_TIMEOUT_COUNTER = MetricsUtil.name(WebSocketConnection.class,
 | 
				
			||||||
 | 
					      "sendMessageTimeout");
 | 
				
			||||||
  private static final String STATUS_CODE_TAG = "status";
 | 
					  private static final String STATUS_CODE_TAG = "status";
 | 
				
			||||||
  private static final String STATUS_MESSAGE_TAG = "message";
 | 
					  private static final String STATUS_MESSAGE_TAG = "message";
 | 
				
			||||||
 | 
					  private static final String ERROR_TYPE_TAG = "errorType";
 | 
				
			||||||
  private static final String REACTIVE_TAG = "reactive";
 | 
					  private static final String REACTIVE_TAG = "reactive";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private static final long SLOW_DRAIN_THRESHOLD = 10_000;
 | 
					  private static final long SLOW_DRAIN_THRESHOLD = 10_000;
 | 
				
			||||||
| 
						 | 
					@ -426,7 +430,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
 | 
				
			||||||
        .metrics()
 | 
					        .metrics()
 | 
				
			||||||
        .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE)
 | 
					        .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE)
 | 
				
			||||||
        .flatMapSequential(envelope ->
 | 
					        .flatMapSequential(envelope ->
 | 
				
			||||||
            Mono.fromFuture(sendMessage(envelope).orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)))
 | 
					            Mono.fromFuture(sendMessage(envelope)
 | 
				
			||||||
 | 
					                    .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))
 | 
				
			||||||
 | 
					                .doOnError(e -> {
 | 
				
			||||||
 | 
					                  final String errorType;
 | 
				
			||||||
 | 
					                  if (!(e instanceof TimeoutException)) {
 | 
				
			||||||
 | 
					                    // TimeoutExceptions are expected, no need to log
 | 
				
			||||||
 | 
					                    logger.warn("Send message failed", e);
 | 
				
			||||||
 | 
					                    errorType = "other";
 | 
				
			||||||
 | 
					                  } else {
 | 
				
			||||||
 | 
					                    errorType = "timeout";
 | 
				
			||||||
 | 
					                  }
 | 
				
			||||||
 | 
					                  final Tags tags = Tags.of(
 | 
				
			||||||
 | 
					                      UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
 | 
				
			||||||
 | 
					                      Tag.of(ERROR_TYPE_TAG, errorType));
 | 
				
			||||||
 | 
					                  Metrics.counter(SEND_MESSAGE_TIMEOUT_COUNTER, tags).increment();
 | 
				
			||||||
 | 
					                }))
 | 
				
			||||||
        .doOnError(queueCleared::completeExceptionally)
 | 
					        .doOnError(queueCleared::completeExceptionally)
 | 
				
			||||||
        .doOnComplete(() -> queueCleared.complete(null))
 | 
					        .doOnComplete(() -> queueCleared.complete(null))
 | 
				
			||||||
        .subscribeOn(reactiveScheduler)
 | 
					        .subscribeOn(reactiveScheduler)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue