Use Mono.share() for mrmMessageMono
This commit is contained in:
parent
513f19370a
commit
ab2e6bb9a3
|
@ -444,7 +444,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
||||||
.collectList()
|
.collectList()
|
||||||
.publishOn(messageDeliveryScheduler)))
|
.publishOn(messageDeliveryScheduler)))
|
||||||
.handle((mrmDataAndView, sink) -> {
|
.<MessageProtos.Envelope>handle((mrmDataAndView, sink) -> {
|
||||||
try {
|
try {
|
||||||
assert mrmDataAndView.size() == 2;
|
assert mrmDataAndView.size() == 2;
|
||||||
|
|
||||||
|
@ -461,7 +461,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
sink.error(e);
|
sink.error(e);
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
|
.share();
|
||||||
|
|
||||||
experiment.compareMonoResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), mrmMessageMono);
|
experiment.compareMonoResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), mrmMessageMono);
|
||||||
|
|
||||||
|
|
|
@ -575,7 +575,7 @@ class MessagesCacheTest {
|
||||||
if (sharedMrmKeyPresent) {
|
if (sharedMrmKeyPresent) {
|
||||||
sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrm);
|
sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrm);
|
||||||
} else {
|
} else {
|
||||||
sharedMrmDataKey = new byte[]{1};
|
sharedMrmDataKey = "{1}".getBytes(StandardCharsets.UTF_8);
|
||||||
}
|
}
|
||||||
|
|
||||||
final UUID guid = UUID.randomUUID();
|
final UUID guid = UUID.randomUUID();
|
||||||
|
|
Loading…
Reference in New Issue