Stop reading attribute-based messages from the messages table
This commit is contained in:
parent
1891622e69
commit
e72d1d0b6f
|
@ -11,8 +11,6 @@ import static io.micrometer.core.instrument.Metrics.timer;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import io.micrometer.core.instrument.Counter;
|
|
||||||
import io.micrometer.core.instrument.Metrics;
|
|
||||||
import io.micrometer.core.instrument.Timer;
|
import io.micrometer.core.instrument.Timer;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
@ -26,9 +24,7 @@ import javax.annotation.Nonnull;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
|
||||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||||
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
|
||||||
import software.amazon.awssdk.core.SdkBytes;
|
import software.amazon.awssdk.core.SdkBytes;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||||
|
@ -51,26 +47,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
private static final String KEY_TTL = "E";
|
private static final String KEY_TTL = "E";
|
||||||
private static final String KEY_ENVELOPE_BYTES = "EB";
|
private static final String KEY_ENVELOPE_BYTES = "EB";
|
||||||
|
|
||||||
// TODO Stop reading messages by attribute value after DATE
|
|
||||||
@Deprecated
|
|
||||||
private static final String KEY_TYPE = "T";
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private static final String KEY_TIMESTAMP = "TS";
|
|
||||||
private static final String KEY_SOURCE_UUID = "SU";
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private static final String KEY_SOURCE_DEVICE = "SD";
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private static final String KEY_DESTINATION_UUID = "DU";
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private static final String KEY_UPDATED_PNI = "UP";
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private static final String KEY_CONTENT = "C";
|
|
||||||
|
|
||||||
private final Timer storeTimer = timer(name(getClass(), "store"));
|
private final Timer storeTimer = timer(name(getClass(), "store"));
|
||||||
private final Timer loadTimer = timer(name(getClass(), "load"));
|
private final Timer loadTimer = timer(name(getClass(), "load"));
|
||||||
private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid"));
|
private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid"));
|
||||||
|
@ -81,9 +57,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
private final String tableName;
|
private final String tableName;
|
||||||
private final Duration timeToLive;
|
private final Duration timeToLive;
|
||||||
|
|
||||||
private static final Counter GET_MESSAGE_WITH_ATTRIBUTES_COUNTER = Metrics.counter(name(MessagesDynamoDb.class, "loadMessage"), "format", "attributes");
|
|
||||||
private static final Counter GET_MESSAGE_WITH_ENVELOPE_COUNTER = Metrics.counter(name(MessagesDynamoDb.class, "loadMessage"), "format", "envelope");
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
|
||||||
|
|
||||||
public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) {
|
public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) {
|
||||||
|
@ -260,30 +233,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static MessageProtos.Envelope convertItemToEnvelope(final Map<String, AttributeValue> item)
|
static MessageProtos.Envelope convertItemToEnvelope(final Map<String, AttributeValue> item)
|
||||||
throws InvalidProtocolBufferException {
|
throws InvalidProtocolBufferException {
|
||||||
final MessageProtos.Envelope envelope;
|
|
||||||
|
|
||||||
if (item.containsKey(KEY_ENVELOPE_BYTES)) {
|
return MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray());
|
||||||
envelope = MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray());
|
|
||||||
|
|
||||||
GET_MESSAGE_WITH_ENVELOPE_COUNTER.increment();
|
|
||||||
} else {
|
|
||||||
final SortKey sortKey = convertSortKey(item.get(KEY_SORT).b().asByteArray());
|
|
||||||
final UUID messageUuid = convertLocalIndexMessageUuidSortKey(item.get(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT).b().asByteArray());
|
|
||||||
final int type = AttributeValues.getInt(item, KEY_TYPE, 0);
|
|
||||||
final long timestamp = AttributeValues.getLong(item, KEY_TIMESTAMP, 0L);
|
|
||||||
final UUID sourceUuid = AttributeValues.getUUID(item, KEY_SOURCE_UUID, null);
|
|
||||||
final int sourceDevice = AttributeValues.getInt(item, KEY_SOURCE_DEVICE, 0);
|
|
||||||
final UUID destinationUuid = AttributeValues.getUUID(item, KEY_DESTINATION_UUID, null);
|
|
||||||
final byte[] content = AttributeValues.getByteArray(item, KEY_CONTENT, null);
|
|
||||||
final UUID updatedPni = AttributeValues.getUUID(item, KEY_UPDATED_PNI, null);
|
|
||||||
|
|
||||||
envelope = new OutgoingMessageEntity(messageUuid, type, timestamp, sourceUuid, sourceDevice, destinationUuid,
|
|
||||||
updatedPni, content, sortKey.getServerTimestamp(), true).toEnvelope();
|
|
||||||
|
|
||||||
GET_MESSAGE_WITH_ATTRIBUTES_COUNTER.increment();
|
|
||||||
}
|
|
||||||
|
|
||||||
return envelope;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) {
|
private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) {
|
||||||
|
@ -324,56 +275,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SortKey convertSortKey(final byte[] bytes) {
|
|
||||||
if (bytes.length != 32) {
|
|
||||||
throw new IllegalArgumentException("unexpected sort key byte length");
|
|
||||||
}
|
|
||||||
|
|
||||||
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
|
|
||||||
final long destinationDeviceId = byteBuffer.getLong();
|
|
||||||
final long serverTimestamp = byteBuffer.getLong();
|
|
||||||
final long mostSigBits = byteBuffer.getLong();
|
|
||||||
final long leastSigBits = byteBuffer.getLong();
|
|
||||||
return new SortKey(destinationDeviceId, serverTimestamp, new UUID(mostSigBits, leastSigBits));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static AttributeValue convertLocalIndexMessageUuidSortKey(final UUID messageUuid) {
|
private static AttributeValue convertLocalIndexMessageUuidSortKey(final UUID messageUuid) {
|
||||||
return AttributeValues.fromUUID(messageUuid);
|
return AttributeValues.fromUUID(messageUuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static UUID convertLocalIndexMessageUuidSortKey(final byte[] bytes) {
|
|
||||||
return convertUuidFromBytes(bytes, "local index message uuid sort key");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static UUID convertUuidFromBytes(final byte[] bytes, final String name) {
|
|
||||||
try {
|
|
||||||
return UUIDUtil.fromBytes(bytes);
|
|
||||||
} catch (final IllegalArgumentException e) {
|
|
||||||
throw new IllegalArgumentException("unexpected " + name + " byte length; was " + bytes.length + " but expected 16");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class SortKey {
|
|
||||||
private final long destinationDeviceId;
|
|
||||||
private final long serverTimestamp;
|
|
||||||
private final UUID messageUuid;
|
|
||||||
|
|
||||||
public SortKey(long destinationDeviceId, long serverTimestamp, UUID messageUuid) {
|
|
||||||
this.destinationDeviceId = destinationDeviceId;
|
|
||||||
this.serverTimestamp = serverTimestamp;
|
|
||||||
this.messageUuid = messageUuid;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getDestinationDeviceId() {
|
|
||||||
return destinationDeviceId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getServerTimestamp() {
|
|
||||||
return serverTimestamp;
|
|
||||||
}
|
|
||||||
|
|
||||||
public UUID getMessageUuid() {
|
|
||||||
return messageUuid;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue