Move `onErrorResume` to individual `sendMessage` `Mono`

This commit is contained in:
Chris Eager 2023-09-06 16:52:08 -05:00 committed by ravi-signal
parent 656326355a
commit a9a2e40fed
1 changed files with 11 additions and 10 deletions

View File

@ -360,17 +360,18 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
.limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE)
.flatMapSequential(envelope -> .flatMapSequential(envelope ->
Mono.fromFuture(() -> sendMessage(envelope) Mono.fromFuture(() -> sendMessage(envelope)
.orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))
.onErrorResume( .onErrorResume(
// let the first error pass through to terminate the subscription // let the first error pass through to terminate the subscription
e -> { e -> {
final boolean firstError = !hasErrored.getAndSet(true); final boolean firstError = !hasErrored.getAndSet(true);
measureSendMessageErrors(e, firstError); measureSendMessageErrors(e, firstError);
return !firstError; return !firstError;
}, },
// otherwise just emit nothing // otherwise just emit nothing
e -> Mono.empty() e -> Mono.empty()
)
) )
.subscribeOn(messageDeliveryScheduler) .subscribeOn(messageDeliveryScheduler)
.subscribe( .subscribe(