Only send the "queue cleared" message once per websocket session.

This commit is contained in:
Jon Chambers 2020-09-09 14:18:10 -04:00 committed by Jon Chambers
parent 7bbc88d716
commit 8f53152c3e
2 changed files with 40 additions and 3 deletions

View File

@ -32,10 +32,10 @@ import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import javax.ws.rs.WebApplicationException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@ -65,7 +65,8 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
private final WebSocketClient client;
private final String connectionId;
private boolean processingStoredMessages = false;
private boolean processingStoredMessages = false;
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
public WebSocketConnection(PushSender pushSender,
ReceiptSender receiptSender,
@ -229,7 +230,11 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
if (messages.hasMore()) {
processStoredMessages();
} else {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
final boolean shouldSendEmptyQueueMessage;
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
}
}
});
}

View File

@ -39,6 +39,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -490,6 +491,37 @@ public class WebSocketConnectionTest {
verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
}
@Test
public void testProcessStoredMessagesSingleEmptyCall() {
final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency");
when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID());
when(device.getId()).thenReturn(1L);
when(client.getUserAgent()).thenReturn("Test-UA");
when(messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), 1L, client.getUserAgent())).thenAnswer(new Answer<OutgoingMessageEntityList>() {
@Override
public OutgoingMessageEntityList answer(final InvocationOnMock invocation) throws Throwable {
return new OutgoingMessageEntityList(Collections.emptyList(), false);
}
});
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
when(successResponse.getStatus()).thenReturn(200);
// This is a little hacky and non-obvious, but because we're always returning an empty list of messages, the call to
// CompletableFuture.allOf(...) in processStoredMessages will produce an instantly-succeeded future, and the
// whenComplete method will get called immediately on THIS thread, so we don't need to synchronize or wait for
// anything.
connection.processStoredMessages();
connection.processStoredMessages();
verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
}
private OutgoingMessageEntity createMessage(long id, boolean cached, String sender, UUID senderUuid, long timestamp, boolean receipt, String content) {
return new OutgoingMessageEntity(id, cached, UUID.randomUUID(), receipt ? Envelope.Type.RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE,
null, timestamp, sender, senderUuid, 1, content.getBytes(), null, 0);