Store compressed envelopes at rest
This commit is contained in:
parent
c8f45685b8
commit
87c30d00e8
|
@ -57,7 +57,7 @@ class MessagesCacheInsertScript {
|
||||||
);
|
);
|
||||||
|
|
||||||
final List<byte[]> args = new ArrayList<>(Arrays.asList(
|
final List<byte[]> args = new ArrayList<>(Arrays.asList(
|
||||||
envelope.toByteArray(), // message
|
EnvelopeUtil.compress(envelope).toByteArray(), // message
|
||||||
String.valueOf(envelope.getServerTimestamp()).getBytes(StandardCharsets.UTF_8), // currentTime
|
String.valueOf(envelope.getServerTimestamp()).getBytes(StandardCharsets.UTF_8), // currentTime
|
||||||
envelope.getServerGuid().getBytes(StandardCharsets.UTF_8), // guid
|
envelope.getServerGuid().getBytes(StandardCharsets.UTF_8), // guid
|
||||||
NEW_MESSAGE_EVENT_BYTES // eventPayload
|
NEW_MESSAGE_EVENT_BYTES // eventPayload
|
||||||
|
|
|
@ -105,7 +105,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||||
.put(KEY_SORT, convertSortKey(message.getServerTimestamp(), messageUuid))
|
.put(KEY_SORT, convertSortKey(message.getServerTimestamp(), messageUuid))
|
||||||
.put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
|
.put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
|
||||||
.put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message)))
|
.put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message)))
|
||||||
.put(KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(message.toByteArray())).build());
|
.put(KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(EnvelopeUtil.compress(message).toByteArray())).build());
|
||||||
|
|
||||||
writeItems.add(WriteRequest.builder().putRequest(PutRequest.builder()
|
writeItems.add(WriteRequest.builder().putRequest(PutRequest.builder()
|
||||||
.item(item.build())
|
.item(item.build())
|
||||||
|
|
|
@ -50,8 +50,8 @@ class MessagesCacheGetItemsScriptTest {
|
||||||
|
|
||||||
assertNotNull(messageAndScores);
|
assertNotNull(messageAndScores);
|
||||||
assertEquals(2, messageAndScores.size());
|
assertEquals(2, messageAndScores.size());
|
||||||
final MessageProtos.Envelope resultEnvelope = MessageProtos.Envelope.parseFrom(
|
final MessageProtos.Envelope resultEnvelope =
|
||||||
messageAndScores.getFirst());
|
EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(messageAndScores.getFirst()));
|
||||||
|
|
||||||
assertEquals(serverGuid, resultEnvelope.getServerGuid());
|
assertEquals(serverGuid, resultEnvelope.getServerGuid());
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ class MessagesCacheInsertScriptTest {
|
||||||
|
|
||||||
insertScript.executeAsync(destinationUuid, deviceId, envelope1);
|
insertScript.executeAsync(destinationUuid, deviceId, envelope1);
|
||||||
|
|
||||||
assertEquals(List.of(envelope1), getStoredMessages(destinationUuid, deviceId));
|
assertEquals(List.of(EnvelopeUtil.compress(envelope1)), getStoredMessages(destinationUuid, deviceId));
|
||||||
|
|
||||||
final MessageProtos.Envelope envelope2 = MessageProtos.Envelope.newBuilder()
|
final MessageProtos.Envelope envelope2 = MessageProtos.Envelope.newBuilder()
|
||||||
.setServerTimestamp(Instant.now().getEpochSecond())
|
.setServerTimestamp(Instant.now().getEpochSecond())
|
||||||
|
@ -52,11 +52,13 @@ class MessagesCacheInsertScriptTest {
|
||||||
|
|
||||||
insertScript.executeAsync(destinationUuid, deviceId, envelope2);
|
insertScript.executeAsync(destinationUuid, deviceId, envelope2);
|
||||||
|
|
||||||
assertEquals(List.of(envelope1, envelope2), getStoredMessages(destinationUuid, deviceId));
|
assertEquals(List.of(EnvelopeUtil.compress(envelope1), EnvelopeUtil.compress(envelope2)),
|
||||||
|
getStoredMessages(destinationUuid, deviceId));
|
||||||
|
|
||||||
insertScript.executeAsync(destinationUuid, deviceId, envelope1);
|
insertScript.executeAsync(destinationUuid, deviceId, envelope1);
|
||||||
|
|
||||||
assertEquals(List.of(envelope1, envelope2), getStoredMessages(destinationUuid, deviceId),
|
assertEquals(List.of(EnvelopeUtil.compress(envelope1), EnvelopeUtil.compress(envelope2)),
|
||||||
|
getStoredMessages(destinationUuid, deviceId),
|
||||||
"Messages with same GUID should be deduplicated");
|
"Messages with same GUID should be deduplicated");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||||
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
||||||
|
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||||
|
|
||||||
class MessagesCacheRemoveByGuidScriptTest {
|
class MessagesCacheRemoveByGuidScriptTest {
|
||||||
|
|
||||||
|
@ -44,9 +45,8 @@ class MessagesCacheRemoveByGuidScriptTest {
|
||||||
|
|
||||||
assertEquals(1, removedMessages.size());
|
assertEquals(1, removedMessages.size());
|
||||||
|
|
||||||
final MessageProtos.Envelope resultMessage = MessageProtos.Envelope.parseFrom(
|
final MessageProtos.Envelope resultMessage = MessageProtos.Envelope.parseFrom(removedMessages.getFirst());
|
||||||
removedMessages.getFirst());
|
|
||||||
|
|
||||||
assertEquals(serverGuid, UUID.fromString(resultMessage.getServerGuid()));
|
assertEquals(serverGuid, UUIDUtil.fromByteString(resultMessage.getServerGuidBinary()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue