From 8f53152c3e555bc03a44340c59e28392712c5e2c Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 9 Sep 2020 14:18:10 -0400 Subject: [PATCH] Only send the "queue cleared" message once per websocket session. --- .../websocket/WebSocketConnection.java | 11 +++++-- .../websocket/WebSocketConnectionTest.java | 32 +++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index c9c73a1ac..d81b9f753 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -32,10 +32,10 @@ import org.whispersystems.websocket.messages.WebSocketResponseMessage; import javax.ws.rs.WebApplicationException; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import static com.codahale.metrics.MetricRegistry.name; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; @@ -65,7 +65,8 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability private final WebSocketClient client; private final String connectionId; - private boolean processingStoredMessages = false; + private boolean processingStoredMessages = false; + private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false); public WebSocketConnection(PushSender pushSender, ReceiptSender receiptSender, @@ -229,7 +230,11 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability if (messages.hasMore()) { processStoredMessages(); } else { - client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); + final boolean shouldSendEmptyQueueMessage; + + if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { + client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); + } } }); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 6b02922d8..edc91d681 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -39,6 +39,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -490,6 +491,37 @@ public class WebSocketConnectionTest { verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); } + @Test + public void testProcessStoredMessagesSingleEmptyCall() { + final MessagesManager messagesManager = mock(MessagesManager.class); + final WebSocketClient client = mock(WebSocketClient.class); + final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); + + when(account.getNumber()).thenReturn("+18005551234"); + when(account.getUuid()).thenReturn(UUID.randomUUID()); + when(device.getId()).thenReturn(1L); + when(client.getUserAgent()).thenReturn("Test-UA"); + + when(messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), 1L, client.getUserAgent())).thenAnswer(new Answer() { + @Override + public OutgoingMessageEntityList answer(final InvocationOnMock invocation) throws Throwable { + return new OutgoingMessageEntityList(Collections.emptyList(), false); + } + }); + + final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); + when(successResponse.getStatus()).thenReturn(200); + + // This is a little hacky and non-obvious, but because we're always returning an empty list of messages, the call to + // CompletableFuture.allOf(...) in processStoredMessages will produce an instantly-succeeded future, and the + // whenComplete method will get called immediately on THIS thread, so we don't need to synchronize or wait for + // anything. + connection.processStoredMessages(); + connection.processStoredMessages(); + + verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); + } + private OutgoingMessageEntity createMessage(long id, boolean cached, String sender, UUID senderUuid, long timestamp, boolean receipt, String content) { return new OutgoingMessageEntity(id, cached, UUID.randomUUID(), receipt ? Envelope.Type.RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE, null, timestamp, sender, senderUuid, 1, content.getBytes(), null, 0);