Add a method for checking for persisted urgent messages
This commit is contained in:
parent
bbe41278ed
commit
0e267509da
|
@ -24,7 +24,6 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -118,8 +117,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device) {
|
public CompletableFuture<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device) {
|
||||||
return
|
return dbAsyncClient.query(QueryRequest.builder()
|
||||||
dbAsyncClient.query(QueryRequest.builder()
|
|
||||||
.tableName(tableName)
|
.tableName(tableName)
|
||||||
.consistentRead(false)
|
.consistentRead(false)
|
||||||
.limit(1)
|
.limit(1)
|
||||||
|
@ -129,6 +127,12 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
.thenApply(queryResponse -> queryResponse.count() > 0);
|
.thenApply(queryResponse -> queryResponse.count() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<Boolean> mayHaveUrgentMessages(final UUID accountIdentifier, final Device device) {
|
||||||
|
return Flux.from(load(accountIdentifier, device, null))
|
||||||
|
.any(MessageProtos.Envelope::getUrgent)
|
||||||
|
.toFuture();
|
||||||
|
}
|
||||||
|
|
||||||
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {
|
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {
|
||||||
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
|
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
|
||||||
.tableName(tableName)
|
.tableName(tableName)
|
||||||
|
|
|
@ -96,6 +96,10 @@ public class MessagesManager {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<Boolean> mayHaveUrgentPersistedMessages(final UUID destinationUuid, final Device destinationDevice) {
|
||||||
|
return messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice);
|
||||||
|
}
|
||||||
|
|
||||||
public Mono<Pair<List<Envelope>, Boolean>> getMessagesForDevice(UUID destinationUuid, Device destinationDevice,
|
public Mono<Pair<List<Envelope>, Boolean>> getMessagesForDevice(UUID destinationUuid, Device destinationDevice,
|
||||||
boolean cachedMessagesOnly) {
|
boolean cachedMessagesOnly) {
|
||||||
|
|
||||||
|
|
|
@ -304,4 +304,37 @@ class MessagesDynamoDbTest {
|
||||||
|
|
||||||
assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isTrue();
|
assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void mayHaveUrgentMessages() {
|
||||||
|
final UUID destinationUuid = UUID.randomUUID();
|
||||||
|
final byte destinationDeviceId = (byte) (random.nextInt(Device.MAXIMUM_DEVICE_ID) + 1);
|
||||||
|
final Device destinationDevice = DevicesHelper.createDevice(destinationDeviceId);
|
||||||
|
|
||||||
|
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isFalse();
|
||||||
|
|
||||||
|
{
|
||||||
|
final MessageProtos.Envelope nonUrgentMessage = MessageProtos.Envelope.newBuilder()
|
||||||
|
.setUrgent(false)
|
||||||
|
.setServerGuid(UUID.randomUUID().toString())
|
||||||
|
.setDestinationServiceId(UUID.randomUUID().toString())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
messagesDynamoDb.store(List.of(nonUrgentMessage), destinationUuid, destinationDevice);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isFalse();
|
||||||
|
|
||||||
|
{
|
||||||
|
final MessageProtos.Envelope urgentMessage = MessageProtos.Envelope.newBuilder()
|
||||||
|
.setUrgent(true)
|
||||||
|
.setServerGuid(UUID.randomUUID().toString())
|
||||||
|
.setDestinationServiceId(UUID.randomUUID().toString())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
messagesDynamoDb.store(List.of(urgentMessage), destinationUuid, destinationDevice);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isTrue();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue