diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 946a17e18..ebee30a0b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -37,7 +37,6 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; -import software.amazon.awssdk.services.dynamodb.model.DeleteRequest; import software.amazon.awssdk.services.dynamodb.model.PutRequest; import software.amazon.awssdk.services.dynamodb.model.QueryRequest; import software.amazon.awssdk.services.dynamodb.model.ReturnValue; @@ -215,38 +214,57 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { }, messageDeletionExecutor); } - public void deleteAllMessagesForAccount(final UUID destinationAccountUuid) { - deleteByAccount.record(() -> { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); - final QueryRequest queryRequest = QueryRequest.builder() - .tableName(tableName) - .projectionExpression(KEY_SORT) - .consistentRead(true) - .keyConditionExpression("#part = :part") - .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) - .expressionAttributeValues(Map.of(":part", partitionKey)) - .build(); - deleteRowsMatchingQuery(partitionKey, queryRequest); - }); + public CompletableFuture deleteAllMessagesForAccount(final UUID destinationAccountUuid) { + final Timer.Sample sample = Timer.start(); + + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + + return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder() + .tableName(tableName) + .projectionExpression(KEY_SORT) + .consistentRead(true) + .keyConditionExpression("#part = :part") + .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) + .expressionAttributeValues(Map.of(":part", partitionKey)) + .build()) + .items()) + .flatMap(item -> Mono.fromFuture(dbAsyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_PARTITION, partitionKey, + KEY_SORT, item.get(KEY_SORT))) + .build()))) + .doOnComplete(() -> sample.stop(deleteByAccount)) + .then() + .toFuture(); } - public void deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) { - deleteByDevice.record(() -> { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); - final QueryRequest queryRequest = QueryRequest.builder() - .tableName(tableName) - .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") - .expressionAttributeNames(Map.of( - "#part", KEY_PARTITION, - "#sort", KEY_SORT)) - .expressionAttributeValues(Map.of( - ":part", partitionKey, - ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId))) - .projectionExpression(KEY_SORT) - .consistentRead(true) - .build(); - deleteRowsMatchingQuery(partitionKey, queryRequest); - }); + public CompletableFuture deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) { + final Timer.Sample sample = Timer.start(); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + + return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") + .expressionAttributeNames(Map.of( + "#part", KEY_PARTITION, + "#sort", KEY_SORT)) + .expressionAttributeValues(Map.of( + ":part", partitionKey, + ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId))) + .projectionExpression(KEY_SORT) + .consistentRead(true) + .build()) + .items()) + .flatMap(item -> Mono.fromFuture(dbAsyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_PARTITION, partitionKey, + KEY_SORT, item.get(KEY_SORT))) + .build()))) + .doOnComplete(() -> sample.stop(deleteByDevice)) + .then() + .toFuture(); } @VisibleForTesting @@ -256,21 +274,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { return MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray()); } - private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) { - writeInBatches(db().queryPaginator(querySpec).items(), itemBatch -> deleteItems(partitionKey, itemBatch)); - } - - private void deleteItems(AttributeValue partitionKey, List> items) { - List deletes = items.stream() - .map(item -> WriteRequest.builder() - .deleteRequest(DeleteRequest.builder().key(Map.of( - KEY_PARTITION, partitionKey, - KEY_SORT, item.get(KEY_SORT))).build()) - .build()) - .toList(); - executeTableWriteItemsUntilComplete(Map.of(tableName, deletes)); - } - private long getTtlForMessage(MessageProtos.Envelope message) { return message.getServerTimestamp() / 1000 + timeToLive.getSeconds(); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 1af48d345..94c37dc8f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -104,12 +104,12 @@ public class MessagesManager { public void clear(UUID destinationUuid) { messagesCache.clear(destinationUuid).join(); - messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid); + messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid).join(); } public void clear(UUID destinationUuid, long deviceId) { messagesCache.clear(destinationUuid, deviceId).join(); - messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId); + messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId).join(); } public CompletableFuture> delete(UUID destinationUuid, long destinationDeviceId, UUID guid, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java index df47ab23e..0f3c7dfca 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java @@ -194,7 +194,7 @@ class MessagesDynamoDbTest { assertThat(load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() .hasSize(1).element(0).isEqualTo(MESSAGE2); - messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid); + messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid).join(); assertThat(load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty(); assertThat(load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty(); @@ -217,7 +217,7 @@ class MessagesDynamoDbTest { assertThat(load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() .hasSize(1).element(0).isEqualTo(MESSAGE2); - messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, 2); + messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, 2).join(); assertThat(load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) .element(0).isEqualTo(MESSAGE1);