diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index a46bc5305..9b0b0b206 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -258,17 +258,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac processStoredMessages(); } } else { - logger.debug("Failed to clear queue", cause); + if (client.isOpen()) { + logger.debug("Failed to clear queue", cause); - if (consecutiveRetries.incrementAndGet() > MAX_CONSECUTIVE_RETRIES) { - client.close(1011, "Failed to retrieve messages"); + if (consecutiveRetries.incrementAndGet() > MAX_CONSECUTIVE_RETRIES) { + client.close(1011, "Failed to retrieve messages"); + } else { + final List tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent())); + + Metrics.counter(QUEUE_DRAIN_RETRY_COUNTER_NAME, tags).increment(); + + final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS); + retryFuture + .set(retrySchedulingExecutor.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS)); + } } else { - final List tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent())); - - Metrics.counter(QUEUE_DRAIN_RETRY_COUNTER_NAME, tags).increment(); - - final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS); - retryFuture.set(retrySchedulingExecutor.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS)); + logger.debug("Client disconnected before queue cleared"); } } }); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 5d0b76a02..b3294937a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -16,6 +16,7 @@ import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -843,6 +844,7 @@ public class WebSocketConnectionTest { }); final WebSocketClient client = mock(WebSocketClient.class); + when(client.isOpen()).thenReturn(true); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, retrySchedulingExecutor); @@ -853,6 +855,33 @@ public class WebSocketConnectionTest { verify(client).close(eq(1011), anyString()); } + @Test + public void testRetrieveMessageExceptionClientDisconnected() { + MessagesManager storedMessages = mock(MessagesManager.class); + + UUID accountUuid = UUID.randomUUID(); + + when(device.getId()).thenReturn(2L); + + when(account.getNumber()).thenReturn("+14152222222"); + when(account.getUuid()).thenReturn(accountUuid); + + String userAgent = "Signal-Android/4.68.3"; + + when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false)) + .thenThrow(new RedisException("OH NO")); + + final WebSocketClient client = mock(WebSocketClient.class); + when(client.isOpen()).thenReturn(false); + + WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, + retrySchedulingExecutor); + connection.start(); + + verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any()); + verify(client, never()).close(anyInt(), anyString()); + } + private OutgoingMessageEntity createMessage(long id, boolean cached, String sender, UUID senderUuid, long timestamp, boolean receipt, String content) { return new OutgoingMessageEntity(id, cached, UUID.randomUUID(), receipt ? Envelope.Type.SERVER_DELIVERY_RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE, null, timestamp, sender, senderUuid, 1, content.getBytes(), null, 0); diff --git a/websocket-resources/src/main/java/org/whispersystems/websocket/WebSocketClient.java b/websocket-resources/src/main/java/org/whispersystems/websocket/WebSocketClient.java index 43e50ab7f..2d8384789 100644 --- a/websocket-resources/src/main/java/org/whispersystems/websocket/WebSocketClient.java +++ b/websocket-resources/src/main/java/org/whispersystems/websocket/WebSocketClient.java @@ -84,6 +84,10 @@ public class WebSocketClient { return this.created; } + public boolean isOpen() { + return session.isOpen(); + } + public void close(int code, String message) { session.close(code, message); }