From 04287c5073afbcc03e0e15de856b05cddf64b839 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 27 Jul 2022 17:21:54 -0400 Subject: [PATCH] Optionally write messages as envelopes to the messages table --- .../textsecuregcm/WhisperServerService.java | 2 +- .../dynamic/DynamicConfiguration.java | 8 +++ .../DynamicMessageTableConfiguration.java | 18 +++++ .../storage/MessagesDynamoDb.java | 69 ++++++++++--------- .../workers/AssignUsernameCommand.java | 2 +- .../workers/DeleteUserCommand.java | 2 +- .../SetUserDiscoverabilityCommand.java | 2 +- .../MessagePersisterIntegrationTest.java | 7 +- .../storage/MessagesDynamoDbTest.java | 54 ++++++--------- .../WebSocketConnectionIntegrationTest.java | 15 +++- 10 files changed, 108 insertions(+), 71 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessageTableConfiguration.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 3479e6512..281a40f1b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -342,7 +342,7 @@ public class WhisperServerService extends Application getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -126,4 +130,8 @@ public class DynamicConfiguration { public DynamicMessagePersisterConfiguration getMessagePersisterConfiguration() { return messagePersister; } + + public DynamicMessageTableConfiguration getMessageTableConfiguration() { + return messageTable; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessageTableConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessageTableConfiguration.java new file mode 100644 index 000000000..fe3c47e28 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessageTableConfiguration.java @@ -0,0 +1,18 @@ +/* + * Copyright 2013-2022 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration.dynamic; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class DynamicMessageTableConfiguration { + + @JsonProperty + private boolean writeEnvelopes = false; + + public boolean isWriteEnvelopes() { + return writeEnvelopes; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 5195ff688..f82d889cc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -8,7 +8,6 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; import static io.micrometer.core.instrument.Metrics.timer; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.protobuf.InvalidProtocolBufferException; import io.micrometer.core.instrument.Counter; @@ -25,10 +24,12 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.UUIDUtil; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; @@ -41,11 +42,8 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest; public class MessagesDynamoDb extends AbstractDynamoDbStore { - @VisibleForTesting - static final String KEY_PARTITION = "H"; - - @VisibleForTesting - static final String KEY_SORT = "S"; + private static final String KEY_PARTITION = "H"; + private static final String KEY_SORT = "S"; private static final String LOCAL_INDEX_MESSAGE_UUID_NAME = "Message_UUID_Index"; private static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U"; @@ -59,9 +57,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { private static final String KEY_UPDATED_PNI = "UP"; private static final String KEY_CONTENT = "C"; private static final String KEY_TTL = "E"; - - @VisibleForTesting - static final String KEY_ENVELOPE_BYTES = "EB"; + private static final String KEY_ENVELOPE_BYTES = "EB"; private final Timer storeTimer = timer(name(getClass(), "store")); private final Timer loadTimer = timer(name(getClass(), "load")); @@ -73,16 +69,20 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { private final String tableName; private final Duration timeToLive; + private final DynamicConfigurationManager dynamicConfigurationManager; + private static final Counter GET_MESSAGE_WITH_ATTRIBUTES_COUNTER = Metrics.counter(name(MessagesDynamoDb.class, "loadMessage"), "format", "attributes"); private static final Counter GET_MESSAGE_WITH_ENVELOPE_COUNTER = Metrics.counter(name(MessagesDynamoDb.class, "loadMessage"), "format", "envelope"); private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class); - public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) { + public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive, + final DynamicConfigurationManager dynamicConfigurationManager) { super(dynamoDb); this.tableName = tableName; this.timeToLive = timeToLive; + this.dynamicConfigurationManager = dynamicConfigurationManager; } public void store(final List messages, final UUID destinationAccountUuid, final long destinationDeviceId) { @@ -98,30 +98,37 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { List writeItems = new ArrayList<>(); for (MessageProtos.Envelope message : messages) { final UUID messageUuid = UUID.fromString(message.getServerGuid()); + final ImmutableMap.Builder item = ImmutableMap.builder() .put(KEY_PARTITION, partitionKey) .put(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid)) .put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid)) - .put(KEY_TYPE, AttributeValues.fromInt(message.getType().getNumber())) - .put(KEY_TIMESTAMP, AttributeValues.fromLong(message.getTimestamp())) - .put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message))) - .put(KEY_DESTINATION_UUID, AttributeValues.fromUUID(UUID.fromString(message.getDestinationUuid()))); + .put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message))); - if (message.hasUpdatedPni()) { - item.put(KEY_UPDATED_PNI, AttributeValues.fromUUID(UUID.fromString(message.getUpdatedPni()))); - } - if (message.hasSource()) { - item.put(KEY_SOURCE, AttributeValues.fromString(message.getSource())); - } - if (message.hasSourceUuid()) { - item.put(KEY_SOURCE_UUID, AttributeValues.fromUUID(UUID.fromString(message.getSourceUuid()))); - } - if (message.hasSourceDevice()) { - item.put(KEY_SOURCE_DEVICE, AttributeValues.fromInt(message.getSourceDevice())); - } - if (message.hasContent()) { - item.put(KEY_CONTENT, AttributeValues.fromByteArray(message.getContent().toByteArray())); + if (dynamicConfigurationManager.getConfiguration().getMessageTableConfiguration().isWriteEnvelopes()) { + item.put(KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(message.toByteArray())).build()); + } else { + item.put(KEY_TYPE, AttributeValues.fromInt(message.getType().getNumber())) + .put(KEY_TIMESTAMP, AttributeValues.fromLong(message.getTimestamp())) + .put(KEY_DESTINATION_UUID, AttributeValues.fromUUID(UUID.fromString(message.getDestinationUuid()))); + + if (message.hasUpdatedPni()) { + item.put(KEY_UPDATED_PNI, AttributeValues.fromUUID(UUID.fromString(message.getUpdatedPni()))); + } + if (message.hasSource()) { + item.put(KEY_SOURCE, AttributeValues.fromString(message.getSource())); + } + if (message.hasSourceUuid()) { + item.put(KEY_SOURCE_UUID, AttributeValues.fromUUID(UUID.fromString(message.getSourceUuid()))); + } + if (message.hasSourceDevice()) { + item.put(KEY_SOURCE_DEVICE, AttributeValues.fromInt(message.getSourceDevice())); + } + if (message.hasContent()) { + item.put(KEY_CONTENT, AttributeValues.fromByteArray(message.getContent().toByteArray())); + } } + writeItems.add(WriteRequest.builder().putRequest(PutRequest.builder() .item(item.build()) .build()).build()); @@ -313,13 +320,11 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { return message.getServerTimestamp() / 1000 + timeToLive.getSeconds(); } - @VisibleForTesting - static AttributeValue convertPartitionKey(final UUID destinationAccountUuid) { + private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid) { return AttributeValues.fromUUID(destinationAccountUuid); } - @VisibleForTesting - static AttributeValue convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) { + private static AttributeValue convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) { ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]); byteBuffer.putLong(destinationDeviceId); byteBuffer.putLong(serverTimestamp); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index d1d2524ba..04fb81615 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -152,7 +152,7 @@ public class AssignUsernameCommand extends EnvironmentCommand dynamicConfigurationManager = + mock(DynamicConfigurationManager.class); + final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), - MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(14)); + MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(14), dynamicConfigurationManager); final AccountsManager accountsManager = mock(AccountsManager.class); - final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); notificationExecutorService = Executors.newSingleThreadExecutor(); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), @@ -83,6 +85,7 @@ class MessagePersisterIntegrationTest { when(account.getNumber()).thenReturn("+18005551234"); when(account.getUuid()).thenReturn(accountUuid); when(accountsManager.getByAccountIdentifier(accountUuid)).thenReturn(Optional.of(account)); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); messagesCache.start(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java index b620dc080..c968257e3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java @@ -6,21 +6,23 @@ package org.whispersystems.textsecuregcm.storage; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; import java.time.Duration; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessageTableConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension; -import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.services.dynamodb.model.AttributeValue; -import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; class MessagesDynamoDbTest { @@ -65,6 +67,7 @@ class MessagesDynamoDbTest { MESSAGE3 = builder.build(); } + private DynamicMessageTableConfiguration dynamicMessageTableConfiguration; private MessagesDynamoDb messagesDynamoDb; @@ -73,12 +76,24 @@ class MessagesDynamoDbTest { @BeforeEach void setup() { + @SuppressWarnings("unchecked") final DynamicConfigurationManager dynamicConfigurationManager = + mock(DynamicConfigurationManager.class); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + dynamicMessageTableConfiguration = mock(DynamicMessageTableConfiguration.class); + + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + when(dynamicConfiguration.getMessageTableConfiguration()).thenReturn(dynamicMessageTableConfiguration); + messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), MessagesDynamoDbExtension.TABLE_NAME, - Duration.ofDays(14)); + Duration.ofDays(14), dynamicConfigurationManager); } - @Test - void testSimpleFetchAfterInsert() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSimpleFetchAfterInsert(final boolean writeEnvelopes) { + when(dynamicMessageTableConfiguration.isWriteEnvelopes()).thenReturn(writeEnvelopes); + final UUID destinationUuid = UUID.randomUUID(); final int destinationDeviceId = random.nextInt(255) + 1; messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, destinationDeviceId); @@ -94,31 +109,6 @@ class MessagesDynamoDbTest { assertThat(messagesStored).element(2).isEqualTo(MESSAGE2); } - @Test - void testFetchBareEnvelope() { - final UUID destinationUuid = UUID.randomUUID(); - final long destinationDeviceId = Device.MASTER_ID; - final long serverTimestamp = System.currentTimeMillis(); - final UUID messageGuid = UUID.randomUUID(); - - final MessageProtos.Envelope envelope = MessageProtos.Envelope.newBuilder() - .setServerGuid(messageGuid.toString()) - .setDestinationUuid(destinationUuid.toString()) - .setServerTimestamp(serverTimestamp) - .build(); - - dynamoDbExtension.getDynamoDbClient().putItem(PutItemRequest.builder() - .tableName(dynamoDbExtension.getTableName()) - .item(Map.of( - MessagesDynamoDb.KEY_PARTITION, MessagesDynamoDb.convertPartitionKey(destinationUuid), - MessagesDynamoDb.KEY_SORT, MessagesDynamoDb.convertSortKey(destinationDeviceId, serverTimestamp, messageGuid), - MessagesDynamoDb.KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(envelope.toByteArray())).build())) - .build()); - - assertThat(messagesDynamoDb.load(destinationUuid, destinationDeviceId, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) - .isEqualTo(List.of(envelope)); - } - @Test void testDeleteForDestination() { final UUID destinationUuid = UUID.randomUUID(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 08b5de92a..fee26d126 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -41,6 +41,8 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessageTableConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; @@ -48,6 +50,7 @@ import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; @@ -83,11 +86,21 @@ class WebSocketConnectionIntegrationTest { @BeforeEach void setUp() throws Exception { + @SuppressWarnings("unchecked") final DynamicConfigurationManager dynamicConfigurationManager = + mock(DynamicConfigurationManager.class); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + final DynamicMessageTableConfiguration dynamicMessageTableConfiguration = mock(DynamicMessageTableConfiguration.class); + + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + when(dynamicConfiguration.getMessageTableConfiguration()).thenReturn(dynamicMessageTableConfiguration); + + executorService = Executors.newSingleThreadExecutor(); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), REDIS_CLUSTER_EXTENSION.getRedisCluster(), executorService); messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), MessagesDynamoDbExtension.TABLE_NAME, - Duration.ofDays(7)); + Duration.ofDays(7), dynamicConfigurationManager); reportMessageManager = mock(ReportMessageManager.class); account = mock(Account.class); device = mock(Device.class);