Expand envelopes on load from storage

This commit is contained in:
Jon Chambers 2025-06-25 09:41:01 -04:00 committed by Jon Chambers
parent bb90d80d22
commit c8f45685b8
2 changed files with 11 additions and 7 deletions

View File

@ -15,8 +15,6 @@ import io.lettuce.core.Range;
import io.lettuce.core.ScoredValue;
import io.lettuce.core.ZAddArgs;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
@ -260,7 +258,7 @@ public class MessagesCache {
for (final byte[] bytes : serialized) {
try {
final MessageProtos.Envelope envelope = MessageProtos.Envelope.parseFrom(bytes);
final MessageProtos.Envelope envelope = parseEnvelope(bytes);
removedMessages.add(RemovedMessage.fromEnvelope(envelope));
if (envelope.hasSharedMrmKey()) {
serviceIdentifierToMrmKeys.computeIfAbsent(
@ -387,7 +385,7 @@ public class MessagesCache {
for (int i = 0; i < queueItems.size() - 1; i += 2) {
try {
final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(queueItems.get(i));
final MessageProtos.Envelope message = parseEnvelope(queueItems.get(i));
final Mono<MessageProtos.Envelope> messageMono;
if (message.hasSharedMrmKey()) {
@ -606,7 +604,7 @@ public class MessagesCache {
return serializedMessages
.mapNotNull(message -> {
try {
return MessageProtos.Envelope.parseFrom(message);
return parseEnvelope(message);
} catch (InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
return null;
@ -646,7 +644,7 @@ public class MessagesCache {
final List<String> processedMessages = new ArrayList<>(messagesToProcess.size());
for (byte[] serialized : messagesToProcess) {
try {
final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(serialized);
final MessageProtos.Envelope message = parseEnvelope(serialized);
processedMessages.add(message.getServerGuid());
@ -754,4 +752,10 @@ public class MessagesCache {
static byte getDeviceIdFromQueueName(final String queueName) {
return Byte.parseByte(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}')));
}
private static MessageProtos.Envelope parseEnvelope(final byte[] envelopeBytes)
throws InvalidProtocolBufferException {
return EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(envelopeBytes));
}
}

View File

@ -227,7 +227,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
static MessageProtos.Envelope convertItemToEnvelope(final Map<String, AttributeValue> item)
throws InvalidProtocolBufferException {
return MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray());
return EnvelopeUtil.expand(MessageProtos.Envelope.parseFrom(item.get(KEY_ENVELOPE_BYTES).b().asByteArray()));
}
private long getTtlForMessage(MessageProtos.Envelope message) {