Don't attempt to send more messages if sending failed for any reason.

This commit is contained in:
Jon Chambers 2020-09-17 15:05:55 -04:00 committed by Jon Chambers
parent e9e18afb4a
commit e146135bd1
1 changed files with 27 additions and 24 deletions

View File

@ -126,7 +126,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
processStoredMessages(); processStoredMessages();
} }
private CompletableFuture<Void> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) { private CompletableFuture<WebSocketResponseMessage> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) {
try { try {
String header; String header;
Optional<byte[]> body; Optional<byte[]> body;
@ -141,8 +141,8 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
sendMessageMeter.mark(); sendMessageMeter.mark();
return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body) return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body).whenComplete((response, throwable) -> {
.thenAccept(response -> { if (throwable == null) {
boolean isReceipt = message.getType() == Envelope.Type.RECEIPT; boolean isReceipt = message.getType() == Envelope.Type.RECEIPT;
if (isSuccessResponse(response) && !isReceipt) { if (isSuccessResponse(response) && !isReceipt) {
@ -155,10 +155,9 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
} else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) { } else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) {
requeueMessage(message); requeueMessage(message);
} }
}) } else {
.exceptionally(throwable -> {
if (!storedMessageInfo.isPresent()) requeueMessage(message); if (!storedMessageInfo.isPresent()) requeueMessage(message);
return null; }
}); });
} catch (CryptoEncodingException e) { } catch (CryptoEncodingException e) {
logger.warn("Bad signaling key", e); logger.warn("Bad signaling key", e);
@ -201,13 +200,13 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture); sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture);
queueClearedFuture.whenComplete((v, cause) -> { queueClearedFuture.whenComplete((v, cause) -> {
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { if (cause == null && sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
} }
processStoredMessagesSemaphore.release(); processStoredMessagesSemaphore.release();
if (storedMessageState.get() != StoredMessageState.EMPTY) { if (cause == null && storedMessageState.get() != StoredMessageState.EMPTY) {
processStoredMessages(); processStoredMessages();
} }
}); });
@ -246,11 +245,15 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
} }
CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> { CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
if (cause == null) {
if (messages.hasMore()) { if (messages.hasMore()) {
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture); sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
} else { } else {
queueClearedFuture.complete(null); queueClearedFuture.complete(null);
} }
} else {
queueClearedFuture.completeExceptionally(cause);
}
}); });
} }