Move MRM content parsing outside of “withBinaryClusterReactive”

This commit is contained in:
Chris Eager 2024-09-24 11:27:19 -05:00 committed by Chris Eager
parent 0e552bd602
commit e25291c74c
1 changed files with 18 additions and 18 deletions

View File

@ -441,27 +441,27 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice);
final Mono<MessageProtos.Envelope> mrmMessageMono = Mono.from(redisCluster.withBinaryClusterReactive(
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
.collectList()
.publishOn(messageDeliveryScheduler)
.handle((mrmDataAndView, sink) -> {
try {
assert mrmDataAndView.size() == 2;
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
.collectList()
.publishOn(messageDeliveryScheduler)))
.handle((mrmDataAndView, sink) -> {
try {
assert mrmDataAndView.size() == 2;
final byte[] content = SealedSenderMultiRecipientMessage.messageForRecipient(
mrmDataAndView.getFirst().getValue(),
mrmDataAndView.getLast().getValue());
final byte[] content = SealedSenderMultiRecipientMessage.messageForRecipient(
mrmDataAndView.getFirst().getValue(),
mrmDataAndView.getLast().getValue());
sink.next(mrmMessage.toBuilder()
.clearSharedMrmKey()
.setContent(ByteString.copyFrom(content))
.build());
sink.next(mrmMessage.toBuilder()
.clearSharedMrmKey()
.setContent(ByteString.copyFrom(content))
.build());
mrmContentRetrievedCounter.increment();
} catch (Exception e) {
sink.error(e);
}
})));
mrmContentRetrievedCounter.increment();
} catch (Exception e) {
sink.error(e);
}
});
experiment.compareFutureResult(mrmMessage.toBuilder().clearSharedMrmKey().build(),
mrmMessageMono.toFuture());