From e25291c74c84e504ac3dfef34119647d14f148e7 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Tue, 24 Sep 2024 11:27:19 -0500 Subject: [PATCH] =?UTF-8?q?Move=20MRM=20content=20parsing=20outside=20of?= =?UTF-8?q?=20=E2=80=9CwithBinaryClusterReactive=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../textsecuregcm/storage/MessagesCache.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 9cfa4ee90..a943be541 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -441,27 +441,27 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice); final Mono mrmMessageMono = Mono.from(redisCluster.withBinaryClusterReactive( - conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey) - .collectList() - .publishOn(messageDeliveryScheduler) - .handle((mrmDataAndView, sink) -> { - try { - assert mrmDataAndView.size() == 2; + conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey) + .collectList() + .publishOn(messageDeliveryScheduler))) + .handle((mrmDataAndView, sink) -> { + try { + assert mrmDataAndView.size() == 2; - final byte[] content = SealedSenderMultiRecipientMessage.messageForRecipient( - mrmDataAndView.getFirst().getValue(), - mrmDataAndView.getLast().getValue()); + final byte[] content = SealedSenderMultiRecipientMessage.messageForRecipient( + mrmDataAndView.getFirst().getValue(), + mrmDataAndView.getLast().getValue()); - sink.next(mrmMessage.toBuilder() - .clearSharedMrmKey() - .setContent(ByteString.copyFrom(content)) - .build()); + sink.next(mrmMessage.toBuilder() + .clearSharedMrmKey() + .setContent(ByteString.copyFrom(content)) + .build()); - mrmContentRetrievedCounter.increment(); - } catch (Exception e) { - sink.error(e); - } - }))); + mrmContentRetrievedCounter.increment(); + } catch (Exception e) { + sink.error(e); + } + }); experiment.compareFutureResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), mrmMessageMono.toFuture());