diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 544de4692..645220b1d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -213,6 +213,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0); } + public CompletableFuture hasMessagesAsync(final UUID destinationUuid, final byte destinationDevice) { + return redisCluster.withBinaryCluster(connection -> + connection.async().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) + .thenApply(cardinality -> cardinality > 0)) + .toCompletableFuture(); + } + public Publisher get(final UUID destinationUuid, final byte destinationDevice) { final long earliestAllowableEphemeralTimestamp = diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 88663972c..449187d00 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -129,6 +129,40 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { Metrics.counter(MESSAGES_STORED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment(writeItems.size()); } + public CompletableFuture mayHaveMessages(final UUID accountIdentifier, final Device device) { + return Flux.fromIterable(dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()) + .flatMap(scheme -> mayHaveMessages(accountIdentifier, device, scheme)) + .any(mayHaveMessages -> mayHaveMessages) + .toFuture(); + } + + private Mono mayHaveMessages(final UUID accountIdentifier, final Device device, final DynamoKeyScheme scheme) { + final AttributeValue partitionKey = convertPartitionKey(accountIdentifier, device, scheme); + + QueryRequest.Builder queryRequestBuilder = QueryRequest.builder() + .tableName(tableName) + .consistentRead(false) + .limit(1); + + queryRequestBuilder = switch (scheme) { + case TRADITIONAL -> queryRequestBuilder + .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") + .expressionAttributeNames(Map.of( + "#part", KEY_PARTITION, + "#sort", KEY_SORT)) + .expressionAttributeValues(Map.of( + ":part", partitionKey, + ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(device.getId(), scheme))); + case LAZY_DELETION -> queryRequestBuilder + .keyConditionExpression("#part = :part") + .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) + .expressionAttributeValues(Map.of(":part", partitionKey)); + }; + + return Mono.fromFuture(dbAsyncClient.query(queryRequestBuilder.build()) + .thenApply(queryResponse -> queryResponse.count() > 0)); + } + public Publisher load(final UUID destinationAccountUuid, final Device device, final Integer limit) { return Flux.concat( dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes() diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index d07930211..9976d51c4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -63,6 +63,13 @@ public class MessagesManager { } } + public CompletableFuture mayHaveMessages(final UUID destinationUuid, final Device destinationDevice) { + return messagesCache.hasMessagesAsync(destinationUuid, destinationDevice.getId()) + .thenCompose(hasMessages -> hasMessages + ? CompletableFuture.completedFuture(true) + : messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice)); + } + public boolean hasCachedMessages(final UUID destinationUuid, final byte destinationDevice) { return messagesCache.hasMessages(destinationUuid, destinationDevice); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 07c53cf5f..9f04dac46 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -201,6 +201,17 @@ class MessagesCacheTest { assertTrue(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID)); } + @Test + void testHasMessagesAsync() { + assertFalse(messagesCache.hasMessagesAsync(DESTINATION_UUID, DESTINATION_DEVICE_ID).join()); + + final UUID messageGuid = UUID.randomUUID(); + final MessageProtos.Envelope message = generateRandomMessage(messageGuid, true); + messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); + + assertTrue(messagesCache.hasMessagesAsync(DESTINATION_UUID, DESTINATION_DEVICE_ID).join()); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testGetMessages(final boolean sealedSender) throws Exception { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java index e22c57ec9..322f8abd9 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java @@ -25,9 +25,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Publisher; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; @@ -416,4 +418,31 @@ class MessagesDynamoDbTest { .isEmpty(); } + @ParameterizedTest + @MethodSource + void mayHaveMessages(final List schemes) { + final UUID destinationUuid = UUID.randomUUID(); + final byte destinationDeviceId = (byte) (random.nextInt(Device.MAXIMUM_DEVICE_ID) + 1); + final Device destinationDevice = DevicesHelper.createDevice(destinationDeviceId); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + when(dynamicConfiguration.getMessagesConfiguration()).thenReturn(new DynamicMessagesConfiguration(schemes)); + + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + + assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isFalse(); + + messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, destinationDevice); + + assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isTrue(); + } + + private static List> mayHaveMessages() { + return List.of( + List.of(DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL), + List.of(DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION), + List.of(DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL, DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION), + List.of(DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION, DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL) + ); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java index c899f0939..e655fcb3b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java @@ -5,15 +5,21 @@ package org.whispersystems.textsecuregcm.storage; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; class MessagesManagerTest { @@ -46,4 +52,29 @@ class MessagesManagerTest { verifyNoMoreInteractions(reportMessageManager); } + + @ParameterizedTest + @CsvSource({ + "false, false, false", + "false, true, true", + "true, false, true", + "true, true, true" + }) + void mayHaveMessages(final boolean hasCachedMessages, final boolean hasPersistedMessages, final boolean expectMayHaveMessages) { + final UUID accountIdentifier = UUID.randomUUID(); + final Device device = mock(Device.class); + when(device.getId()).thenReturn(Device.PRIMARY_ID); + + when(messagesCache.hasMessagesAsync(accountIdentifier, Device.PRIMARY_ID)) + .thenReturn(CompletableFuture.completedFuture(hasCachedMessages)); + + when(messagesDynamoDb.mayHaveMessages(accountIdentifier, device)) + .thenReturn(CompletableFuture.completedFuture(hasPersistedMessages)); + + if (hasCachedMessages) { + verifyNoInteractions(messagesDynamoDb); + } + + assertEquals(expectMayHaveMessages, messagesManager.mayHaveMessages(accountIdentifier, device).join()); + } }