Read bare envelopes from the messages table if possible

This commit is contained in:
Jon Chambers 2022-07-27 16:55:08 -04:00 committed by Jon Chambers
parent d582942244
commit 0c76fdd36c
2 changed files with 98 additions and 26 deletions

View File

@ -8,7 +8,11 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import static io.micrometer.core.instrument.Metrics.timer; import static io.micrometer.core.instrument.Metrics.timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Duration; import java.time.Duration;
@ -19,6 +23,8 @@ import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.AttributeValues;
@ -35,8 +41,12 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class MessagesDynamoDb extends AbstractDynamoDbStore { public class MessagesDynamoDb extends AbstractDynamoDbStore {
private static final String KEY_PARTITION = "H"; @VisibleForTesting
private static final String KEY_SORT = "S"; static final String KEY_PARTITION = "H";
@VisibleForTesting
static final String KEY_SORT = "S";
private static final String LOCAL_INDEX_MESSAGE_UUID_NAME = "Message_UUID_Index"; private static final String LOCAL_INDEX_MESSAGE_UUID_NAME = "Message_UUID_Index";
private static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U"; private static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U";
@ -50,6 +60,9 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private static final String KEY_CONTENT = "C"; private static final String KEY_CONTENT = "C";
private static final String KEY_TTL = "E"; private static final String KEY_TTL = "E";
@VisibleForTesting
static final String KEY_ENVELOPE_BYTES = "EB";
private final Timer storeTimer = timer(name(getClass(), "store")); private final Timer storeTimer = timer(name(getClass(), "store"));
private final Timer loadTimer = timer(name(getClass(), "load")); private final Timer loadTimer = timer(name(getClass(), "load"));
private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid")); private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid"));
@ -60,6 +73,11 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private final String tableName; private final String tableName;
private final Duration timeToLive; private final Duration timeToLive;
private static final Counter GET_MESSAGE_WITH_ATTRIBUTES_COUNTER = Metrics.counter(name(MessagesDynamoDb.class, "loadMessage"), "format", "attributes");
private static final Counter GET_MESSAGE_WITH_ENVELOPE_COUNTER = Metrics.counter(name(MessagesDynamoDb.class, "loadMessage"), "format", "envelope");
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) { public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) {
super(dynamoDb); super(dynamoDb);
@ -130,7 +148,12 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.build(); .build();
List<MessageProtos.Envelope> messageEntities = new ArrayList<>(numberOfMessagesToFetch); List<MessageProtos.Envelope> messageEntities = new ArrayList<>(numberOfMessagesToFetch);
for (Map<String, AttributeValue> message : db().queryPaginator(queryRequest).items()) { for (Map<String, AttributeValue> message : db().queryPaginator(queryRequest).items()) {
messageEntities.add(convertItemToEnvelope(message)); try {
messageEntities.add(convertItemToEnvelope(message));
} catch (final InvalidProtocolBufferException e) {
logger.error("Failed to parse envelope", e);
}
if (messageEntities.size() == numberOfMessagesToFetch) { if (messageEntities.size() == numberOfMessagesToFetch) {
// queryPaginator() uses limit() as the page size, not as an absolute limit // queryPaginator() uses limit() as the page size, not as an absolute limit
// but a page might be smaller than limit, because a page is capped at 1 MB // but a page might be smaller than limit, because a page is capped at 1 MB
@ -173,7 +196,12 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.returnValues(ReturnValue.ALL_OLD); .returnValues(ReturnValue.ALL_OLD);
final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build()); final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build());
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) { if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
return Optional.of(convertItemToEnvelope(deleteItemResponse.attributes())); try {
return Optional.of(convertItemToEnvelope(deleteItemResponse.attributes()));
} catch (final InvalidProtocolBufferException e) {
logger.error("Failed to parse envelope", e);
return Optional.empty();
}
} }
return Optional.empty(); return Optional.empty();
@ -193,7 +221,11 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
} }
final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build()); final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build());
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) { if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
result = Optional.of(convertItemToEnvelope(deleteItemResponse.attributes())); try {
result = Optional.of(convertItemToEnvelope(deleteItemResponse.attributes()));
} catch (final InvalidProtocolBufferException e) {
logger.error("Failed to parse envelope", e);
}
} }
} }
return result; return result;
@ -233,20 +265,33 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}); });
} }
private MessageProtos.Envelope convertItemToEnvelope(final Map<String, AttributeValue> item) { private MessageProtos.Envelope convertItemToEnvelope(final Map<String, AttributeValue> item)
final SortKey sortKey = convertSortKey(item.get(KEY_SORT).b().asByteArray()); throws InvalidProtocolBufferException {
final UUID messageUuid = convertLocalIndexMessageUuidSortKey(item.get(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT).b().asByteArray()); final MessageProtos.Envelope envelope;
final int type = AttributeValues.getInt(item, KEY_TYPE, 0);
final long timestamp = AttributeValues.getLong(item, KEY_TIMESTAMP, 0L);
final String source = AttributeValues.getString(item, KEY_SOURCE, null);
final UUID sourceUuid = AttributeValues.getUUID(item, KEY_SOURCE_UUID, null);
final int sourceDevice = AttributeValues.getInt(item, KEY_SOURCE_DEVICE, 0);
final UUID destinationUuid = AttributeValues.getUUID(item, KEY_DESTINATION_UUID, null);
final byte[] content = AttributeValues.getByteArray(item, KEY_CONTENT, null);
final UUID updatedPni = AttributeValues.getUUID(item, KEY_UPDATED_PNI, null);
return new OutgoingMessageEntity(messageUuid, type, timestamp, source, sourceUuid, sourceDevice, destinationUuid, if (item.containsKey(KEY_ENVELOPE_BYTES)) {
updatedPni, content, sortKey.getServerTimestamp()).toEnvelope(); envelope = MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray());
GET_MESSAGE_WITH_ENVELOPE_COUNTER.increment();
} else {
final SortKey sortKey = convertSortKey(item.get(KEY_SORT).b().asByteArray());
final UUID messageUuid = convertLocalIndexMessageUuidSortKey(item.get(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT).b().asByteArray());
final int type = AttributeValues.getInt(item, KEY_TYPE, 0);
final long timestamp = AttributeValues.getLong(item, KEY_TIMESTAMP, 0L);
final String source = AttributeValues.getString(item, KEY_SOURCE, null);
final UUID sourceUuid = AttributeValues.getUUID(item, KEY_SOURCE_UUID, null);
final int sourceDevice = AttributeValues.getInt(item, KEY_SOURCE_DEVICE, 0);
final UUID destinationUuid = AttributeValues.getUUID(item, KEY_DESTINATION_UUID, null);
final byte[] content = AttributeValues.getByteArray(item, KEY_CONTENT, null);
final UUID updatedPni = AttributeValues.getUUID(item, KEY_UPDATED_PNI, null);
envelope = new OutgoingMessageEntity(messageUuid, type, timestamp, source, sourceUuid, sourceDevice, destinationUuid,
updatedPni, content, sortKey.getServerTimestamp()).toEnvelope();
GET_MESSAGE_WITH_ATTRIBUTES_COUNTER.increment();
}
return envelope;
} }
private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) { private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) {
@ -268,11 +313,13 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return message.getServerTimestamp() / 1000 + timeToLive.getSeconds(); return message.getServerTimestamp() / 1000 + timeToLive.getSeconds();
} }
private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid) { @VisibleForTesting
static AttributeValue convertPartitionKey(final UUID destinationAccountUuid) {
return AttributeValues.fromUUID(destinationAccountUuid); return AttributeValues.fromUUID(destinationAccountUuid);
} }
private static AttributeValue convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) { @VisibleForTesting
static AttributeValue convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]); ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]);
byteBuffer.putLong(destinationDeviceId); byteBuffer.putLong(destinationDeviceId);
byteBuffer.putLong(serverTimestamp); byteBuffer.putLong(serverTimestamp);

View File

@ -1,26 +1,26 @@
/* /*
* Copyright 2021 Signal Messenger, LLC * Copyright 2013-2022 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only * SPDX-License-Identifier: AGPL-3.0-only
*/ */
package org.whispersystems.textsecuregcm.tests.storage; package org.whispersystems.textsecuregcm.storage;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension; import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
class MessagesDynamoDbTest { class MessagesDynamoDbTest {
@ -94,6 +94,31 @@ class MessagesDynamoDbTest {
assertThat(messagesStored).element(2).isEqualTo(MESSAGE2); assertThat(messagesStored).element(2).isEqualTo(MESSAGE2);
} }
@Test
void testFetchBareEnvelope() {
final UUID destinationUuid = UUID.randomUUID();
final long destinationDeviceId = Device.MASTER_ID;
final long serverTimestamp = System.currentTimeMillis();
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope envelope = MessageProtos.Envelope.newBuilder()
.setServerGuid(messageGuid.toString())
.setDestinationUuid(destinationUuid.toString())
.setServerTimestamp(serverTimestamp)
.build();
dynamoDbExtension.getDynamoDbClient().putItem(PutItemRequest.builder()
.tableName(dynamoDbExtension.getTableName())
.item(Map.of(
MessagesDynamoDb.KEY_PARTITION, MessagesDynamoDb.convertPartitionKey(destinationUuid),
MessagesDynamoDb.KEY_SORT, MessagesDynamoDb.convertSortKey(destinationDeviceId, serverTimestamp, messageGuid),
MessagesDynamoDb.KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(envelope.toByteArray())).build()))
.build());
assertThat(messagesDynamoDb.load(destinationUuid, destinationDeviceId, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE))
.isEqualTo(List.of(envelope));
}
@Test @Test
void testDeleteForDestination() { void testDeleteForDestination() {
final UUID destinationUuid = UUID.randomUUID(); final UUID destinationUuid = UUID.randomUUID();