diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 147f62cff..887a91ff6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -84,8 +85,11 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac "messageAvailableAfterClientClosed"); private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class, "sendMessages"); + private static final String SEND_MESSAGE_TIMEOUT_COUNTER = MetricsUtil.name(WebSocketConnection.class, + "sendMessageTimeout"); private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_MESSAGE_TAG = "message"; + private static final String ERROR_TYPE_TAG = "errorType"; private static final String REACTIVE_TAG = "reactive"; private static final long SLOW_DRAIN_THRESHOLD = 10_000; @@ -426,7 +430,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac .metrics() .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .flatMapSequential(envelope -> - Mono.fromFuture(sendMessage(envelope).orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) + Mono.fromFuture(sendMessage(envelope) + .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)) + .doOnError(e -> { + final String errorType; + if (!(e instanceof TimeoutException)) { + // TimeoutExceptions are expected, no need to log + logger.warn("Send message failed", e); + errorType = "other"; + } else { + errorType = "timeout"; + } + final Tags tags = Tags.of( + UserAgentTagUtil.getPlatformTag(client.getUserAgent()), + Tag.of(ERROR_TYPE_TAG, errorType)); + Metrics.counter(SEND_MESSAGE_TIMEOUT_COUNTER, tags).increment(); + })) .doOnError(queueCleared::completeExceptionally) .doOnComplete(() -> queueCleared.complete(null)) .subscribeOn(reactiveScheduler)