diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 819cdac4d..386431119 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -118,38 +118,24 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { } public CompletableFuture mayHaveMessages(final UUID accountIdentifier, final Device device) { - final List> mayHaveMessagesFutures = - Stream.of(convertPartitionKeyDeprecated(accountIdentifier, device), convertPartitionKey(accountIdentifier, device)) - .distinct() - .map(partitionKey -> dbAsyncClient.query(QueryRequest.builder() - .tableName(tableName) - .consistentRead(false) - .limit(1) - .keyConditionExpression("#part = :part") - .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) - .expressionAttributeValues(Map.of(":part", partitionKey)).build()) - .thenApply(queryResponse -> queryResponse.count() > 0)) - .toList(); - - return CompletableFuture.allOf(mayHaveMessagesFutures.toArray(EMPTY_FUTURE_ARRAY)) - .thenApply(ignored -> mayHaveMessagesFutures.stream().anyMatch(CompletableFuture::join)); + return + dbAsyncClient.query(QueryRequest.builder() + .tableName(tableName) + .consistentRead(false) + .limit(1) + .keyConditionExpression("#part = :part") + .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) + .expressionAttributeValues(Map.of(":part", convertPartitionKey(accountIdentifier, device))).build()) + .thenApply(queryResponse -> queryResponse.count() > 0); } public Publisher load(final UUID destinationAccountUuid, final Device device, final Integer limit) { - return Flux.concat( - Stream.of(convertPartitionKeyDeprecated(destinationAccountUuid, device), convertPartitionKey(destinationAccountUuid, device)) - .distinct() - .map(pk -> load(limit, pk)) - .toList()); - } - - public Publisher load(final Integer limit, final AttributeValue partitionKey) { QueryRequest.Builder queryRequestBuilder = QueryRequest.builder() .tableName(tableName) .consistentRead(true) .keyConditionExpression("#part = :part") .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) - .expressionAttributeValues(Map.of(":part", partitionKey)); + .expressionAttributeValues(Map.of(":part", convertPartitionKey(destinationAccountUuid, device))); if (limit != null) { // some callers don’t take advantage of reactive streams, so we want to support limiting the fetch size. Otherwise, @@ -173,19 +159,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { public CompletableFuture> deleteMessageByDestinationAndGuid( final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) { - return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice), - convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice)) - .distinct() - .map(pk -> deleteMessageByDestinationAndGuid(pk, messageUuid)) - // this combines the futures by producing a future that returns an arbitrary nonempty - // result if there is one, which should be OK because only one of the keys - // should produce a nonempty result for any given message uuid - .reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b))) - .get(); - } - - public CompletableFuture> deleteMessageByDestinationAndGuid( - final AttributeValue partitionKey, final UUID messageUuid) { + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); final QueryRequest queryRequest = QueryRequest.builder() .tableName(tableName) .indexName(LOCAL_INDEX_MESSAGE_UUID_NAME) @@ -227,22 +201,9 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { public CompletableFuture> deleteMessage(final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) { - return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice), - convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice)) - .distinct() - .map(pk -> deleteMessage(pk, destinationDevice, messageUuid, serverTimestamp)) - // this combines the futures by producing a future that returns an arbitrary nonempty - // result if there is one, which should be OK because only one of the keys - // should produce a nonempty result for any given message uuid - .reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b))) - .get(); - } - - public CompletableFuture> deleteMessage(final AttributeValue partitionKey, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) { - final AttributeValue sortKey = convertSortKey(serverTimestamp, messageUuid); DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder() .tableName(tableName) - .key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, sortKey)) + .key(Map.of(KEY_PARTITION, convertPartitionKey(destinationAccountUuid, destinationDevice), KEY_SORT, convertSortKey(serverTimestamp, messageUuid))) .returnValues(ReturnValue.ALL_OLD); return dbAsyncClient.deleteItem(deleteItemRequest.build()) @@ -278,14 +239,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { return AttributeValues.fromByteBuffer(byteBuffer.flip()); } - private static AttributeValue convertPartitionKeyDeprecated(final UUID destinationAccountUuid, final Device destinationDevice) { - final ByteBuffer byteBuffer = ByteBuffer.allocate(24); - byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits()); - byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits()); - byteBuffer.putLong(destinationDevice.getCreated() & ~0x7f + destinationDevice.getId()); - return AttributeValues.fromByteBuffer(byteBuffer.flip()); - } - private static AttributeValue convertSortKey(final long serverTimestamp, final UUID messageUuid) { final ByteBuffer byteBuffer = ByteBuffer.allocate(24); byteBuffer.putLong(serverTimestamp);