diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 569453bbd..e357df125 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -472,10 +472,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, ClientE PRESENCE_MANAGER_TAG, "legacy") .increment(); - storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE); - - processStoredMessages(); - return true; } @@ -484,6 +480,10 @@ public class WebSocketConnection implements MessageAvailabilityListener, ClientE Metrics.counter(MESSAGE_AVAILABLE_COUNTER_NAME, PRESENCE_MANAGER_TAG, "pubsub") .increment(); + + storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE); + + processStoredMessages(); } @Override @@ -498,10 +498,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, ClientE PRESENCE_MANAGER_TAG, "legacy") .increment(); - storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); - - processStoredMessages(); - return true; } @@ -510,6 +506,10 @@ public class WebSocketConnection implements MessageAvailabilityListener, ClientE Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME, PRESENCE_MANAGER_TAG, "pubsub") .increment(); + + storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); + + processStoredMessages(); } @Override diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 1e871f8c8..1af3b9a67 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -268,7 +268,7 @@ class WebSocketConnectionTest { // or wait for anything. connection.start(); - connection.handleNewMessagesAvailable(); + connection.handleNewMessageAvailable(); synchronized (sendCounter) { while (sendCounter.get() < 1) { @@ -276,7 +276,7 @@ class WebSocketConnectionTest { } } - connection.handleNewMessagesAvailable(); + connection.handleNewMessageAvailable(); synchronized (sendCounter) { while (sendCounter.get() < 2) { @@ -693,7 +693,7 @@ class WebSocketConnectionTest { when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class))) .thenAnswer(invocation -> { - connection.handleNewMessagesAvailable(); + connection.handleNewMessageAvailable(); return CompletableFuture.completedFuture(successResponse); }); @@ -741,7 +741,7 @@ class WebSocketConnectionTest { verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device, false); - connection.handleNewMessagesAvailable(); + connection.handleNewMessageAvailable(); verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device, true); } @@ -769,7 +769,7 @@ class WebSocketConnectionTest { // whenComplete method will get called immediately on THIS thread, so we don't need to synchronize or wait for // anything. connection.processStoredMessages(); - connection.handleMessagesPersisted(); + connection.handleMessagesPersistedPubSub(); verify(messagesManager, times(2)).getMessagesForDeviceReactive(account.getUuid(), device, false); }