From d0e3fb19019b1c5b6043ceff17c6fb477e3b0e16 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 27 Jul 2022 17:58:09 -0400 Subject: [PATCH] Unconditionally write messages to the messages table as envelopes --- .../textsecuregcm/WhisperServerService.java | 2 +- .../dynamic/DynamicConfiguration.java | 8 --- .../DynamicMessageTableConfiguration.java | 18 ----- .../storage/MessagesDynamoDb.java | 71 ++++++++----------- .../workers/AssignUsernameCommand.java | 2 +- .../workers/DeleteUserCommand.java | 2 +- .../SetUserDiscoverabilityCommand.java | 2 +- .../MessagePersisterIntegrationTest.java | 35 +++++---- .../storage/MessagesDynamoDbTest.java | 25 +------ .../WebSocketConnectionIntegrationTest.java | 15 +--- 10 files changed, 56 insertions(+), 124 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessageTableConfiguration.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 281a40f1b..3479e6512 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -342,7 +342,7 @@ public class WhisperServerService extends Application getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -130,8 +126,4 @@ public class DynamicConfiguration { public DynamicMessagePersisterConfiguration getMessagePersisterConfiguration() { return messagePersister; } - - public DynamicMessageTableConfiguration getMessageTableConfiguration() { - return messageTable; - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessageTableConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessageTableConfiguration.java deleted file mode 100644 index fe3c47e28..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessageTableConfiguration.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2013-2022 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.configuration.dynamic; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class DynamicMessageTableConfiguration { - - @JsonProperty - private boolean writeEnvelopes = false; - - public boolean isWriteEnvelopes() { - return writeEnvelopes; - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index f82d889cc..3e63671cc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; import static io.micrometer.core.instrument.Metrics.timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.protobuf.InvalidProtocolBufferException; import io.micrometer.core.instrument.Counter; @@ -24,7 +25,6 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.util.AttributeValues; @@ -48,17 +48,34 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { private static final String LOCAL_INDEX_MESSAGE_UUID_NAME = "Message_UUID_Index"; private static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U"; - private static final String KEY_TYPE = "T"; - private static final String KEY_TIMESTAMP = "TS"; - private static final String KEY_SOURCE = "SN"; - private static final String KEY_SOURCE_UUID = "SU"; - private static final String KEY_SOURCE_DEVICE = "SD"; - private static final String KEY_DESTINATION_UUID = "DU"; - private static final String KEY_UPDATED_PNI = "UP"; - private static final String KEY_CONTENT = "C"; private static final String KEY_TTL = "E"; private static final String KEY_ENVELOPE_BYTES = "EB"; + // TODO Stop reading messages by attribute value after DATE + @Deprecated + private static final String KEY_TYPE = "T"; + + @Deprecated + private static final String KEY_TIMESTAMP = "TS"; + + @Deprecated + private static final String KEY_SOURCE = "SN"; + + @Deprecated + private static final String KEY_SOURCE_UUID = "SU"; + + @Deprecated + private static final String KEY_SOURCE_DEVICE = "SD"; + + @Deprecated + private static final String KEY_DESTINATION_UUID = "DU"; + + @Deprecated + private static final String KEY_UPDATED_PNI = "UP"; + + @Deprecated + private static final String KEY_CONTENT = "C"; + private final Timer storeTimer = timer(name(getClass(), "store")); private final Timer loadTimer = timer(name(getClass(), "load")); private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid")); @@ -69,20 +86,16 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { private final String tableName; private final Duration timeToLive; - private final DynamicConfigurationManager dynamicConfigurationManager; - private static final Counter GET_MESSAGE_WITH_ATTRIBUTES_COUNTER = Metrics.counter(name(MessagesDynamoDb.class, "loadMessage"), "format", "attributes"); private static final Counter GET_MESSAGE_WITH_ENVELOPE_COUNTER = Metrics.counter(name(MessagesDynamoDb.class, "loadMessage"), "format", "envelope"); private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class); - public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive, - final DynamicConfigurationManager dynamicConfigurationManager) { + public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) { super(dynamoDb); this.tableName = tableName; this.timeToLive = timeToLive; - this.dynamicConfigurationManager = dynamicConfigurationManager; } public void store(final List messages, final UUID destinationAccountUuid, final long destinationDeviceId) { @@ -103,31 +116,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .put(KEY_PARTITION, partitionKey) .put(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid)) .put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid)) - .put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message))); - - if (dynamicConfigurationManager.getConfiguration().getMessageTableConfiguration().isWriteEnvelopes()) { - item.put(KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(message.toByteArray())).build()); - } else { - item.put(KEY_TYPE, AttributeValues.fromInt(message.getType().getNumber())) - .put(KEY_TIMESTAMP, AttributeValues.fromLong(message.getTimestamp())) - .put(KEY_DESTINATION_UUID, AttributeValues.fromUUID(UUID.fromString(message.getDestinationUuid()))); - - if (message.hasUpdatedPni()) { - item.put(KEY_UPDATED_PNI, AttributeValues.fromUUID(UUID.fromString(message.getUpdatedPni()))); - } - if (message.hasSource()) { - item.put(KEY_SOURCE, AttributeValues.fromString(message.getSource())); - } - if (message.hasSourceUuid()) { - item.put(KEY_SOURCE_UUID, AttributeValues.fromUUID(UUID.fromString(message.getSourceUuid()))); - } - if (message.hasSourceDevice()) { - item.put(KEY_SOURCE_DEVICE, AttributeValues.fromInt(message.getSourceDevice())); - } - if (message.hasContent()) { - item.put(KEY_CONTENT, AttributeValues.fromByteArray(message.getContent().toByteArray())); - } - } + .put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message))) + .put(KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(message.toByteArray())).build()); writeItems.add(WriteRequest.builder().putRequest(PutRequest.builder() .item(item.build()) @@ -272,7 +262,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { }); } - private MessageProtos.Envelope convertItemToEnvelope(final Map item) + @VisibleForTesting + static MessageProtos.Envelope convertItemToEnvelope(final Map item) throws InvalidProtocolBufferException { final MessageProtos.Envelope envelope; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index 04fb81615..d1d2524ba 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -152,7 +152,7 @@ public class AssignUsernameCommand extends EnvironmentCommand dynamicConfigurationManager = mock(DynamicConfigurationManager.class); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); + final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), - MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(14), dynamicConfigurationManager); + MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(14)); final AccountsManager accountsManager = mock(AccountsManager.class); notificationExecutorService = Executors.newSingleThreadExecutor(); @@ -146,20 +146,19 @@ class MessagePersisterIntegrationTest { messagePersister.stop(); - final List persistedMessages = new ArrayList<>(messageCount); - DynamoDbClient dynamoDB = dynamoDbExtension.getDynamoDbClient(); - for (Map item : dynamoDB - .scan(ScanRequest.builder().tableName(MessagesDynamoDbExtension.TABLE_NAME).build()).items()) { - persistedMessages.add(MessageProtos.Envelope.newBuilder() - .setServerGuid(AttributeValues.getUUID(item, "U", null).toString()) - .setType(Type.forNumber(AttributeValues.getInt(item, "T", -1))) - .setTimestamp(AttributeValues.getLong(item, "TS", -1)) - .setServerTimestamp(extractServerTimestamp(AttributeValues.getByteArray(item, "S", null))) - .setContent(ByteString.copyFrom(AttributeValues.getByteArray(item, "C", null))) - .setDestinationUuid(AttributeValues.getUUID(item, "DU", null).toString()) - .build()); - } + + final List persistedMessages = + dynamoDB.scan(ScanRequest.builder().tableName(MessagesDynamoDbExtension.TABLE_NAME).build()).items().stream() + .map(item -> { + try { + return MessagesDynamoDb.convertItemToEnvelope(item); + } catch (InvalidProtocolBufferException e) { + fail("Could not parse stored message", e); + return null; + } + }) + .toList(); assertEquals(expectedMessages, persistedMessages); }); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java index c968257e3..f73b85aa2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java @@ -6,8 +6,6 @@ package org.whispersystems.textsecuregcm.storage; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; import java.time.Duration; @@ -17,10 +15,6 @@ import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessageTableConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension; @@ -67,7 +61,6 @@ class MessagesDynamoDbTest { MESSAGE3 = builder.build(); } - private DynamicMessageTableConfiguration dynamicMessageTableConfiguration; private MessagesDynamoDb messagesDynamoDb; @@ -76,24 +69,12 @@ class MessagesDynamoDbTest { @BeforeEach void setup() { - @SuppressWarnings("unchecked") final DynamicConfigurationManager dynamicConfigurationManager = - mock(DynamicConfigurationManager.class); - - final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); - dynamicMessageTableConfiguration = mock(DynamicMessageTableConfiguration.class); - - when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); - when(dynamicConfiguration.getMessageTableConfiguration()).thenReturn(dynamicMessageTableConfiguration); - messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), MessagesDynamoDbExtension.TABLE_NAME, - Duration.ofDays(14), dynamicConfigurationManager); + Duration.ofDays(14)); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testSimpleFetchAfterInsert(final boolean writeEnvelopes) { - when(dynamicMessageTableConfiguration.isWriteEnvelopes()).thenReturn(writeEnvelopes); - + @Test + void testSimpleFetchAfterInsert() { final UUID destinationUuid = UUID.randomUUID(); final int destinationDeviceId = random.nextInt(255) + 1; messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, destinationDeviceId); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index fee26d126..08b5de92a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -41,8 +41,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessageTableConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; @@ -50,7 +48,6 @@ import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; @@ -86,21 +83,11 @@ class WebSocketConnectionIntegrationTest { @BeforeEach void setUp() throws Exception { - @SuppressWarnings("unchecked") final DynamicConfigurationManager dynamicConfigurationManager = - mock(DynamicConfigurationManager.class); - - final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); - final DynamicMessageTableConfiguration dynamicMessageTableConfiguration = mock(DynamicMessageTableConfiguration.class); - - when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); - when(dynamicConfiguration.getMessageTableConfiguration()).thenReturn(dynamicMessageTableConfiguration); - - executorService = Executors.newSingleThreadExecutor(); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), REDIS_CLUSTER_EXTENSION.getRedisCluster(), executorService); messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), MessagesDynamoDbExtension.TABLE_NAME, - Duration.ofDays(7), dynamicConfigurationManager); + Duration.ofDays(7)); reportMessageManager = mock(ReportMessageManager.class); account = mock(Account.class); device = mock(Device.class);