Only retry websocket sending if the client is still connected

This commit is contained in:
Jon Chambers 2021-09-17 16:22:23 -04:00 committed by Jon Chambers
parent 2a67b2e610
commit 09519ae942
3 changed files with 47 additions and 9 deletions

View File

@ -258,17 +258,22 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
processStoredMessages();
}
} else {
logger.debug("Failed to clear queue", cause);
if (client.isOpen()) {
logger.debug("Failed to clear queue", cause);
if (consecutiveRetries.incrementAndGet() > MAX_CONSECUTIVE_RETRIES) {
client.close(1011, "Failed to retrieve messages");
if (consecutiveRetries.incrementAndGet() > MAX_CONSECUTIVE_RETRIES) {
client.close(1011, "Failed to retrieve messages");
} else {
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()));
Metrics.counter(QUEUE_DRAIN_RETRY_COUNTER_NAME, tags).increment();
final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS);
retryFuture
.set(retrySchedulingExecutor.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS));
}
} else {
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()));
Metrics.counter(QUEUE_DRAIN_RETRY_COUNTER_NAME, tags).increment();
final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS);
retryFuture.set(retrySchedulingExecutor.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS));
logger.debug("Client disconnected before queue cleared");
}
}
});

View File

@ -16,6 +16,7 @@ import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -843,6 +844,7 @@ public class WebSocketConnectionTest {
});
final WebSocketClient client = mock(WebSocketClient.class);
when(client.isOpen()).thenReturn(true);
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client,
retrySchedulingExecutor);
@ -853,6 +855,33 @@ public class WebSocketConnectionTest {
verify(client).close(eq(1011), anyString());
}
@Test
public void testRetrieveMessageExceptionClientDisconnected() {
MessagesManager storedMessages = mock(MessagesManager.class);
UUID accountUuid = UUID.randomUUID();
when(device.getId()).thenReturn(2L);
when(account.getNumber()).thenReturn("+14152222222");
when(account.getUuid()).thenReturn(accountUuid);
String userAgent = "Signal-Android/4.68.3";
when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false))
.thenThrow(new RedisException("OH NO"));
final WebSocketClient client = mock(WebSocketClient.class);
when(client.isOpen()).thenReturn(false);
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client,
retrySchedulingExecutor);
connection.start();
verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any());
verify(client, never()).close(anyInt(), anyString());
}
private OutgoingMessageEntity createMessage(long id, boolean cached, String sender, UUID senderUuid, long timestamp, boolean receipt, String content) {
return new OutgoingMessageEntity(id, cached, UUID.randomUUID(), receipt ? Envelope.Type.SERVER_DELIVERY_RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE,
null, timestamp, sender, senderUuid, 1, content.getBytes(), null, 0);

View File

@ -84,6 +84,10 @@ public class WebSocketClient {
return this.created;
}
public boolean isOpen() {
return session.isOpen();
}
public void close(int code, String message) {
session.close(code, message);
}