Reduce page size in MessagesDynamoDb#mayHaveUrgentMessages
This commit is contained in:
parent
744b05244d
commit
e12ba6b15b
|
@ -56,6 +56,9 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U";
|
static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U";
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int MAY_HAVE_URGENT_MESSAGES_QUERY_LIMIT = 20;
|
||||||
|
|
||||||
private static final String KEY_TTL = "E";
|
private static final String KEY_TTL = "E";
|
||||||
private static final String KEY_ENVELOPE_BYTES = "EB";
|
private static final String KEY_ENVELOPE_BYTES = "EB";
|
||||||
|
|
||||||
|
@ -67,8 +70,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
private final ExecutorService messageDeletionExecutor;
|
private final ExecutorService messageDeletionExecutor;
|
||||||
private final Scheduler messageDeletionScheduler;
|
private final Scheduler messageDeletionScheduler;
|
||||||
|
|
||||||
private static final CompletableFuture<?>[] EMPTY_FUTURE_ARRAY = new CompletableFuture<?>[0];
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
|
||||||
|
|
||||||
public MessagesDynamoDb(DynamoDbClient dynamoDb, DynamoDbAsyncClient dynamoDbAsyncClient, String tableName,
|
public MessagesDynamoDb(DynamoDbClient dynamoDb, DynamoDbAsyncClient dynamoDbAsyncClient, String tableName,
|
||||||
|
@ -126,7 +127,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Boolean> mayHaveUrgentMessages(final UUID accountIdentifier, final Device device) {
|
public CompletableFuture<Boolean> mayHaveUrgentMessages(final UUID accountIdentifier, final Device device) {
|
||||||
return Flux.from(load(accountIdentifier, device, null))
|
return Flux.from(load(accountIdentifier, device, MAY_HAVE_URGENT_MESSAGES_QUERY_LIMIT))
|
||||||
.any(MessageProtos.Envelope::getUrgent)
|
.any(MessageProtos.Envelope::getUrgent)
|
||||||
.toFuture();
|
.toFuture();
|
||||||
}
|
}
|
||||||
|
|
|
@ -313,11 +313,14 @@ class MessagesDynamoDbTest {
|
||||||
|
|
||||||
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isFalse();
|
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isFalse();
|
||||||
|
|
||||||
|
// used as the stable sort key, and the urgent message should be sorted last
|
||||||
|
int serverTimestamp = 1;
|
||||||
{
|
{
|
||||||
final MessageProtos.Envelope nonUrgentMessage = MessageProtos.Envelope.newBuilder()
|
final MessageProtos.Envelope nonUrgentMessage = MessageProtos.Envelope.newBuilder()
|
||||||
.setUrgent(false)
|
.setUrgent(false)
|
||||||
.setServerGuid(UUID.randomUUID().toString())
|
.setServerGuid(UUID.randomUUID().toString())
|
||||||
.setDestinationServiceId(UUID.randomUUID().toString())
|
.setDestinationServiceId(destinationUuid.toString())
|
||||||
|
.setServerTimestamp(serverTimestamp++)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
messagesDynamoDb.store(List.of(nonUrgentMessage), destinationUuid, destinationDevice);
|
messagesDynamoDb.store(List.of(nonUrgentMessage), destinationUuid, destinationDevice);
|
||||||
|
@ -326,13 +329,26 @@ class MessagesDynamoDbTest {
|
||||||
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isFalse();
|
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isFalse();
|
||||||
|
|
||||||
{
|
{
|
||||||
final MessageProtos.Envelope urgentMessage = MessageProtos.Envelope.newBuilder()
|
final List<MessageProtos.Envelope> messages = new ArrayList<>();
|
||||||
|
// store more non-urgent messages
|
||||||
|
for (int i = 0; i < MessagesDynamoDb.MAY_HAVE_URGENT_MESSAGES_QUERY_LIMIT * 5; i++) {
|
||||||
|
messages.add(MessageProtos.Envelope.newBuilder()
|
||||||
|
.setUrgent(false)
|
||||||
|
.setServerGuid(UUID.randomUUID().toString())
|
||||||
|
.setDestinationServiceId(destinationUuid.toString())
|
||||||
|
.setServerTimestamp(serverTimestamp++)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
// and one urgent message
|
||||||
|
messages.add(MessageProtos.Envelope.newBuilder()
|
||||||
.setUrgent(true)
|
.setUrgent(true)
|
||||||
.setServerGuid(UUID.randomUUID().toString())
|
.setServerGuid(UUID.randomUUID().toString())
|
||||||
.setDestinationServiceId(UUID.randomUUID().toString())
|
.setDestinationServiceId(destinationUuid.toString())
|
||||||
.build();
|
.setServerTimestamp(serverTimestamp++)
|
||||||
|
.build());
|
||||||
|
|
||||||
messagesDynamoDb.store(List.of(urgentMessage), destinationUuid, destinationDevice);
|
messagesDynamoDb.store(messages, destinationUuid, destinationDevice);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isTrue();
|
assertThat(messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice).join()).isTrue();
|
||||||
|
|
Loading…
Reference in New Issue