Make message deletion from DynamoDB asynchronous

This commit is contained in:
Jon Chambers 2023-08-08 10:51:51 -04:00 committed by Jon Chambers
parent 5caa951c61
commit 2c835b5c51
3 changed files with 53 additions and 50 deletions

View File

@ -37,7 +37,6 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
@ -215,38 +214,57 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}, messageDeletionExecutor);
}
public void deleteAllMessagesForAccount(final UUID destinationAccountUuid) {
deleteByAccount.record(() -> {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.projectionExpression(KEY_SORT)
.consistentRead(true)
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey))
.build();
deleteRowsMatchingQuery(partitionKey, queryRequest);
});
public CompletableFuture<Void> deleteAllMessagesForAccount(final UUID destinationAccountUuid) {
final Timer.Sample sample = Timer.start();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
.projectionExpression(KEY_SORT)
.consistentRead(true)
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey))
.build())
.items())
.flatMap(item -> Mono.fromFuture(dbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(
KEY_PARTITION, partitionKey,
KEY_SORT, item.get(KEY_SORT)))
.build())))
.doOnComplete(() -> sample.stop(deleteByAccount))
.then()
.toFuture();
}
public void deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) {
deleteByDevice.record(() -> {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.projectionExpression(KEY_SORT)
.consistentRead(true)
.build();
deleteRowsMatchingQuery(partitionKey, queryRequest);
});
public CompletableFuture<Void> deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) {
final Timer.Sample sample = Timer.start();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.projectionExpression(KEY_SORT)
.consistentRead(true)
.build())
.items())
.flatMap(item -> Mono.fromFuture(dbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(
KEY_PARTITION, partitionKey,
KEY_SORT, item.get(KEY_SORT)))
.build())))
.doOnComplete(() -> sample.stop(deleteByDevice))
.then()
.toFuture();
}
@VisibleForTesting
@ -256,21 +274,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray());
}
private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) {
writeInBatches(db().queryPaginator(querySpec).items(), itemBatch -> deleteItems(partitionKey, itemBatch));
}
private void deleteItems(AttributeValue partitionKey, List<Map<String, AttributeValue>> items) {
List<WriteRequest> deletes = items.stream()
.map(item -> WriteRequest.builder()
.deleteRequest(DeleteRequest.builder().key(Map.of(
KEY_PARTITION, partitionKey,
KEY_SORT, item.get(KEY_SORT))).build())
.build())
.toList();
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
}
private long getTtlForMessage(MessageProtos.Envelope message) {
return message.getServerTimestamp() / 1000 + timeToLive.getSeconds();
}

View File

@ -104,12 +104,12 @@ public class MessagesManager {
public void clear(UUID destinationUuid) {
messagesCache.clear(destinationUuid).join();
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid);
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid).join();
}
public void clear(UUID destinationUuid, long deviceId) {
messagesCache.clear(destinationUuid, deviceId).join();
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId);
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId).join();
}
public CompletableFuture<Optional<Envelope>> delete(UUID destinationUuid, long destinationDeviceId, UUID guid,

View File

@ -194,7 +194,7 @@ class MessagesDynamoDbTest {
assertThat(load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).isEqualTo(MESSAGE2);
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid);
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid).join();
assertThat(load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty();
assertThat(load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty();
@ -217,7 +217,7 @@ class MessagesDynamoDbTest {
assertThat(load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).isEqualTo(MESSAGE2);
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, 2);
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, 2).join();
assertThat(load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).isEqualTo(MESSAGE1);