Add MRM views experiment to `MessagesCache.getMessagesToPersist()`
This commit is contained in:
parent
5bc6ff0e77
commit
1c617284f3
|
@ -15,22 +15,14 @@ import io.micrometer.core.instrument.Metrics;
|
|||
import io.micrometer.core.instrument.Timer;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ItemCollectionSizeLimitExceededException;
|
||||
|
||||
public class MessagePersister implements Managed {
|
||||
|
@ -38,8 +30,6 @@ public class MessagePersister implements Managed {
|
|||
private final MessagesCache messagesCache;
|
||||
private final MessagesManager messagesManager;
|
||||
private final AccountsManager accountsManager;
|
||||
private final ClientPresenceManager clientPresenceManager;
|
||||
private final KeysManager keysManager;
|
||||
|
||||
private final Duration persistDelay;
|
||||
|
||||
|
@ -72,17 +62,14 @@ public class MessagePersister implements Managed {
|
|||
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
||||
|
||||
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager,
|
||||
final AccountsManager accountsManager, final ClientPresenceManager clientPresenceManager,
|
||||
final KeysManager keysManager,
|
||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
|
||||
final Duration persistDelay,
|
||||
final AccountsManager accountsManager,
|
||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager, final Duration persistDelay,
|
||||
final int dedicatedProcessWorkerThreadCount
|
||||
) {
|
||||
|
||||
this.messagesCache = messagesCache;
|
||||
this.messagesManager = messagesManager;
|
||||
this.accountsManager = accountsManager;
|
||||
this.clientPresenceManager = clientPresenceManager;
|
||||
this.keysManager = keysManager;
|
||||
this.persistDelay = persistDelay;
|
||||
this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount];
|
||||
|
||||
|
|
|
@ -11,7 +11,6 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
import io.lettuce.core.ScoredValue;
|
||||
import io.lettuce.core.ZAddArgs;
|
||||
import io.lettuce.core.cluster.SlotHash;
|
||||
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
|
||||
|
@ -493,8 +492,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
|||
.subscribe();
|
||||
}
|
||||
|
||||
private Mono<Pair<List<byte[]>, Long>> getNextMessagePage(final UUID destinationUuid,
|
||||
final byte destinationDevice,
|
||||
private Mono<Pair<List<byte[]>, Long>> getNextMessagePage(final UUID destinationUuid, final byte destinationDevice,
|
||||
long messageId) {
|
||||
|
||||
return getItemsScript.execute(destinationUuid, destinationDevice, PAGE_SIZE, messageId)
|
||||
|
@ -520,22 +518,40 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
|||
@VisibleForTesting
|
||||
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final byte destinationDevice,
|
||||
final int limit) {
|
||||
return getMessagesTimer.record(() -> {
|
||||
final List<ScoredValue<byte[]>> scoredMessages = redisCluster.withBinaryCluster(
|
||||
connection -> connection.sync()
|
||||
.zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
|
||||
final List<MessageProtos.Envelope> envelopes = new ArrayList<>(scoredMessages.size());
|
||||
|
||||
for (final ScoredValue<byte[]> scoredMessage : scoredMessages) {
|
||||
try {
|
||||
envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue()));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
logger.warn("Failed to parse envelope", e);
|
||||
}
|
||||
}
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return envelopes;
|
||||
});
|
||||
final List<byte[]> messages = redisCluster.withBinaryCluster(connection ->
|
||||
connection.sync().zrange(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
|
||||
|
||||
return Flux.fromIterable(messages)
|
||||
.mapNotNull(message -> {
|
||||
try {
|
||||
return MessageProtos.Envelope.parseFrom(message);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
logger.warn("Failed to parse envelope", e);
|
||||
return null;
|
||||
}
|
||||
})
|
||||
.concatMap(message -> {
|
||||
final Mono<MessageProtos.Envelope> messageMono;
|
||||
if (message.hasSharedMrmKey()) {
|
||||
final Mono<?> experimentMono = maybeRunMrmViewExperiment(message, accountUuid, destinationDevice);
|
||||
|
||||
// mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content
|
||||
// To avoid races, wait for the experiment to run, but ignore any errors
|
||||
messageMono = experimentMono
|
||||
.onErrorComplete()
|
||||
.then(Mono.just(message.toBuilder().clearSharedMrmKey().build()));
|
||||
} else {
|
||||
messageMono = Mono.just(message);
|
||||
}
|
||||
|
||||
return messageMono;
|
||||
})
|
||||
.collectList()
|
||||
.doOnTerminate(() -> sample.stop(getMessagesTimer))
|
||||
.block(Duration.ofSeconds(5));
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> clear(final UUID destinationUuid) {
|
||||
|
|
|
@ -61,10 +61,7 @@ public class MessagePersisterServiceCommand extends ServerCommand<WhisperServerC
|
|||
}
|
||||
|
||||
final MessagePersister messagePersister = new MessagePersister(deps.messagesCache(), deps.messagesManager(),
|
||||
deps.accountsManager(),
|
||||
deps.clientPresenceManager(),
|
||||
deps.keysManager(),
|
||||
deps.dynamicConfigurationManager(),
|
||||
deps.accountsManager(), deps.dynamicConfigurationManager(),
|
||||
Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()),
|
||||
namespace.getInt(WORKER_COUNT));
|
||||
|
||||
|
|
|
@ -32,11 +32,9 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
|
||||
import org.whispersystems.textsecuregcm.tests.util.DevicesHelper;
|
||||
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
|
@ -85,7 +83,7 @@ class MessagePersisterIntegrationTest {
|
|||
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
|
||||
messageDeletionExecutorService);
|
||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
|
||||
mock(ClientPresenceManager.class), mock(KeysManager.class), dynamicConfigurationManager, PERSIST_DELAY, 1);
|
||||
dynamicConfigurationManager, PERSIST_DELAY, 1);
|
||||
|
||||
account = mock(Account.class);
|
||||
|
||||
|
|
|
@ -46,7 +46,6 @@ import org.mockito.ArgumentCaptor;
|
|||
import org.mockito.stubbing.Answer;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
||||
import org.whispersystems.textsecuregcm.tests.util.DevicesHelper;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
|
@ -66,8 +65,6 @@ class MessagePersisterTest {
|
|||
private MessagesDynamoDb messagesDynamoDb;
|
||||
private MessagePersister messagePersister;
|
||||
private AccountsManager accountsManager;
|
||||
private ClientPresenceManager clientPresenceManager;
|
||||
private KeysManager keysManager;
|
||||
private MessagesManager messagesManager;
|
||||
private Account destinationAccount;
|
||||
|
||||
|
@ -87,8 +84,6 @@ class MessagePersisterTest {
|
|||
|
||||
messagesDynamoDb = mock(MessagesDynamoDb.class);
|
||||
accountsManager = mock(AccountsManager.class);
|
||||
clientPresenceManager = mock(ClientPresenceManager.class);
|
||||
keysManager = mock(KeysManager.class);
|
||||
destinationAccount = mock(Account.class);;
|
||||
|
||||
when(accountsManager.getByAccountIdentifier(DESTINATION_ACCOUNT_UUID)).thenReturn(Optional.of(destinationAccount));
|
||||
|
@ -105,11 +100,10 @@ class MessagePersisterTest {
|
|||
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
|
||||
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService,
|
||||
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager);
|
||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, clientPresenceManager,
|
||||
keysManager, dynamicConfigurationManager, PERSIST_DELAY, 1);
|
||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
|
||||
dynamicConfigurationManager, PERSIST_DELAY, 1);
|
||||
|
||||
when(messagesManager.clear(any(UUID.class), anyByte())).thenReturn(CompletableFuture.completedFuture(null));
|
||||
when(keysManager.deleteSingleUsePreKeys(any(), anyByte())).thenReturn(CompletableFuture.completedFuture(null));
|
||||
|
||||
when(messagesManager.persistMessages(any(UUID.class), any(), any())).thenAnswer(invocation -> {
|
||||
final UUID destinationUuid = invocation.getArgument(0);
|
||||
|
|
|
@ -206,8 +206,7 @@ class MessagesCacheTest {
|
|||
.collect(Collectors.toList())).get(5, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(messagesToRemove.stream().map(RemovedMessage::fromEnvelope).toList(), removedMessages);
|
||||
assertEquals(messagesToPreserve,
|
||||
messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
||||
assertEquals(messagesToPreserve, get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -620,6 +619,55 @@ class MessagesCacheTest {
|
|||
}, "Shared MRM data should be deleted asynchronously");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
void testGetMessagesToPersist(final boolean sharedMrmKeyPresent) throws Exception {
|
||||
final UUID destinationUuid = UUID.randomUUID();
|
||||
final byte deviceId = 1;
|
||||
|
||||
final UUID messageGuid = UUID.randomUUID();
|
||||
final MessageProtos.Envelope message = generateRandomMessage(destinationUuid, true);
|
||||
|
||||
messagesCache.insert(messageGuid, destinationUuid, deviceId, message);
|
||||
|
||||
final SealedSenderMultiRecipientMessage mrm = generateRandomMrmMessage(
|
||||
new AciServiceIdentifier(destinationUuid), deviceId);
|
||||
|
||||
final byte[] sharedMrmDataKey;
|
||||
if (sharedMrmKeyPresent) {
|
||||
sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrm);
|
||||
} else {
|
||||
sharedMrmDataKey = new byte[]{1};
|
||||
}
|
||||
|
||||
final UUID mrmMessageGuid = UUID.randomUUID();
|
||||
final MessageProtos.Envelope mrmMessage = generateRandomMessage(mrmMessageGuid, true)
|
||||
.toBuilder()
|
||||
// clear some things added by the helper
|
||||
.clearServerGuid()
|
||||
// mrm views phase 1: messages have content
|
||||
.setContent(
|
||||
ByteString.copyFrom(mrm.messageForRecipient(mrm.getRecipients().get(new ServiceId.Aci(destinationUuid)))))
|
||||
.setSharedMrmKey(ByteString.copyFrom(sharedMrmDataKey))
|
||||
.build();
|
||||
messagesCache.insert(mrmMessageGuid, destinationUuid, deviceId, mrmMessage);
|
||||
|
||||
final List<MessageProtos.Envelope> messages = get(destinationUuid, deviceId, 100);
|
||||
|
||||
assertEquals(2, messages.size());
|
||||
|
||||
assertEquals(message.toBuilder()
|
||||
.setServerGuid(messageGuid.toString())
|
||||
.build(),
|
||||
messages.getFirst());
|
||||
|
||||
assertEquals(mrmMessage.toBuilder().
|
||||
clearSharedMrmKey().
|
||||
setServerGuid(mrmMessageGuid.toString())
|
||||
.build(),
|
||||
messages.getLast());
|
||||
}
|
||||
|
||||
private List<MessageProtos.Envelope> get(final UUID destinationUuid, final byte destinationDeviceId,
|
||||
final int messageCount) {
|
||||
return Flux.from(messagesCache.get(destinationUuid, destinationDeviceId))
|
||||
|
|
Loading…
Reference in New Issue