From d31ddb72f3a60a62e829c8bc01d8b054908521c0 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Wed, 18 May 2022 13:02:21 -0700 Subject: [PATCH] Optimize message deletion by skipping lookup --- .../controllers/MessageController.java | 3 ++- .../storage/MessagesDynamoDb.java | 19 ++++++++++++++ .../storage/MessagesManager.java | 8 ++++-- .../websocket/WebSocketConnection.java | 14 +++++++--- .../controllers/MessageControllerTest.java | 6 ++--- .../tests/storage/MessagesDynamoDbTest.java | 26 +++++++++++++++++++ .../websocket/WebSocketConnectionTest.java | 6 ++--- 7 files changed, 69 insertions(+), 13 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index 968042d53..dba83839a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -544,7 +544,8 @@ public class MessageController { Optional message = messagesManager.delete( auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), - uuid); + uuid, + null); if (message.isPresent()) { WebSocketConnection.recordMessageDeliveryDuration(message.get().getTimestamp(), auth.getAuthenticatedDevice()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 882a34047..ee96eea58 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -52,6 +52,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { private final Timer storeTimer = timer(name(getClass(), "store")); private final Timer loadTimer = timer(name(getClass(), "load")); private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid")); + private final Timer deleteByKey = timer(name(getClass(), "delete", "key")); private final Timer deleteByAccount = timer(name(getClass(), "delete", "account")); private final Timer deleteByDevice = timer(name(getClass(), "delete", "device")); @@ -158,6 +159,24 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { }); } + public Optional deleteMessage(final UUID destinationAccountUuid, + final long destinationDeviceId, final UUID messageUuid, final long serverTimestamp) { + return deleteByKey.record(() -> { + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + final AttributeValue sortKey = convertSortKey(destinationDeviceId, serverTimestamp, messageUuid); + DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, sortKey)) + .returnValues(ReturnValue.ALL_OLD); + final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build()); + if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) { + return Optional.of(convertItemToOutgoingMessageEntity(deleteItemResponse.attributes())); + } + + return Optional.empty(); + }); + } + @Nonnull private Optional deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(AttributeValue partitionKey, QueryRequest queryRequest) { Optional result = Optional.empty(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 1ba612c50..00f14f635 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -91,11 +91,15 @@ public class MessagesManager { messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId); } - public Optional delete(UUID destinationUuid, long destinationDeviceId, UUID guid) { + public Optional delete(UUID destinationUuid, long destinationDeviceId, UUID guid, Long serverTimestamp) { Optional removed = messagesCache.remove(destinationUuid, destinationDeviceId, guid); if (removed.isEmpty()) { - removed = messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, guid); + if (serverTimestamp == null) { + removed = messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, guid); + } else { + removed = messagesDynamoDb.deleteMessage(destinationUuid, destinationDeviceId, guid, serverTimestamp); + } cacheMissByGuidMeter.mark(); } else { cacheHitByGuidMeter.mark(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 9ae915116..9d10bc7f0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -194,7 +194,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac if (throwable == null) { if (isSuccessResponse(response)) { if (storedMessageInfo.isPresent()) { - messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid()); + messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid(), storedMessageInfo.get().getServerTimestamp()); } if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) { @@ -337,12 +337,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac final Envelope envelope = builder.build(); if (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient) { - messagesManager.delete(auth.getAccount().getUuid(), device.getId(), message.getGuid()); + messagesManager.delete(auth.getAccount().getUuid(), device.getId(), message.getGuid(), message.getServerTimestamp()); discardedMessagesMeter.mark(); sendFutures[i] = CompletableFuture.completedFuture(null); } else { - sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getGuid()))); + sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getGuid(), message.getServerTimestamp()))); } } @@ -396,13 +396,19 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static class StoredMessageInfo { private final UUID guid; + private final long serverTimestamp; - public StoredMessageInfo(UUID guid) { + public StoredMessageInfo(UUID guid, long serverTimestamp) { this.guid = guid; + this.serverTimestamp = serverTimestamp; } public UUID getGuid() { return guid; } + + public long getServerTimestamp() { + return serverTimestamp; + } } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java index 3fd297341..a7cdb5f10 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java @@ -551,17 +551,17 @@ class MessageControllerTest { UUID sourceUuid = UUID.randomUUID(); UUID uuid1 = UUID.randomUUID(); - when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid1)).thenReturn(Optional.of(new OutgoingMessageEntity( + when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid1, null)).thenReturn(Optional.of(new OutgoingMessageEntity( uuid1, Envelope.Type.CIPHERTEXT_VALUE, timestamp, "+14152222222", sourceUuid, 1, AuthHelper.VALID_UUID, "hi".getBytes(), 0))); UUID uuid2 = UUID.randomUUID(); - when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid2)).thenReturn(Optional.of(new OutgoingMessageEntity( + when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid2, null)).thenReturn(Optional.of(new OutgoingMessageEntity( uuid2, Envelope.Type.SERVER_DELIVERY_RECEIPT_VALUE, System.currentTimeMillis(), "+14152222222", sourceUuid, 1, AuthHelper.VALID_UUID, null, 0))); UUID uuid3 = UUID.randomUUID(); - when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid3)).thenReturn(Optional.empty()); + when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid3, null)).thenReturn(Optional.empty()); Response response = resources.getJerseyTest() .target(String.format("/v1/messages/uuid/%s", uuid1)) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesDynamoDbTest.java index d2596d222..401c621cd 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesDynamoDbTest.java @@ -167,6 +167,32 @@ class MessagesDynamoDbTest { .isEmpty(); } + @Test + void testDeleteSingleMessage() { + final UUID destinationUuid = UUID.randomUUID(); + final UUID secondDestinationUuid = UUID.randomUUID(); + messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, 1); + messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, 1); + messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, 2); + + assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) + .element(0).satisfies(verify(MESSAGE1)); + assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) + .element(0).satisfies(verify(MESSAGE3)); + assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() + .hasSize(1).element(0).satisfies(verify(MESSAGE2)); + + messagesDynamoDb.deleteMessage(secondDestinationUuid, 1, + UUID.fromString(MESSAGE2.getServerGuid()), MESSAGE2.getServerTimestamp()); + + assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) + .element(0).satisfies(verify(MESSAGE1)); + assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) + .element(0).satisfies(verify(MESSAGE3)); + assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() + .isEmpty(); + } + private static void verify(OutgoingMessageEntity retrieved, MessageProtos.Envelope inserted) { assertThat(retrieved.getTimestamp()).isEqualTo(inserted.getTimestamp()); assertThat(retrieved.getSource()).isEqualTo(inserted.hasSource() ? inserted.getSource() : null); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 3594748e1..2a1430990 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -212,7 +212,7 @@ class WebSocketConnectionTest { futures.get(0).completeExceptionally(new IOException()); futures.get(2).completeExceptionally(new IOException()); - verify(storedMessages, times(1)).delete(eq(accountUuid), eq(2L), eq(outgoingMessages.get(1).getGuid())); + verify(storedMessages, times(1)).delete(eq(accountUuid), eq(2L), eq(outgoingMessages.get(1).getGuid()), eq(outgoingMessages.get(1).getServerTimestamp())); verify(receiptSender, times(1)).sendReceipt(eq(auth), eq(senderOneUuid), eq(2222L)); connection.stop(); @@ -752,7 +752,7 @@ class WebSocketConnectionTest { // We should delete all three messages even though we only sent two; one got discarded because it was too big for // desktop clients. - verify(storedMessages, times(3)).delete(eq(accountUuid), eq(2L), any(UUID.class)); + verify(storedMessages, times(3)).delete(eq(accountUuid), eq(2L), any(UUID.class), any(Long.class)); connection.stop(); verify(client).close(anyInt(), anyString()); @@ -826,7 +826,7 @@ class WebSocketConnectionTest { futures.get(1).complete(response); futures.get(2).complete(response); - verify(storedMessages, times(3)).delete(eq(accountUuid), eq(2L), any(UUID.class)); + verify(storedMessages, times(3)).delete(eq(accountUuid), eq(2L), any(UUID.class), any(Long.class)); connection.stop(); verify(client).close(anyInt(), anyString());