diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index ceee674e1..2b0906267 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -205,19 +205,42 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability cachedMessagesOnly = lastPersistedState <= lastDatabaseClearedState; } - OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly); - CompletableFuture[] sendFutures = new CompletableFuture[messages.getMessages().size()]; + sendNextMessagePage(cachedMessagesOnly).thenAccept(hasMoreStoredMessages -> { + final boolean mayHaveMoreMessages; + + synchronized (this) { + processingStoredMessages = false; + mayHaveMoreMessages = hasMoreStoredMessages || storedMessageState > processedState; + } + + if (mayHaveMoreMessages) { + processStoredMessages(); + } else { + synchronized (this) { + lastDatabaseClearedState = processedState; + } + + if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { + client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); + } + } + }); + } + + private CompletableFuture sendNextMessagePage(final boolean cachedMessagesOnly) { + final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly); + final CompletableFuture[] sendFutures = new CompletableFuture[messages.getMessages().size()]; for (int i = 0; i < messages.getMessages().size(); i++) { - OutgoingMessageEntity message = messages.getMessages().get(i); - Envelope.Builder builder = Envelope.newBuilder() - .setType(Envelope.Type.valueOf(message.getType())) - .setTimestamp(message.getTimestamp()) - .setServerTimestamp(message.getServerTimestamp()); + final OutgoingMessageEntity message = messages.getMessages().get(i); + final Envelope.Builder builder = Envelope.newBuilder() + .setType(Envelope.Type.valueOf(message.getType())) + .setTimestamp(message.getTimestamp()) + .setServerTimestamp(message.getServerTimestamp()); if (!Util.isEmpty(message.getSource())) { builder.setSource(message.getSource()) - .setSourceDevice(message.getSourceDevice()); + .setSourceDevice(message.getSourceDevice()); } if (message.getMessage() != null) { @@ -235,26 +258,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached()))); } - CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> { - final boolean mayHaveMoreMessages; - - synchronized (this) { - processingStoredMessages = false; - mayHaveMoreMessages = messages.hasMore() || storedMessageState > processedState; - } - - if (mayHaveMoreMessages) { - processStoredMessages(); - } else { - synchronized (this) { - lastDatabaseClearedState = processedState; - } - - if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { - client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); - } - } - }); + return CompletableFuture.allOf(sendFutures).handle((v, cause) -> messages.hasMore()); } @Override