Refactor WebSocket message sending error and completion to subscriber from “doOn…”

This commit is contained in:
Chris Eager 2023-03-10 11:38:59 -06:00 committed by Chris Eager
parent fbdcb942e8
commit 292f69256e
2 changed files with 54 additions and 59 deletions

View File

@ -210,7 +210,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
client.close(1000, "OK"); client.close(1000, "OK");
} }
private CompletableFuture<?> sendMessage(final Envelope message, StoredMessageInfo storedMessageInfo) { private CompletableFuture<Void> sendMessage(final Envelope message, StoredMessageInfo storedMessageInfo) {
// clear ephemeral field from the envelope // clear ephemeral field from the envelope
final Optional<byte[]> body = Optional.ofNullable(message.toBuilder().clearEphemeral().build().toByteArray()); final Optional<byte[]> body = Optional.ofNullable(message.toBuilder().clearEphemeral().build().toByteArray());
@ -227,11 +227,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
sendFailuresMeter.mark(); sendFailuresMeter.mark();
} }
}).thenCompose(response -> { }).thenCompose(response -> {
final CompletableFuture<?> result; final CompletableFuture<Void> result;
if (isSuccessResponse(response)) { if (isSuccessResponse(response)) {
result = messagesManager.delete(auth.getAccount().getUuid(), device.getId(), result = messagesManager.delete(auth.getAccount().getUuid(), device.getId(),
storedMessageInfo.guid(), storedMessageInfo.serverTimestamp()); storedMessageInfo.guid(), storedMessageInfo.serverTimestamp())
.thenApply(ignored -> null);
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) { if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
recordMessageDeliveryDuration(message.getTimestamp(), device); recordMessageDeliveryDuration(message.getTimestamp(), device);
@ -364,31 +365,37 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
.limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE)
.flatMapSequential(envelope -> .flatMapSequential(envelope ->
Mono.fromFuture(sendMessage(envelope) Mono.fromFuture(sendMessage(envelope)
.orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)) .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)))
.doOnError(e -> {
final String errorType;
if (e instanceof TimeoutException) {
errorType = "timeout";
} else if (e instanceof java.nio.channels.ClosedChannelException) {
errorType = "closedChannel";
} else {
logger.warn("Send message failed", e);
errorType = "other";
}
final Tags tags = Tags.of(
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
Tag.of(ERROR_TYPE_TAG, errorType));
Metrics.counter(SEND_MESSAGE_ERROR_COUNTER, tags).increment();
}))
.doOnError(queueCleared::completeExceptionally)
.doOnComplete(() -> queueCleared.complete(null))
.subscribeOn(reactiveScheduler) .subscribeOn(reactiveScheduler)
.subscribe(); .subscribe(
// no additional consumer of values - it is Flux<Void> by now
null,
// the first error will terminate the stream, but we may get multiple errors from in-flight messages
e -> {
queueCleared.completeExceptionally(e);
final String errorType;
if (e instanceof TimeoutException) {
errorType = "timeout";
} else if (e instanceof java.nio.channels.ClosedChannelException) {
errorType = "closedChannel";
} else {
logger.warn("Send message failed", e);
errorType = "other";
}
final Tags tags = Tags.of(
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
Tag.of(ERROR_TYPE_TAG, errorType));
Metrics.counter(SEND_MESSAGE_ERROR_COUNTER, tags).increment();
},
// completion
() -> queueCleared.complete(null)
);
messageSubscription.set(subscription); messageSubscription.set(subscription);
} }
private CompletableFuture<?> sendMessage(Envelope envelope) { private CompletableFuture<Void> sendMessage(Envelope envelope) {
final UUID messageGuid = UUID.fromString(envelope.getServerGuid()); final UUID messageGuid = UUID.fromString(envelope.getServerGuid());
if (envelope.getStory() && !client.shouldDeliverStories()) { if (envelope.getStory() && !client.shouldDeliverStories()) {

View File

@ -87,6 +87,7 @@ class WebSocketConnectionTest {
private Device device; private Device device;
private AuthenticatedAccount auth; private AuthenticatedAccount auth;
private UpgradeRequest upgradeRequest; private UpgradeRequest upgradeRequest;
private MessagesManager messagesManager;
private ReceiptSender receiptSender; private ReceiptSender receiptSender;
private ScheduledExecutorService retrySchedulingExecutor; private ScheduledExecutorService retrySchedulingExecutor;
@ -98,6 +99,7 @@ class WebSocketConnectionTest {
device = mock(Device.class); device = mock(Device.class);
auth = new AuthenticatedAccount(() -> new Pair<>(account, device)); auth = new AuthenticatedAccount(() -> new Pair<>(account, device));
upgradeRequest = mock(UpgradeRequest.class); upgradeRequest = mock(UpgradeRequest.class);
messagesManager = mock(MessagesManager.class);
receiptSender = mock(ReceiptSender.class); receiptSender = mock(ReceiptSender.class);
retrySchedulingExecutor = mock(ScheduledExecutorService.class); retrySchedulingExecutor = mock(ScheduledExecutorService.class);
} }
@ -109,9 +111,8 @@ class WebSocketConnectionTest {
@Test @Test
void testCredentials() { void testCredentials() {
MessagesManager storedMessages = mock(MessagesManager.class);
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, storedMessages, AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager,
mock(PushNotificationManager.class), mock(ClientPresenceManager.class), mock(PushNotificationManager.class), mock(ClientPresenceManager.class),
retrySchedulingExecutor); retrySchedulingExecutor);
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
@ -145,7 +146,6 @@ class WebSocketConnectionTest {
@Test @Test
void testOpen() { void testOpen() {
MessagesManager storedMessages = mock(MessagesManager.class);
UUID accountUuid = UUID.randomUUID(); UUID accountUuid = UUID.randomUUID();
UUID senderOneUuid = UUID.randomUUID(); UUID senderOneUuid = UUID.randomUUID();
@ -171,9 +171,12 @@ class WebSocketConnectionTest {
when(accountsManager.getByE164("sender1")).thenReturn(Optional.of(sender1)); when(accountsManager.getByE164("sender1")).thenReturn(Optional.of(sender1));
when(accountsManager.getByE164("sender2")).thenReturn(Optional.empty()); when(accountsManager.getByE164("sender2")).thenReturn(Optional.empty());
when(messagesManager.delete(any(), anyLong(), any(), any())).thenReturn(
CompletableFuture.completedFuture(Optional.empty()));
String userAgent = HttpHeaders.USER_AGENT; String userAgent = HttpHeaders.USER_AGENT;
when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) when(messagesManager.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false))
.thenReturn(Flux.fromIterable(outgoingMessages)); .thenReturn(Flux.fromIterable(outgoingMessages));
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>(); final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
@ -187,7 +190,7 @@ class WebSocketConnectionTest {
return future; return future;
}); });
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager,
auth, device, client, retrySchedulingExecutor, Schedulers.immediate()); auth, device, client, retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();
@ -203,7 +206,7 @@ class WebSocketConnectionTest {
futures.get(0).completeExceptionally(new IOException()); futures.get(0).completeExceptionally(new IOException());
futures.get(2).completeExceptionally(new IOException()); futures.get(2).completeExceptionally(new IOException());
verify(storedMessages, times(1)).delete(eq(accountUuid), eq(deviceId), verify(messagesManager, times(1)).delete(eq(accountUuid), eq(deviceId),
eq(UUID.fromString(outgoingMessages.get(1).getServerGuid())), eq(outgoingMessages.get(1).getServerTimestamp())); eq(UUID.fromString(outgoingMessages.get(1).getServerGuid())), eq(outgoingMessages.get(1).getServerTimestamp()));
verify(receiptSender, times(1)).sendReceipt(eq(accountUuid), eq(deviceId), eq(senderOneUuid), verify(receiptSender, times(1)).sendReceipt(eq(accountUuid), eq(deviceId), eq(senderOneUuid),
eq(2222L)); eq(2222L));
@ -214,7 +217,6 @@ class WebSocketConnectionTest {
@Test @Test
public void testOnlineSend() { public void testOnlineSend() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
@ -277,8 +279,6 @@ class WebSocketConnectionTest {
@Test @Test
void testPendingSend() { void testPendingSend() {
MessagesManager storedMessages = mock(MessagesManager.class);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
final UUID senderTwoUuid = UUID.randomUUID(); final UUID senderTwoUuid = UUID.randomUUID();
@ -319,9 +319,12 @@ class WebSocketConnectionTest {
when(accountsManager.getByE164("sender1")).thenReturn(Optional.of(sender1)); when(accountsManager.getByE164("sender1")).thenReturn(Optional.of(sender1));
when(accountsManager.getByE164("sender2")).thenReturn(Optional.empty()); when(accountsManager.getByE164("sender2")).thenReturn(Optional.empty());
when(messagesManager.delete(any(), anyLong(), any(), any())).thenReturn(
CompletableFuture.completedFuture(Optional.empty()));
String userAgent = HttpHeaders.USER_AGENT; String userAgent = HttpHeaders.USER_AGENT;
when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) when(messagesManager.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false))
.thenReturn(Flux.fromIterable(pendingMessages)); .thenReturn(Flux.fromIterable(pendingMessages));
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>(); final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
@ -335,7 +338,7 @@ class WebSocketConnectionTest {
return future; return future;
}); });
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager,
auth, device, client, retrySchedulingExecutor, Schedulers.immediate()); auth, device, client, retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();
@ -358,7 +361,6 @@ class WebSocketConnectionTest {
@Test @Test
void testProcessStoredMessageConcurrency() { void testProcessStoredMessageConcurrency() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
@ -424,7 +426,6 @@ class WebSocketConnectionTest {
@Test @Test
void testProcessStoredMessagesMultiplePages() { void testProcessStoredMessagesMultiplePages() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
@ -477,7 +478,6 @@ class WebSocketConnectionTest {
@Test @Test
void testProcessStoredMessagesContainsSenderUuid() { void testProcessStoredMessagesContainsSenderUuid() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
@ -540,7 +540,6 @@ class WebSocketConnectionTest {
@Test @Test
void testProcessStoredMessagesSingleEmptyCall() { void testProcessStoredMessagesSingleEmptyCall() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
@ -570,7 +569,6 @@ class WebSocketConnectionTest {
@Test @Test
public void testRequeryOnStateMismatch() { public void testRequeryOnStateMismatch() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
@ -627,7 +625,6 @@ class WebSocketConnectionTest {
@Test @Test
void testProcessCachedMessagesOnly() { void testProcessCachedMessagesOnly() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
@ -660,7 +657,6 @@ class WebSocketConnectionTest {
@Test @Test
void testProcessDatabaseMessagesAfterPersist() { void testProcessDatabaseMessagesAfterPersist() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
@ -690,8 +686,6 @@ class WebSocketConnectionTest {
@Test @Test
void testRetrieveMessageException() { void testRetrieveMessageException() {
MessagesManager storedMessages = mock(MessagesManager.class);
UUID accountUuid = UUID.randomUUID(); UUID accountUuid = UUID.randomUUID();
when(device.getId()).thenReturn(2L); when(device.getId()).thenReturn(2L);
@ -699,7 +693,7 @@ class WebSocketConnectionTest {
when(account.getNumber()).thenReturn("+14152222222"); when(account.getNumber()).thenReturn("+14152222222");
when(account.getUuid()).thenReturn(accountUuid); when(account.getUuid()).thenReturn(accountUuid);
when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) when(messagesManager.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false))
.thenReturn(Flux.error(new RedisException("OH NO"))); .thenReturn(Flux.error(new RedisException("OH NO")));
when(retrySchedulingExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer( when(retrySchedulingExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(
@ -711,7 +705,7 @@ class WebSocketConnectionTest {
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
when(client.isOpen()).thenReturn(true); when(client.isOpen()).thenReturn(true);
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();
@ -722,8 +716,6 @@ class WebSocketConnectionTest {
@Test @Test
void testRetrieveMessageExceptionClientDisconnected() { void testRetrieveMessageExceptionClientDisconnected() {
MessagesManager storedMessages = mock(MessagesManager.class);
UUID accountUuid = UUID.randomUUID(); UUID accountUuid = UUID.randomUUID();
when(device.getId()).thenReturn(2L); when(device.getId()).thenReturn(2L);
@ -731,13 +723,13 @@ class WebSocketConnectionTest {
when(account.getNumber()).thenReturn("+14152222222"); when(account.getNumber()).thenReturn("+14152222222");
when(account.getUuid()).thenReturn(accountUuid); when(account.getUuid()).thenReturn(accountUuid);
when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) when(messagesManager.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false))
.thenReturn(Flux.error(new RedisException("OH NO"))); .thenReturn(Flux.error(new RedisException("OH NO")));
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
when(client.isOpen()).thenReturn(false); when(client.isOpen()).thenReturn(false);
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();
@ -748,8 +740,6 @@ class WebSocketConnectionTest {
@Test @Test
@Disabled("This test is flaky") @Disabled("This test is flaky")
void testReactivePublisherLimitRate() { void testReactivePublisherLimitRate() {
MessagesManager storedMessages = mock(MessagesManager.class);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
final long deviceId = 2L; final long deviceId = 2L;
@ -771,7 +761,7 @@ class WebSocketConnectionTest {
}); });
}); });
when(storedMessages.getMessagesForDeviceReactive(eq(accountUuid), eq(deviceId), anyBoolean())) when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(deviceId), anyBoolean()))
.thenReturn(flux); .thenReturn(flux);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
@ -779,10 +769,10 @@ class WebSocketConnectionTest {
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
when(successResponse.getStatus()).thenReturn(200); when(successResponse.getStatus()).thenReturn(200);
when(client.sendRequest(any(), any(), any(), any())).thenReturn(CompletableFuture.completedFuture(successResponse)); when(client.sendRequest(any(), any(), any(), any())).thenReturn(CompletableFuture.completedFuture(successResponse));
when(storedMessages.delete(any(), anyLong(), any(), any())).thenReturn( when(messagesManager.delete(any(), anyLong(), any(), any())).thenReturn(
CompletableFuture.completedFuture(Optional.empty())); CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor); retrySchedulingExecutor);
connection.start(); connection.start();
@ -808,8 +798,6 @@ class WebSocketConnectionTest {
@Test @Test
void testReactivePublisherDisposedWhenConnectionStopped() { void testReactivePublisherDisposedWhenConnectionStopped() {
MessagesManager storedMessages = mock(MessagesManager.class);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
final long deviceId = 2L; final long deviceId = 2L;
@ -830,7 +818,7 @@ class WebSocketConnectionTest {
s.onCancel(() -> canceled.set(true)); s.onCancel(() -> canceled.set(true));
}); });
when(storedMessages.getMessagesForDeviceReactive(eq(accountUuid), eq(deviceId), anyBoolean())) when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(deviceId), anyBoolean()))
.thenReturn(flux); .thenReturn(flux);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
@ -838,10 +826,10 @@ class WebSocketConnectionTest {
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
when(successResponse.getStatus()).thenReturn(200); when(successResponse.getStatus()).thenReturn(200);
when(client.sendRequest(any(), any(), any(), any())).thenReturn(CompletableFuture.completedFuture(successResponse)); when(client.sendRequest(any(), any(), any(), any())).thenReturn(CompletableFuture.completedFuture(successResponse));
when(storedMessages.delete(any(), anyLong(), any(), any())).thenReturn( when(messagesManager.delete(any(), anyLong(), any(), any())).thenReturn(
CompletableFuture.completedFuture(Optional.empty())); CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();