Wait for MRM experiment mono to complete before returning default message
This commit is contained in:
parent
b95a766888
commit
ad17c6e40d
|
@ -396,10 +396,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
final Mono<MessageProtos.Envelope> messageMono;
|
final Mono<MessageProtos.Envelope> messageMono;
|
||||||
if (message.hasSharedMrmKey()) {
|
if (message.hasSharedMrmKey()) {
|
||||||
maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice);
|
final Mono<?> experimentMono = maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice);
|
||||||
|
|
||||||
// mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content
|
// mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content
|
||||||
messageMono = Mono.just(message.toBuilder().clearSharedMrmKey().build());
|
// To avoid races, wait for the experiment to run, but ignore any errors
|
||||||
|
messageMono = experimentMono
|
||||||
|
.onErrorComplete()
|
||||||
|
.then(Mono.just(message.toBuilder().clearSharedMrmKey().build()));
|
||||||
} else {
|
} else {
|
||||||
messageMono = Mono.just(message);
|
messageMono = Mono.just(message);
|
||||||
}
|
}
|
||||||
|
@ -420,7 +423,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
*
|
*
|
||||||
* @see DynamicMessagesConfiguration#mrmViewExperimentEnabled()
|
* @see DynamicMessagesConfiguration#mrmViewExperimentEnabled()
|
||||||
*/
|
*/
|
||||||
private void maybeRunMrmViewExperiment(final MessageProtos.Envelope mrmMessage, final UUID destinationUuid,
|
private Mono<?> maybeRunMrmViewExperiment(final MessageProtos.Envelope mrmMessage, final UUID destinationUuid,
|
||||||
final byte destinationDevice) {
|
final byte destinationDevice) {
|
||||||
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration()
|
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration()
|
||||||
.mrmViewExperimentEnabled()) {
|
.mrmViewExperimentEnabled()) {
|
||||||
|
@ -456,6 +459,10 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
experiment.compareFutureResult(mrmMessage.toBuilder().clearSharedMrmKey().build(),
|
experiment.compareFutureResult(mrmMessage.toBuilder().clearSharedMrmKey().build(),
|
||||||
mrmMessageMono.toFuture());
|
mrmMessageMono.toFuture());
|
||||||
|
|
||||||
|
return mrmMessageMono;
|
||||||
|
} else {
|
||||||
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -563,15 +563,22 @@ class MessagesCacheTest {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
void testMultiRecipientMessage() throws Exception {
|
@ValueSource(booleans = {true, false})
|
||||||
|
void testMultiRecipientMessage(final boolean sharedMrmKeyPresent) throws Exception {
|
||||||
final UUID destinationUuid = UUID.randomUUID();
|
final UUID destinationUuid = UUID.randomUUID();
|
||||||
final byte deviceId = 1;
|
final byte deviceId = 1;
|
||||||
|
|
||||||
final UUID mrmGuid = UUID.randomUUID();
|
final UUID mrmGuid = UUID.randomUUID();
|
||||||
final SealedSenderMultiRecipientMessage mrm = generateRandomMrmMessage(
|
final SealedSenderMultiRecipientMessage mrm = generateRandomMrmMessage(
|
||||||
new AciServiceIdentifier(destinationUuid), deviceId);
|
new AciServiceIdentifier(destinationUuid), deviceId);
|
||||||
final byte[] sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrmGuid, mrm);
|
|
||||||
|
final byte[] sharedMrmDataKey;
|
||||||
|
if (sharedMrmKeyPresent) {
|
||||||
|
sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrmGuid, mrm);
|
||||||
|
} else {
|
||||||
|
sharedMrmDataKey = new byte[]{1};
|
||||||
|
}
|
||||||
|
|
||||||
final UUID guid = UUID.randomUUID();
|
final UUID guid = UUID.randomUUID();
|
||||||
final MessageProtos.Envelope message = generateRandomMessage(guid, true)
|
final MessageProtos.Envelope message = generateRandomMessage(guid, true)
|
||||||
|
@ -585,7 +592,7 @@ class MessagesCacheTest {
|
||||||
.build();
|
.build();
|
||||||
messagesCache.insert(guid, destinationUuid, deviceId, message);
|
messagesCache.insert(guid, destinationUuid, deviceId, message);
|
||||||
|
|
||||||
assertEquals(1L, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster()
|
assertEquals(sharedMrmKeyPresent ? 1 : 0, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster()
|
||||||
.withBinaryCluster(conn -> conn.sync().exists(MessagesCache.getSharedMrmKey(mrmGuid))));
|
.withBinaryCluster(conn -> conn.sync().exists(MessagesCache.getSharedMrmKey(mrmGuid))));
|
||||||
|
|
||||||
final List<MessageProtos.Envelope> messages = get(destinationUuid, deviceId, 1);
|
final List<MessageProtos.Envelope> messages = get(destinationUuid, deviceId, 1);
|
||||||
|
|
Loading…
Reference in New Issue