From 2d187abf1313c32e8666ec0db2ddfc9cac528da6 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 6 Sep 2023 13:17:47 -0500 Subject: [PATCH] Handle WebSocket sendMessage errors with onErrorResume --- .../websocket/WebSocketConnection.java | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 7e01cd8f9..90c40c93f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -352,6 +352,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac final Publisher messages = messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly); + final AtomicBoolean hasErrored = new AtomicBoolean(); + final Disposable subscription = Flux.from(messages) .name(SEND_MESSAGES_FLUX_NAME) .tap(Micrometer.metrics(Metrics.globalRegistry)) @@ -359,28 +361,23 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac .flatMapSequential(envelope -> Mono.fromFuture(() -> sendMessage(envelope) .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) + .onErrorResume( + // let the first error pass through to terminate the subscription + e -> { + final boolean firstError = !hasErrored.getAndSet(true); + measureSendMessageErrors(e, firstError); + + return !firstError; + }, + // otherwise just emit nothing + e -> Mono.empty() + ) .subscribeOn(messageDeliveryScheduler) .subscribe( // no additional consumer of values - it is Flux by now null, - // the first error will terminate the stream, but we may get multiple errors from in-flight messages - e -> { - queueCleared.completeExceptionally(e); - - final String errorType; - if (e instanceof TimeoutException) { - errorType = "timeout"; - } else if (e instanceof java.nio.channels.ClosedChannelException) { - errorType = "closedChannel"; - } else { - logger.warn("Send message failed", e); - errorType = "other"; - } - final Tags tags = Tags.of( - UserAgentTagUtil.getPlatformTag(client.getUserAgent()), - Tag.of(ERROR_TYPE_TAG, errorType)); - Metrics.counter(SEND_MESSAGE_ERROR_COUNTER, tags).increment(); - }, + // this first error will terminate the stream, but we may get multiple errors from in-flight messages + queueCleared::completeExceptionally, // completion () -> queueCleared.complete(null) ); @@ -388,6 +385,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac messageSubscription.set(subscription); } + private void measureSendMessageErrors(Throwable e, final boolean terminal) { + final String errorType; + if (e instanceof TimeoutException) { + errorType = "timeout"; + } else if (e instanceof java.nio.channels.ClosedChannelException) { + errorType = "closedChannel"; + } else { + logger.warn(terminal ? "Send message failure terminated stream" : "Send message failed", e); + errorType = "other"; + } + final Tags tags = Tags.of( + UserAgentTagUtil.getPlatformTag(client.getUserAgent()), + Tag.of(ERROR_TYPE_TAG, errorType)); + Metrics.counter(SEND_MESSAGE_ERROR_COUNTER, tags).increment(); + } + private CompletableFuture sendMessage(Envelope envelope) { final UUID messageGuid = UUID.fromString(envelope.getServerGuid());