Make sure to release the semaphore even if something goes wrong getting messages.

This commit is contained in:
Jon Chambers 2021-03-18 17:45:44 -04:00 committed by Jon Chambers
parent 9778775046
commit 7509520883
1 changed files with 47 additions and 41 deletions

View File

@ -227,59 +227,65 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
} }
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueClearedFuture) { private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueClearedFuture) {
final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly); try {
final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()]; final OutgoingMessageEntityList messages = messagesManager
.getMessagesForDevice(account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
for (int i = 0; i < messages.getMessages().size(); i++) { final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()];
final OutgoingMessageEntity message = messages.getMessages().get(i);
final Envelope.Builder builder = Envelope.newBuilder()
.setType(Envelope.Type.valueOf(message.getType()))
.setTimestamp(message.getTimestamp())
.setServerTimestamp(message.getServerTimestamp());
if (!Util.isEmpty(message.getSource())) { for (int i = 0; i < messages.getMessages().size(); i++) {
builder.setSource(message.getSource()) final OutgoingMessageEntity message = messages.getMessages().get(i);
.setSourceDevice(message.getSourceDevice()); final Envelope.Builder builder = Envelope.newBuilder()
if (message.getSourceUuid() != null) { .setType(Envelope.Type.valueOf(message.getType()))
builder.setSourceUuid(message.getSourceUuid().toString()); .setTimestamp(message.getTimestamp())
.setServerTimestamp(message.getServerTimestamp());
if (!Util.isEmpty(message.getSource())) {
builder.setSource(message.getSource())
.setSourceDevice(message.getSourceDevice());
if (message.getSourceUuid() != null) {
builder.setSourceUuid(message.getSourceUuid().toString());
}
} }
}
if (message.getMessage() != null) { if (message.getMessage() != null) {
builder.setLegacyMessage(ByteString.copyFrom(message.getMessage())); builder.setLegacyMessage(ByteString.copyFrom(message.getMessage()));
} }
if (message.getContent() != null) { if (message.getContent() != null) {
builder.setContent(ByteString.copyFrom(message.getContent())); builder.setContent(ByteString.copyFrom(message.getContent()));
} }
if (message.getRelay() != null && !message.getRelay().isEmpty()) { if (message.getRelay() != null && !message.getRelay().isEmpty()) {
builder.setRelay(message.getRelay()); builder.setRelay(message.getRelay());
} }
final Envelope envelope = builder.build(); final Envelope envelope = builder.build();
if (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient) { if (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient) {
messagesManager.delete(account.getUuid(), device.getId(), message.getGuid()); messagesManager.delete(account.getUuid(), device.getId(), message.getGuid());
discardedMessagesMeter.mark(); discardedMessagesMeter.mark();
sendFutures[i] = CompletableFuture.completedFuture(null); sendFutures[i] = CompletableFuture.completedFuture(null);
} else {
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getGuid())));
}
}
CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
if (cause == null) {
if (messages.hasMore()) {
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
} else { } else {
queueClearedFuture.complete(null); sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getGuid())));
} }
} else {
queueClearedFuture.completeExceptionally(cause);
} }
});
CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
if (cause == null) {
if (messages.hasMore()) {
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
} else {
queueClearedFuture.complete(null);
}
} else {
queueClearedFuture.completeExceptionally(cause);
}
});
} catch (final Exception e) {
queueClearedFuture.completeExceptionally(e);
}
} }
@Override @Override