From fadcf62166b49fed2dccb2f10a1a5e94f7fd0474 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 9 Sep 2020 18:48:02 -0400 Subject: [PATCH] Send all messages via keyspace notifications when a feature flag is enabled. --- .../textsecuregcm/WhisperServerService.java | 2 +- .../textsecuregcm/push/WebsocketSender.java | 28 +++++++- .../websocket/WebSocketConnectionTest.java | 65 ++++++++++++++++++- 3 files changed, 88 insertions(+), 7 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 995b43ef8..8753e1fbc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -300,7 +300,7 @@ public class WhisperServerService extends Application>)invocation -> { + synchronized (sendCounter) { + sendCounter.incrementAndGet(); + sendCounter.notifyAll(); + } + + return CompletableFuture.completedFuture(successResponse); + }); + + // This is a little hacky and non-obvious, but because the first call to getMessagesForDevice returns empty list of + // messages, the call to CompletableFuture.allOf(...) in processStoredMessages will produce an instantly-succeeded + // future, and the whenComplete method will get called immediately on THIS thread, so we don't need to synchronize + // or wait for anything. + connection.onDispatchSubscribed("channel"); + + connection.handleNewMessagesAvailable(); + + synchronized (sendCounter) { + while (sendCounter.get() < 1) { + sendCounter.wait(); + } + } + + connection.handleNewMessagesAvailable(); + + synchronized (sendCounter) { + while (sendCounter.get() < 2) { + sendCounter.wait(); + } + } + + verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); + verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class)); + } + @Test public void testPendingSend() throws Exception { MessagesManager storedMessages = mock(MessagesManager.class); @@ -387,7 +446,7 @@ public class WebSocketConnectionTest { verify(client).close(anyInt(), anyString()); } - @Test + @Test(timeout = 5000L) public void testProcessStoredMessageConcurrency() throws InterruptedException { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); @@ -448,7 +507,7 @@ public class WebSocketConnectionTest { verify(messagesManager).getMessagesForDevice(anyString(), any(UUID.class), anyLong(), anyString(), eq(false)); } - @Test + @Test(timeout = 5000L) public void testProcessStoredMessagesMultiplePages() throws InterruptedException { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); @@ -520,7 +579,7 @@ public class WebSocketConnectionTest { verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); } - @Test + @Test(timeout = 5000L) public void testRequeryOnStateMismatch() throws InterruptedException { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class);