Handle WebSocket sendMessage errors with onErrorResume

This commit is contained in:
Chris Eager 2023-09-06 13:17:47 -05:00 committed by Chris Eager
parent b701412295
commit 2d187abf13
1 changed files with 31 additions and 18 deletions

View File

@ -352,6 +352,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final Publisher<Envelope> messages =
messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly);
final AtomicBoolean hasErrored = new AtomicBoolean();
final Disposable subscription = Flux.from(messages)
.name(SEND_MESSAGES_FLUX_NAME)
.tap(Micrometer.metrics(Metrics.globalRegistry))
@ -359,28 +361,23 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
.flatMapSequential(envelope ->
Mono.fromFuture(() -> sendMessage(envelope)
.orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)))
.onErrorResume(
// let the first error pass through to terminate the subscription
e -> {
final boolean firstError = !hasErrored.getAndSet(true);
measureSendMessageErrors(e, firstError);
return !firstError;
},
// otherwise just emit nothing
e -> Mono.empty()
)
.subscribeOn(messageDeliveryScheduler)
.subscribe(
// no additional consumer of values - it is Flux<Void> by now
null,
// the first error will terminate the stream, but we may get multiple errors from in-flight messages
e -> {
queueCleared.completeExceptionally(e);
final String errorType;
if (e instanceof TimeoutException) {
errorType = "timeout";
} else if (e instanceof java.nio.channels.ClosedChannelException) {
errorType = "closedChannel";
} else {
logger.warn("Send message failed", e);
errorType = "other";
}
final Tags tags = Tags.of(
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
Tag.of(ERROR_TYPE_TAG, errorType));
Metrics.counter(SEND_MESSAGE_ERROR_COUNTER, tags).increment();
},
// this first error will terminate the stream, but we may get multiple errors from in-flight messages
queueCleared::completeExceptionally,
// completion
() -> queueCleared.complete(null)
);
@ -388,6 +385,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
messageSubscription.set(subscription);
}
private void measureSendMessageErrors(Throwable e, final boolean terminal) {
final String errorType;
if (e instanceof TimeoutException) {
errorType = "timeout";
} else if (e instanceof java.nio.channels.ClosedChannelException) {
errorType = "closedChannel";
} else {
logger.warn(terminal ? "Send message failure terminated stream" : "Send message failed", e);
errorType = "other";
}
final Tags tags = Tags.of(
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
Tag.of(ERROR_TYPE_TAG, errorType));
Metrics.counter(SEND_MESSAGE_ERROR_COUNTER, tags).increment();
}
private CompletableFuture<Void> sendMessage(Envelope envelope) {
final UUID messageGuid = UUID.fromString(envelope.getServerGuid());