From a9a2e40fed4088369ddc1772c1253baa786a24a0 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 6 Sep 2023 16:52:08 -0500 Subject: [PATCH] Move `onErrorResume` to individual `sendMessage` `Mono` --- .../websocket/WebSocketConnection.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 90c40c93f..b64bf7196 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -360,17 +360,18 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .flatMapSequential(envelope -> Mono.fromFuture(() -> sendMessage(envelope) - .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) - .onErrorResume( - // let the first error pass through to terminate the subscription - e -> { - final boolean firstError = !hasErrored.getAndSet(true); - measureSendMessageErrors(e, firstError); + .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)) + .onErrorResume( + // let the first error pass through to terminate the subscription + e -> { + final boolean firstError = !hasErrored.getAndSet(true); + measureSendMessageErrors(e, firstError); - return !firstError; - }, - // otherwise just emit nothing - e -> Mono.empty() + return !firstError; + }, + // otherwise just emit nothing + e -> Mono.empty() + ) ) .subscribeOn(messageDeliveryScheduler) .subscribe(