Consolidate semaphore release logic.

This commit is contained in:
Jon Chambers 2020-09-11 11:01:07 -04:00 committed by Jon Chambers
parent 89788fa665
commit c02b255766
1 changed files with 17 additions and 13 deletions

View File

@ -195,17 +195,26 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
@VisibleForTesting
void processStoredMessages() {
if (processStoredMessagesSemaphore.tryAcquire()) {
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
final CompletableFuture<Void> queueClearedFuture = new CompletableFuture<>();
sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture);
queueClearedFuture.whenComplete((v, cause) -> {
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
}
if (state == StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE || state == StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE) {
sendNextMessagePage(state == StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);
} else {
processStoredMessagesSemaphore.release();
}
if (storedMessageState.get() != StoredMessageState.EMPTY) {
processStoredMessages();
}
});
}
}
private void sendNextMessagePage(final boolean cachedMessagesOnly) {
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueClearedFuture) {
final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()];
@ -238,14 +247,9 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
if (messages.hasMore()) {
sendNextMessagePage(cachedMessagesOnly);
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
} else {
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
}
processStoredMessagesSemaphore.release();
processStoredMessages();
queueClearedFuture.complete(null);
}
});
}