diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index a715def60..d23cc8a88 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -195,17 +195,26 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability @VisibleForTesting void processStoredMessages() { if (processStoredMessagesSemaphore.tryAcquire()) { - final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY); + final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY); + final CompletableFuture queueClearedFuture = new CompletableFuture<>(); + + sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture); + + queueClearedFuture.whenComplete((v, cause) -> { + if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { + client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); + } - if (state == StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE || state == StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE) { - sendNextMessagePage(state == StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE); - } else { processStoredMessagesSemaphore.release(); - } + + if (storedMessageState.get() != StoredMessageState.EMPTY) { + processStoredMessages(); + } + }); } } - private void sendNextMessagePage(final boolean cachedMessagesOnly) { + private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture queueClearedFuture) { final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly); final CompletableFuture[] sendFutures = new CompletableFuture[messages.getMessages().size()]; @@ -238,14 +247,9 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> { if (messages.hasMore()) { - sendNextMessagePage(cachedMessagesOnly); + sendNextMessagePage(cachedMessagesOnly, queueClearedFuture); } else { - if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { - client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); - } - - processStoredMessagesSemaphore.release(); - processStoredMessages(); + queueClearedFuture.complete(null); } }); }