diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index d78fdcdba..a715def60 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static com.codahale.metrics.MetricRegistry.name; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; @@ -66,10 +67,15 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability private final WebSocketClient client; private final String connectionId; - private final Semaphore processStoredMessagesSemaphore = new Semaphore(1); - private final AtomicBoolean newMessagesAvailable = new AtomicBoolean(true); - private final AtomicBoolean persistedMessagesAvailable = new AtomicBoolean(true); - private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false); + private final Semaphore processStoredMessagesSemaphore = new Semaphore(1); + private final AtomicReference storedMessageState = new AtomicReference<>(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); + private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false); + + private enum StoredMessageState { + EMPTY, + CACHED_NEW_MESSAGES_AVAILABLE, + PERSISTED_NEW_MESSAGES_AVAILABLE + } public WebSocketConnection(PushSender pushSender, ReceiptSender receiptSender, @@ -96,7 +102,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability switch (pubSubMessage.getType().getNumber()) { case PubSubMessage.Type.QUERY_DB_VALUE: pubSubPersistedMeter.mark(); - newMessagesAvailable.set(true); + storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); processStoredMessages(); break; case PubSubMessage.Type.DELIVER_VALUE: @@ -189,8 +195,10 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability @VisibleForTesting void processStoredMessages() { if (processStoredMessagesSemaphore.tryAcquire()) { - if (newMessagesAvailable.getAndSet(false)) { - sendNextMessagePage(!persistedMessagesAvailable.getAndSet(false)); + final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY); + + if (state == StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE || state == StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE) { + sendNextMessagePage(state == StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE); } else { processStoredMessagesSemaphore.release(); } @@ -246,7 +254,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability public void handleNewMessagesAvailable() { messageAvailableMeter.mark(); - newMessagesAvailable.set(true); + storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE); processStoredMessages(); } @@ -262,8 +270,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability public void handleMessagesPersisted() { messagesPersistedMeter.mark(); - persistedMessagesAvailable.set(true); - newMessagesAvailable.set(true); + storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); processStoredMessages(); }