diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index d23cc8a88..dda6b2d56 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -126,7 +126,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability processStoredMessages(); } - private CompletableFuture sendMessage(final Envelope message, final Optional storedMessageInfo) { + private CompletableFuture sendMessage(final Envelope message, final Optional storedMessageInfo) { try { String header; Optional body; @@ -141,25 +141,24 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability sendMessageMeter.mark(); - return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body) - .thenAccept(response -> { - boolean isReceipt = message.getType() == Envelope.Type.RECEIPT; + return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body).whenComplete((response, throwable) -> { + if (throwable == null) { + boolean isReceipt = message.getType() == Envelope.Type.RECEIPT; - if (isSuccessResponse(response) && !isReceipt) { - messageTime.update(System.currentTimeMillis() - message.getTimestamp()); - } + if (isSuccessResponse(response) && !isReceipt) { + messageTime.update(System.currentTimeMillis() - message.getTimestamp()); + } - if (isSuccessResponse(response)) { - if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached); - if (!isReceipt) sendDeliveryReceiptFor(message); - } else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) { - requeueMessage(message); - } - }) - .exceptionally(throwable -> { - if (!storedMessageInfo.isPresent()) requeueMessage(message); - return null; - }); + if (isSuccessResponse(response)) { + if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached); + if (!isReceipt) sendDeliveryReceiptFor(message); + } else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) { + requeueMessage(message); + } + } else { + if (!storedMessageInfo.isPresent()) requeueMessage(message); + } + }); } catch (CryptoEncodingException e) { logger.warn("Bad signaling key", e); return CompletableFuture.failedFuture(e); @@ -201,13 +200,13 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture); queueClearedFuture.whenComplete((v, cause) -> { - if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { + if (cause == null && sentInitialQueueEmptyMessage.compareAndSet(false, true)) { client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); } processStoredMessagesSemaphore.release(); - if (storedMessageState.get() != StoredMessageState.EMPTY) { + if (cause == null && storedMessageState.get() != StoredMessageState.EMPTY) { processStoredMessages(); } }); @@ -227,7 +226,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability if (!Util.isEmpty(message.getSource())) { builder.setSource(message.getSource()) - .setSourceDevice(message.getSourceDevice()); + .setSourceDevice(message.getSourceDevice()); } if (message.getMessage() != null) { @@ -246,10 +245,14 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability } CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> { - if (messages.hasMore()) { - sendNextMessagePage(cachedMessagesOnly, queueClearedFuture); + if (cause == null) { + if (messages.hasMore()) { + sendNextMessagePage(cachedMessagesOnly, queueClearedFuture); + } else { + queueClearedFuture.complete(null); + } } else { - queueClearedFuture.complete(null); + queueClearedFuture.completeExceptionally(cause); } }); }