diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 386431119..3f0992c28 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -24,7 +24,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Predicate; -import java.util.stream.Stream; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -118,8 +117,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { } public CompletableFuture mayHaveMessages(final UUID accountIdentifier, final Device device) { - return - dbAsyncClient.query(QueryRequest.builder() + return dbAsyncClient.query(QueryRequest.builder() .tableName(tableName) .consistentRead(false) .limit(1) @@ -129,6 +127,12 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .thenApply(queryResponse -> queryResponse.count() > 0); } + public CompletableFuture mayHaveUrgentMessages(final UUID accountIdentifier, final Device device) { + return Flux.from(load(accountIdentifier, device, null)) + .any(MessageProtos.Envelope::getUrgent) + .toFuture(); + } + public Publisher load(final UUID destinationAccountUuid, final Device device, final Integer limit) { QueryRequest.Builder queryRequestBuilder = QueryRequest.builder() .tableName(tableName) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 9414be5ea..35ece5bbb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -96,6 +96,10 @@ public class MessagesManager { }); } + public CompletableFuture mayHaveUrgentPersistedMessages(final UUID destinationUuid, final Device destinationDevice) { + return messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice); + } + public Mono, Boolean>> getMessagesForDevice(UUID destinationUuid, Device destinationDevice, boolean cachedMessagesOnly) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java index bb44bd815..0ea2206c3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java @@ -304,4 +304,37 @@ class MessagesDynamoDbTest { assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isTrue(); } + + @Test + void mayHaveUrgentMessages() { + final UUID destinationUuid = UUID.randomUUID(); + final byte destinationDeviceId = (byte) (random.nextInt(Device.MAXIMUM_DEVICE_ID) + 1); + final Device destinationDevice = DevicesHelper.createDevice(destinationDeviceId); + + assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isFalse(); + + { + final MessageProtos.Envelope nonUrgentMessage = MessageProtos.Envelope.newBuilder() + .setUrgent(false) + .setServerGuid(UUID.randomUUID().toString()) + .setDestinationServiceId(UUID.randomUUID().toString()) + .build(); + + messagesDynamoDb.store(List.of(nonUrgentMessage), destinationUuid, destinationDevice); + } + + assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isFalse(); + + { + final MessageProtos.Envelope urgentMessage = MessageProtos.Envelope.newBuilder() + .setUrgent(true) + .setServerGuid(UUID.randomUUID().toString()) + .setDestinationServiceId(UUID.randomUUID().toString()) + .build(); + + messagesDynamoDb.store(List.of(urgentMessage), destinationUuid, destinationDevice); + } + + assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isTrue(); + } }