Optimize message deletion by skipping lookup
This commit is contained in:
parent
d5f2d86bd2
commit
d31ddb72f3
|
@ -544,7 +544,8 @@ public class MessageController {
|
||||||
Optional<OutgoingMessageEntity> message = messagesManager.delete(
|
Optional<OutgoingMessageEntity> message = messagesManager.delete(
|
||||||
auth.getAccount().getUuid(),
|
auth.getAccount().getUuid(),
|
||||||
auth.getAuthenticatedDevice().getId(),
|
auth.getAuthenticatedDevice().getId(),
|
||||||
uuid);
|
uuid,
|
||||||
|
null);
|
||||||
|
|
||||||
if (message.isPresent()) {
|
if (message.isPresent()) {
|
||||||
WebSocketConnection.recordMessageDeliveryDuration(message.get().getTimestamp(), auth.getAuthenticatedDevice());
|
WebSocketConnection.recordMessageDeliveryDuration(message.get().getTimestamp(), auth.getAuthenticatedDevice());
|
||||||
|
|
|
@ -52,6 +52,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
private final Timer storeTimer = timer(name(getClass(), "store"));
|
private final Timer storeTimer = timer(name(getClass(), "store"));
|
||||||
private final Timer loadTimer = timer(name(getClass(), "load"));
|
private final Timer loadTimer = timer(name(getClass(), "load"));
|
||||||
private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid"));
|
private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid"));
|
||||||
|
private final Timer deleteByKey = timer(name(getClass(), "delete", "key"));
|
||||||
private final Timer deleteByAccount = timer(name(getClass(), "delete", "account"));
|
private final Timer deleteByAccount = timer(name(getClass(), "delete", "account"));
|
||||||
private final Timer deleteByDevice = timer(name(getClass(), "delete", "device"));
|
private final Timer deleteByDevice = timer(name(getClass(), "delete", "device"));
|
||||||
|
|
||||||
|
@ -158,6 +159,24 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Optional<OutgoingMessageEntity> deleteMessage(final UUID destinationAccountUuid,
|
||||||
|
final long destinationDeviceId, final UUID messageUuid, final long serverTimestamp) {
|
||||||
|
return deleteByKey.record(() -> {
|
||||||
|
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
|
||||||
|
final AttributeValue sortKey = convertSortKey(destinationDeviceId, serverTimestamp, messageUuid);
|
||||||
|
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
|
||||||
|
.tableName(tableName)
|
||||||
|
.key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, sortKey))
|
||||||
|
.returnValues(ReturnValue.ALL_OLD);
|
||||||
|
final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build());
|
||||||
|
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
|
||||||
|
return Optional.of(convertItemToOutgoingMessageEntity(deleteItemResponse.attributes()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Optional.empty();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
private Optional<OutgoingMessageEntity> deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(AttributeValue partitionKey, QueryRequest queryRequest) {
|
private Optional<OutgoingMessageEntity> deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(AttributeValue partitionKey, QueryRequest queryRequest) {
|
||||||
Optional<OutgoingMessageEntity> result = Optional.empty();
|
Optional<OutgoingMessageEntity> result = Optional.empty();
|
||||||
|
|
|
@ -91,11 +91,15 @@ public class MessagesManager {
|
||||||
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId);
|
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(UUID destinationUuid, long destinationDeviceId, UUID guid) {
|
public Optional<OutgoingMessageEntity> delete(UUID destinationUuid, long destinationDeviceId, UUID guid, Long serverTimestamp) {
|
||||||
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, destinationDeviceId, guid);
|
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, destinationDeviceId, guid);
|
||||||
|
|
||||||
if (removed.isEmpty()) {
|
if (removed.isEmpty()) {
|
||||||
removed = messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, guid);
|
if (serverTimestamp == null) {
|
||||||
|
removed = messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, guid);
|
||||||
|
} else {
|
||||||
|
removed = messagesDynamoDb.deleteMessage(destinationUuid, destinationDeviceId, guid, serverTimestamp);
|
||||||
|
}
|
||||||
cacheMissByGuidMeter.mark();
|
cacheMissByGuidMeter.mark();
|
||||||
} else {
|
} else {
|
||||||
cacheHitByGuidMeter.mark();
|
cacheHitByGuidMeter.mark();
|
||||||
|
|
|
@ -194,7 +194,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||||
if (throwable == null) {
|
if (throwable == null) {
|
||||||
if (isSuccessResponse(response)) {
|
if (isSuccessResponse(response)) {
|
||||||
if (storedMessageInfo.isPresent()) {
|
if (storedMessageInfo.isPresent()) {
|
||||||
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid());
|
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid(), storedMessageInfo.get().getServerTimestamp());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
|
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
|
||||||
|
@ -337,12 +337,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||||
final Envelope envelope = builder.build();
|
final Envelope envelope = builder.build();
|
||||||
|
|
||||||
if (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient) {
|
if (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient) {
|
||||||
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), message.getGuid());
|
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), message.getGuid(), message.getServerTimestamp());
|
||||||
discardedMessagesMeter.mark();
|
discardedMessagesMeter.mark();
|
||||||
|
|
||||||
sendFutures[i] = CompletableFuture.completedFuture(null);
|
sendFutures[i] = CompletableFuture.completedFuture(null);
|
||||||
} else {
|
} else {
|
||||||
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getGuid())));
|
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getGuid(), message.getServerTimestamp())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -396,13 +396,19 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||||
|
|
||||||
private static class StoredMessageInfo {
|
private static class StoredMessageInfo {
|
||||||
private final UUID guid;
|
private final UUID guid;
|
||||||
|
private final long serverTimestamp;
|
||||||
|
|
||||||
public StoredMessageInfo(UUID guid) {
|
public StoredMessageInfo(UUID guid, long serverTimestamp) {
|
||||||
this.guid = guid;
|
this.guid = guid;
|
||||||
|
this.serverTimestamp = serverTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public UUID getGuid() {
|
public UUID getGuid() {
|
||||||
return guid;
|
return guid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getServerTimestamp() {
|
||||||
|
return serverTimestamp;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -551,17 +551,17 @@ class MessageControllerTest {
|
||||||
UUID sourceUuid = UUID.randomUUID();
|
UUID sourceUuid = UUID.randomUUID();
|
||||||
|
|
||||||
UUID uuid1 = UUID.randomUUID();
|
UUID uuid1 = UUID.randomUUID();
|
||||||
when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid1)).thenReturn(Optional.of(new OutgoingMessageEntity(
|
when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid1, null)).thenReturn(Optional.of(new OutgoingMessageEntity(
|
||||||
uuid1, Envelope.Type.CIPHERTEXT_VALUE,
|
uuid1, Envelope.Type.CIPHERTEXT_VALUE,
|
||||||
timestamp, "+14152222222", sourceUuid, 1, AuthHelper.VALID_UUID, "hi".getBytes(), 0)));
|
timestamp, "+14152222222", sourceUuid, 1, AuthHelper.VALID_UUID, "hi".getBytes(), 0)));
|
||||||
|
|
||||||
UUID uuid2 = UUID.randomUUID();
|
UUID uuid2 = UUID.randomUUID();
|
||||||
when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid2)).thenReturn(Optional.of(new OutgoingMessageEntity(
|
when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid2, null)).thenReturn(Optional.of(new OutgoingMessageEntity(
|
||||||
uuid2, Envelope.Type.SERVER_DELIVERY_RECEIPT_VALUE,
|
uuid2, Envelope.Type.SERVER_DELIVERY_RECEIPT_VALUE,
|
||||||
System.currentTimeMillis(), "+14152222222", sourceUuid, 1, AuthHelper.VALID_UUID, null, 0)));
|
System.currentTimeMillis(), "+14152222222", sourceUuid, 1, AuthHelper.VALID_UUID, null, 0)));
|
||||||
|
|
||||||
UUID uuid3 = UUID.randomUUID();
|
UUID uuid3 = UUID.randomUUID();
|
||||||
when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid3)).thenReturn(Optional.empty());
|
when(messagesManager.delete(AuthHelper.VALID_UUID, 1, uuid3, null)).thenReturn(Optional.empty());
|
||||||
|
|
||||||
Response response = resources.getJerseyTest()
|
Response response = resources.getJerseyTest()
|
||||||
.target(String.format("/v1/messages/uuid/%s", uuid1))
|
.target(String.format("/v1/messages/uuid/%s", uuid1))
|
||||||
|
|
|
@ -167,6 +167,32 @@ class MessagesDynamoDbTest {
|
||||||
.isEmpty();
|
.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testDeleteSingleMessage() {
|
||||||
|
final UUID destinationUuid = UUID.randomUUID();
|
||||||
|
final UUID secondDestinationUuid = UUID.randomUUID();
|
||||||
|
messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, 1);
|
||||||
|
messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, 1);
|
||||||
|
messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, 2);
|
||||||
|
|
||||||
|
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
|
||||||
|
.element(0).satisfies(verify(MESSAGE1));
|
||||||
|
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
|
||||||
|
.element(0).satisfies(verify(MESSAGE3));
|
||||||
|
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
|
||||||
|
.hasSize(1).element(0).satisfies(verify(MESSAGE2));
|
||||||
|
|
||||||
|
messagesDynamoDb.deleteMessage(secondDestinationUuid, 1,
|
||||||
|
UUID.fromString(MESSAGE2.getServerGuid()), MESSAGE2.getServerTimestamp());
|
||||||
|
|
||||||
|
assertThat(messagesDynamoDb.load(destinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
|
||||||
|
.element(0).satisfies(verify(MESSAGE1));
|
||||||
|
assertThat(messagesDynamoDb.load(destinationUuid, 2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
|
||||||
|
.element(0).satisfies(verify(MESSAGE3));
|
||||||
|
assertThat(messagesDynamoDb.load(secondDestinationUuid, 1, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
|
||||||
|
.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
private static void verify(OutgoingMessageEntity retrieved, MessageProtos.Envelope inserted) {
|
private static void verify(OutgoingMessageEntity retrieved, MessageProtos.Envelope inserted) {
|
||||||
assertThat(retrieved.getTimestamp()).isEqualTo(inserted.getTimestamp());
|
assertThat(retrieved.getTimestamp()).isEqualTo(inserted.getTimestamp());
|
||||||
assertThat(retrieved.getSource()).isEqualTo(inserted.hasSource() ? inserted.getSource() : null);
|
assertThat(retrieved.getSource()).isEqualTo(inserted.hasSource() ? inserted.getSource() : null);
|
||||||
|
|
|
@ -212,7 +212,7 @@ class WebSocketConnectionTest {
|
||||||
futures.get(0).completeExceptionally(new IOException());
|
futures.get(0).completeExceptionally(new IOException());
|
||||||
futures.get(2).completeExceptionally(new IOException());
|
futures.get(2).completeExceptionally(new IOException());
|
||||||
|
|
||||||
verify(storedMessages, times(1)).delete(eq(accountUuid), eq(2L), eq(outgoingMessages.get(1).getGuid()));
|
verify(storedMessages, times(1)).delete(eq(accountUuid), eq(2L), eq(outgoingMessages.get(1).getGuid()), eq(outgoingMessages.get(1).getServerTimestamp()));
|
||||||
verify(receiptSender, times(1)).sendReceipt(eq(auth), eq(senderOneUuid), eq(2222L));
|
verify(receiptSender, times(1)).sendReceipt(eq(auth), eq(senderOneUuid), eq(2222L));
|
||||||
|
|
||||||
connection.stop();
|
connection.stop();
|
||||||
|
@ -752,7 +752,7 @@ class WebSocketConnectionTest {
|
||||||
|
|
||||||
// We should delete all three messages even though we only sent two; one got discarded because it was too big for
|
// We should delete all three messages even though we only sent two; one got discarded because it was too big for
|
||||||
// desktop clients.
|
// desktop clients.
|
||||||
verify(storedMessages, times(3)).delete(eq(accountUuid), eq(2L), any(UUID.class));
|
verify(storedMessages, times(3)).delete(eq(accountUuid), eq(2L), any(UUID.class), any(Long.class));
|
||||||
|
|
||||||
connection.stop();
|
connection.stop();
|
||||||
verify(client).close(anyInt(), anyString());
|
verify(client).close(anyInt(), anyString());
|
||||||
|
@ -826,7 +826,7 @@ class WebSocketConnectionTest {
|
||||||
futures.get(1).complete(response);
|
futures.get(1).complete(response);
|
||||||
futures.get(2).complete(response);
|
futures.get(2).complete(response);
|
||||||
|
|
||||||
verify(storedMessages, times(3)).delete(eq(accountUuid), eq(2L), any(UUID.class));
|
verify(storedMessages, times(3)).delete(eq(accountUuid), eq(2L), any(UUID.class), any(Long.class));
|
||||||
|
|
||||||
connection.stop();
|
connection.stop();
|
||||||
verify(client).close(anyInt(), anyString());
|
verify(client).close(anyInt(), anyString());
|
||||||
|
|
Loading…
Reference in New Issue