From f12a6ff73f85a93c34d589bc8b3741f07a5cf812 Mon Sep 17 00:00:00 2001 From: Jonathan Klabunde Tomer <125505367+jkt-signal@users.noreply.github.com> Date: Tue, 23 Jul 2024 14:07:19 -0700 Subject: [PATCH] Remove migration paths for lazy message deletion --- .../textsecuregcm/WhisperServerService.java | 1 - .../storage/MessagesDynamoDb.java | 217 +++--------------- .../storage/MessagesManager.java | 8 +- .../workers/CommandDependencies.java | 1 - .../MessagePersisterIntegrationTest.java | 2 +- .../storage/MessagesDynamoDbTest.java | 144 +----------- .../WebSocketConnectionIntegrationTest.java | 6 +- 7 files changed, 34 insertions(+), 345 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 5c009c718..468b042cc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -397,7 +397,6 @@ public class WhisperServerService extends Application dynamicConfig; private final ExecutorService messageDeletionExecutor; private final Scheduler messageDeletionScheduler; private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class); public MessagesDynamoDb(DynamoDbClient dynamoDb, DynamoDbAsyncClient dynamoDbAsyncClient, String tableName, - Duration timeToLive, DynamicConfigurationManager dynamicConfig, ExecutorService messageDeletionExecutor) { + Duration timeToLive, ExecutorService messageDeletionExecutor) { super(dynamoDb); this.dbAsyncClient = dynamoDbAsyncClient; this.tableName = tableName; this.timeToLive = timeToLive; - this.dynamicConfig = dynamicConfig; this.messageDeletionExecutor = messageDeletionExecutor; this.messageDeletionScheduler = Schedulers.fromExecutor(messageDeletionExecutor); @@ -102,20 +95,18 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { private void storeBatch(final List messages, final UUID destinationAccountUuid, final Device destinationDevice) { - final byte destinationDeviceId = destinationDevice.getId(); if (messages.size() > DYNAMO_DB_MAX_BATCH_SIZE) { throw new IllegalArgumentException("Maximum batch size of " + DYNAMO_DB_MAX_BATCH_SIZE + " exceeded with " + messages.size() + " messages"); } - final DynamoKeyScheme scheme = dynamicConfig.getConfiguration().getMessagesConfiguration().writeKeyScheme(); - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); List writeItems = new ArrayList<>(); for (MessageProtos.Envelope message : messages) { final UUID messageUuid = UUID.fromString(message.getServerGuid()); final ImmutableMap.Builder item = ImmutableMap.builder() .put(KEY_PARTITION, partitionKey) - .put(KEY_SORT, convertSortKey(destinationDevice.getId(), message.getServerTimestamp(), messageUuid, scheme)) + .put(KEY_SORT, convertSortKey(destinationDevice.getId(), message.getServerTimestamp(), messageUuid)) .put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid)) .put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message))) .put(KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(message.toByteArray())).build()); @@ -126,77 +117,31 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { } executeTableWriteItemsUntilComplete(Map.of(tableName, writeItems)); - Metrics.counter(MESSAGES_STORED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment(writeItems.size()); } public CompletableFuture mayHaveMessages(final UUID accountIdentifier, final Device device) { - return Flux.fromIterable(dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()) - .flatMap(scheme -> mayHaveMessages(accountIdentifier, device, scheme)) - .any(mayHaveMessages -> mayHaveMessages) - .toFuture(); - } - - private Mono mayHaveMessages(final UUID accountIdentifier, final Device device, final DynamoKeyScheme scheme) { - final AttributeValue partitionKey = convertPartitionKey(accountIdentifier, device, scheme); + final AttributeValue partitionKey = convertPartitionKey(accountIdentifier, device); QueryRequest.Builder queryRequestBuilder = QueryRequest.builder() .tableName(tableName) .consistentRead(false) - .limit(1); + .limit(1) + .keyConditionExpression("#part = :part") + .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) + .expressionAttributeValues(Map.of(":part", partitionKey)); - queryRequestBuilder = switch (scheme) { - case TRADITIONAL -> queryRequestBuilder - .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") - .expressionAttributeNames(Map.of( - "#part", KEY_PARTITION, - "#sort", KEY_SORT)) - .expressionAttributeValues(Map.of( - ":part", partitionKey, - ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(device.getId(), scheme))); - case LAZY_DELETION -> queryRequestBuilder - .keyConditionExpression("#part = :part") - .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) - .expressionAttributeValues(Map.of(":part", partitionKey)); - }; - - return Mono.fromFuture(dbAsyncClient.query(queryRequestBuilder.build()) - .thenApply(queryResponse -> queryResponse.count() > 0)); + return dbAsyncClient.query(queryRequestBuilder.build()) + .thenApply(queryResponse -> queryResponse.count() > 0); } public Publisher load(final UUID destinationAccountUuid, final Device device, final Integer limit) { - return Flux.concat( - dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes() - .stream() - .map(scheme -> load(destinationAccountUuid, device, limit, scheme)) - .toList()) - .map(messageAndScheme -> { - Metrics.counter(MESSAGES_LOADED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", messageAndScheme.getT2().name())).increment(); - return messageAndScheme.getT1(); - }); - } - - private Publisher> load(final UUID destinationAccountUuid, final Device device, final Integer limit, final DynamoKeyScheme scheme) { - final byte destinationDeviceId = device.getId(); - - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, device, scheme); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, device); QueryRequest.Builder queryRequestBuilder = QueryRequest.builder() .tableName(tableName) - .consistentRead(true); - - queryRequestBuilder = switch (scheme) { - case TRADITIONAL -> queryRequestBuilder - .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") - .expressionAttributeNames(Map.of( - "#part", KEY_PARTITION, - "#sort", KEY_SORT)) - .expressionAttributeValues(Map.of( - ":part", partitionKey, - ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId, scheme))); - case LAZY_DELETION -> queryRequestBuilder - .keyConditionExpression("#part = :part") - .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) - .expressionAttributeValues(Map.of(":part", partitionKey)); - }; + .consistentRead(true) + .keyConditionExpression("#part = :part") + .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) + .expressionAttributeValues(Map.of(":part", partitionKey)); if (limit != null) { // some callers don’t take advantage of reactive streams, so we want to support limiting the fetch size. Otherwise, @@ -215,25 +160,12 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { return null; } }) - .filter(Predicate.not(Objects::isNull)) - .map(m -> Tuples.of(m, scheme)); + .filter(Predicate.not(Objects::isNull)); } public CompletableFuture> deleteMessageByDestinationAndGuid( final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) { - return dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes() - .stream() - .map(scheme -> deleteMessageByDestinationAndGuid(destinationAccountUuid, destinationDevice, messageUuid, scheme)) - // this combines the futures by producing a future that returns an arbitrary nonempty - // result if there is one, which should be OK because only one of the keying schemes - // should produce a nonempty result for any given message uuid - .reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b))) - .get(); // there is always at least one scheme - } - - private CompletableFuture> deleteMessageByDestinationAndGuid( - final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid, DynamoKeyScheme scheme) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); final QueryRequest queryRequest = QueryRequest.builder() .tableName(tableName) .indexName(LOCAL_INDEX_MESSAGE_UUID_NAME) @@ -260,7 +192,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .mapNotNull(deleteItemResponse -> { try { if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) { - Metrics.counter(MESSAGES_DELETED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment(); return convertItemToEnvelope(deleteItemResponse.attributes()); } } catch (final InvalidProtocolBufferException e) { @@ -276,20 +207,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { public CompletableFuture> deleteMessage(final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) { - return dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes() - .stream() - .map(scheme -> deleteMessage(destinationAccountUuid, destinationDevice, messageUuid, serverTimestamp, scheme)) - // this combines the futures by producing a future that returns an arbitrary nonempty - // result if there is one, which should be OK because only one of the keying schemes - // should produce a nonempty result for any given message uuid - .reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b))) - .orElseThrow(); // there is always at least one scheme - } - - private CompletableFuture> deleteMessage(final UUID destinationAccountUuid, - final Device destinationDevice, final UUID messageUuid, final long serverTimestamp, final DynamoKeyScheme scheme) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme); - final AttributeValue sortKey = convertSortKey(destinationDevice.getId(), serverTimestamp, messageUuid, scheme); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); + final AttributeValue sortKey = convertSortKey(destinationDevice.getId(), serverTimestamp, messageUuid); DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder() .tableName(tableName) .key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, sortKey)) @@ -299,7 +218,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .thenApplyAsync(deleteItemResponse -> { if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) { try { - Metrics.counter(MESSAGES_DELETED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment(); return Optional.of(convertItemToEnvelope(deleteItemResponse.attributes())); } catch (final InvalidProtocolBufferException e) { logger.error("Failed to parse envelope", e); @@ -310,69 +228,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { }, messageDeletionExecutor); } - // Deletes all messages stored for the supplied account that were stored under the traditional (uuid+device id) keying scheme. - // Messages stored under the lazy-message-deletion keying scheme will not be affected. - public CompletableFuture deleteAllMessagesForAccount(final UUID destinationAccountUuid) { - final Timer.Sample sample = Timer.start(); - - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, null, DynamoKeyScheme.TRADITIONAL); - - return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder() - .tableName(tableName) - .projectionExpression(KEY_SORT) - .consistentRead(true) - .keyConditionExpression("#part = :part") - .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) - .expressionAttributeValues(Map.of(":part", partitionKey)) - .build()) - .items()) - .flatMap(item -> Mono.fromFuture(() -> dbAsyncClient.deleteItem(DeleteItemRequest.builder() - .tableName(tableName) - .key(Map.of( - KEY_PARTITION, partitionKey, - KEY_SORT, item.get(KEY_SORT))) - .build())), - DYNAMO_DB_MAX_BATCH_SIZE) - .then() - .doOnSuccess(ignored -> sample.stop(timer(DELETE_BY_ACCOUNT_TIMER_NAME, "outcome", "success"))) - .doOnError(ignored -> sample.stop(timer(DELETE_BY_ACCOUNT_TIMER_NAME, "outcome", "error"))) - .toFuture(); - } - - // Deletes all messages stored for the supplied account and device that were stored under the - // traditional (uuid+device id) keying scheme. Messages stored under the lazy-message-deletion - // keying scheme will not be affected. - public CompletableFuture deleteAllMessagesForDevice(final UUID destinationAccountUuid, - final byte destinationDeviceId) { - final Timer.Sample sample = Timer.start(); - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, null, DynamoKeyScheme.TRADITIONAL); - - return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder() - .tableName(tableName) - .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") - .expressionAttributeNames(Map.of( - "#part", KEY_PARTITION, - "#sort", KEY_SORT)) - .expressionAttributeValues(Map.of( - ":part", partitionKey, - ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId, DynamoKeyScheme.TRADITIONAL))) - .projectionExpression(KEY_SORT) - .consistentRead(true) - .build()) - .items()) - .flatMap(item -> Mono.fromFuture(() -> dbAsyncClient.deleteItem(DeleteItemRequest.builder() - .tableName(tableName) - .key(Map.of( - KEY_PARTITION, partitionKey, - KEY_SORT, item.get(KEY_SORT))) - .build())), - DYNAMO_DB_MAX_BATCH_SIZE) - .then() - .doOnSuccess(ignored -> sample.stop(timer(DELETE_BY_DEVICE_TIMER_NAME, "outcome", "success"))) - .doOnError(ignored -> sample.stop(timer(DELETE_BY_DEVICE_TIMER_NAME, "outcome", "error"))) - .toFuture(); - } - @VisibleForTesting static MessageProtos.Envelope convertItemToEnvelope(final Map item) throws InvalidProtocolBufferException { @@ -384,40 +239,22 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { return message.getServerTimestamp() / 1000 + timeToLive.getSeconds(); } - private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid, final Device destinationDevice, final DynamoKeyScheme scheme) { - return switch (scheme) { - case TRADITIONAL -> AttributeValues.fromUUID(destinationAccountUuid); - case LAZY_DELETION -> { - final ByteBuffer byteBuffer = ByteBuffer.allocate(24); - byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits()); - byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits()); - byteBuffer.putLong(destinationDevice.getCreated() & ~0x7f + destinationDevice.getId()); - yield AttributeValues.fromByteBuffer(byteBuffer.flip()); - } - }; + private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid, final Device destinationDevice) { + final ByteBuffer byteBuffer = ByteBuffer.allocate(24); + byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits()); + byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits()); + byteBuffer.putLong((destinationDevice.getCreated() & ~0x7f) + destinationDevice.getId()); + return AttributeValues.fromByteBuffer(byteBuffer.flip()); } - private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp, - final UUID messageUuid, final DynamoKeyScheme scheme) { - - final ByteBuffer byteBuffer = ByteBuffer.allocate(32); - if (scheme == DynamoKeyScheme.TRADITIONAL) { - // for compatibility - destinationDeviceId was previously `long` - byteBuffer.putLong(destinationDeviceId); - } + private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp, final UUID messageUuid) { + final ByteBuffer byteBuffer = ByteBuffer.allocate(24); byteBuffer.putLong(serverTimestamp); byteBuffer.putLong(messageUuid.getMostSignificantBits()); byteBuffer.putLong(messageUuid.getLeastSignificantBits()); return AttributeValues.fromByteBuffer(byteBuffer.flip()); } - private static AttributeValue convertDestinationDeviceIdToSortKeyPrefix(final byte destinationDeviceId, final DynamoKeyScheme scheme) { - return switch (scheme) { - case TRADITIONAL -> AttributeValues.fromByteBuffer(ByteBuffer.allocate(8).putLong(destinationDeviceId).flip()); - case LAZY_DELETION -> AttributeValues.b(new byte[0]); - }; - } - private static AttributeValue convertLocalIndexMessageUuidSortKey(final UUID messageUuid) { return AttributeValues.fromUUID(messageUuid); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 9976d51c4..3dee11d0e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -107,15 +107,11 @@ public class MessagesManager { } public CompletableFuture clear(UUID destinationUuid) { - return CompletableFuture.allOf( - messagesCache.clear(destinationUuid), - messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid)); + return messagesCache.clear(destinationUuid); } public CompletableFuture clear(UUID destinationUuid, byte deviceId) { - return CompletableFuture.allOf( - messagesCache.clear(destinationUuid, deviceId), - messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId)); + return messagesCache.clear(destinationUuid, deviceId); } public CompletableFuture> delete(UUID destinationUuid, Device destinationDevice, UUID guid, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index a4eb53a76..f12445273 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -169,7 +169,6 @@ record CommandDependencies( MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient, configuration.getDynamoDbTables().getMessages().getTableName(), configuration.getDynamoDbTables().getMessages().getExpiration(), - dynamicConfigurationManager, messageDeletionExecutor); FaultTolerantRedisCluster messagesCluster = configuration.getMessageCacheConfiguration() .getRedisClusterConfiguration().build("messages", redisClientResourcesBuilder); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index e714beeaf..2a71bc2de 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -76,7 +76,7 @@ class MessagePersisterIntegrationTest { messageDeletionExecutorService = Executors.newSingleThreadExecutor(); final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(), DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(14), - dynamicConfigurationManager, messageDeletionExecutorService); + messageDeletionExecutorService); final AccountsManager accountsManager = mock(AccountsManager.class); notificationExecutorService = Executors.newSingleThreadExecutor(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java index 322f8abd9..7009ef238 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java @@ -25,11 +25,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Publisher; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; @@ -80,7 +77,6 @@ class MessagesDynamoDbTest { } private ExecutorService messageDeletionExecutorService; - private DynamicConfigurationManager dynamicConfigurationManager; private MessagesDynamoDb messagesDynamoDb; @RegisterExtension @@ -89,11 +85,9 @@ class MessagesDynamoDbTest { @BeforeEach void setup() { messageDeletionExecutorService = Executors.newSingleThreadExecutor(); - dynamicConfigurationManager = mock(DynamicConfigurationManager.class); - when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(), DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(14), - dynamicConfigurationManager, messageDeletionExecutorService); + messageDeletionExecutorService); } @AfterEach @@ -195,61 +189,6 @@ class MessagesDynamoDbTest { .verify(); } - @Test - void testDeleteForDestination() { - final UUID destinationUuid = UUID.randomUUID(); - final UUID secondDestinationUuid = UUID.randomUUID(); - final Device primary = DevicesHelper.createDevice((byte) 1); - final Device device2 = DevicesHelper.createDevice((byte) 2); - - messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, primary); - messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, primary); - messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, device2); - - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) - .element(0).isEqualTo(MESSAGE1); - assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) - .element(0).isEqualTo(MESSAGE3); - assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .hasSize(1).element(0).isEqualTo(MESSAGE2); - - messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid).join(); - - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty(); - assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty(); - assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .hasSize(1).element(0).isEqualTo(MESSAGE2); - } - - @Test - void testDeleteForDestinationDevice() { - final UUID destinationUuid = UUID.randomUUID(); - final UUID secondDestinationUuid = UUID.randomUUID(); - final Device primary = DevicesHelper.createDevice((byte) 1); - final Device device2 = DevicesHelper.createDevice((byte) 2); - - messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, primary); - messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, primary); - messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, device2); - - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) - .element(0).isEqualTo(MESSAGE1); - assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .hasSize(1) - .element(0).isEqualTo(MESSAGE3); - assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .hasSize(1).element(0).isEqualTo(MESSAGE2); - - messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, device2.getId()).join(); - - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) - .element(0).isEqualTo(MESSAGE1); - assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .isEmpty(); - assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .hasSize(1).element(0).isEqualTo(MESSAGE2); - } - @Test void testDeleteMessageByDestinationAndGuid() throws Exception { final UUID destinationUuid = UUID.randomUUID(); @@ -330,66 +269,12 @@ class MessagesDynamoDbTest { .block(); } - @Test - void testMessageKeySchemeMigration() throws Exception { - final UUID destinationUuid = UUID.randomUUID(); - final Device primary = DevicesHelper.createDevice((byte) 1); - - // store message 1 in old scheme - when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue(""" - messagesConfiguration: - dynamoKeySchemes: - - TRADITIONAL - """, DynamicConfiguration.class)); - messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, primary); - - // store message 2 in new scheme during migration - when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue(""" - messagesConfiguration: - dynamoKeySchemes: - - TRADITIONAL - - LAZY_DELETION - """, DynamicConfiguration.class)); - messagesDynamoDb.store(List.of(MESSAGE2), destinationUuid, primary); - - // in old scheme, we should only get message 1 back (we would never actually do this, it's just a way to prove we used the new scheme for message 2) - when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue(""" - messagesConfiguration: - dynamoKeySchemes: - - TRADITIONAL - """, DynamicConfiguration.class)); - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).containsExactly(MESSAGE1); - - // during migration we should get both messages back in order - when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue(""" - messagesConfiguration: - dynamoKeySchemes: - - TRADITIONAL - - LAZY_DELETION - """, DynamicConfiguration.class)); - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).containsExactly(MESSAGE1, MESSAGE2); - - // after migration we would only get message 2 back (we shouldn't do this either in practice) - when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue(""" - messagesConfiguration: - dynamoKeySchemes: - - LAZY_DELETION - """, DynamicConfiguration.class)); - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).containsExactly(MESSAGE2); - } - @Test void testLazyMessageDeletion() throws Exception { final UUID destinationUuid = UUID.randomUUID(); final Device primary = DevicesHelper.createDevice((byte) 1); primary.setCreated(System.currentTimeMillis()); - when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue(""" - messagesConfiguration: - dynamoKeySchemes: - - LAZY_DELETION - """, DynamicConfiguration.class)); - messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, primary); assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) .as("load should return all messages stored").containsOnly(MESSAGE1, MESSAGE2, MESSAGE3); @@ -404,45 +289,22 @@ class MessagesDynamoDbTest { assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) .as("deleting message by guid and timestamp should work").containsExactly(MESSAGE3); - messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, (byte) 1).get(1, TimeUnit.SECONDS); - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) - .as("deleting all messages for device should do nothing").containsExactly(MESSAGE3); - - messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid).get(1, TimeUnit.SECONDS); - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) - .as("deleting all messages for account should do nothing").containsExactly(MESSAGE3); - primary.setCreated(primary.getCreated() + 1000); assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) .as("devices with the same id but different create timestamps should see no messages") .isEmpty(); } - @ParameterizedTest - @MethodSource - void mayHaveMessages(final List schemes) { + @Test + void mayHaveMessages() { final UUID destinationUuid = UUID.randomUUID(); final byte destinationDeviceId = (byte) (random.nextInt(Device.MAXIMUM_DEVICE_ID) + 1); final Device destinationDevice = DevicesHelper.createDevice(destinationDeviceId); - final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); - when(dynamicConfiguration.getMessagesConfiguration()).thenReturn(new DynamicMessagesConfiguration(schemes)); - - when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); - assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isFalse(); messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, destinationDevice); assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isTrue(); } - - private static List> mayHaveMessages() { - return List.of( - List.of(DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL), - List.of(DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION), - List.of(DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL, DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION), - List.of(DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION, DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL) - ); - } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 59cd33e60..4299bf8d4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -44,7 +44,6 @@ import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; @@ -88,9 +87,6 @@ class WebSocketConnectionIntegrationTest { @BeforeEach void setUp() throws Exception { - final DynamicConfigurationManager mockDynamicConfigurationManager = mock(DynamicConfigurationManager.class); - when(mockDynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); - sharedExecutorService = Executors.newSingleThreadExecutor(); scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); @@ -98,7 +94,7 @@ class WebSocketConnectionIntegrationTest { messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(), DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(7), - mockDynamicConfigurationManager, sharedExecutorService); + sharedExecutorService); reportMessageManager = mock(ReportMessageManager.class); account = mock(Account.class); device = mock(Device.class);