From c8f45685b826ac5e945d6fecb15729b77e4d06dd Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 25 Jun 2025 09:41:01 -0400 Subject: [PATCH] Expand envelopes on load from storage --- .../textsecuregcm/storage/MessagesCache.java | 16 ++++++++++------ .../textsecuregcm/storage/MessagesDynamoDb.java | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index f64a19db1..fcca5a5a6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -15,8 +15,6 @@ import io.lettuce.core.Range; import io.lettuce.core.ScoredValue; import io.lettuce.core.ZAddArgs; import io.lettuce.core.cluster.SlotHash; -import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser; -import io.lettuce.core.cluster.models.partitions.Partitions; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; @@ -260,7 +258,7 @@ public class MessagesCache { for (final byte[] bytes : serialized) { try { - final MessageProtos.Envelope envelope = MessageProtos.Envelope.parseFrom(bytes); + final MessageProtos.Envelope envelope = parseEnvelope(bytes); removedMessages.add(RemovedMessage.fromEnvelope(envelope)); if (envelope.hasSharedMrmKey()) { serviceIdentifierToMrmKeys.computeIfAbsent( @@ -387,7 +385,7 @@ public class MessagesCache { for (int i = 0; i < queueItems.size() - 1; i += 2) { try { - final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(queueItems.get(i)); + final MessageProtos.Envelope message = parseEnvelope(queueItems.get(i)); final Mono messageMono; if (message.hasSharedMrmKey()) { @@ -606,7 +604,7 @@ public class MessagesCache { return serializedMessages .mapNotNull(message -> { try { - return MessageProtos.Envelope.parseFrom(message); + return parseEnvelope(message); } catch (InvalidProtocolBufferException e) { logger.warn("Failed to parse envelope", e); return null; @@ -646,7 +644,7 @@ public class MessagesCache { final List processedMessages = new ArrayList<>(messagesToProcess.size()); for (byte[] serialized : messagesToProcess) { try { - final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(serialized); + final MessageProtos.Envelope message = parseEnvelope(serialized); processedMessages.add(message.getServerGuid()); @@ -754,4 +752,10 @@ public class MessagesCache { static byte getDeviceIdFromQueueName(final String queueName) { return Byte.parseByte(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}'))); } + + private static MessageProtos.Envelope parseEnvelope(final byte[] envelopeBytes) + throws InvalidProtocolBufferException { + + return EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(envelopeBytes)); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index e884e55c3..5a4c02f64 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -227,7 +227,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { static MessageProtos.Envelope convertItemToEnvelope(final Map item) throws InvalidProtocolBufferException { - return MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray()); + return EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray())); } private long getTtlForMessage(MessageProtos.Envelope message) {