Retire the legacy message availability system

This commit is contained in:
Jon Chambers 2024-11-06 16:29:27 -05:00 committed by Jon Chambers
parent ef716aacc2
commit 6a1f4906c5
17 changed files with 47 additions and 525 deletions

View File

@ -50,7 +50,6 @@ import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@ -459,9 +458,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FaultTolerantRedisClient pubsubClient =
config.getRedisPubSubConfiguration().build("pubsub", sharedClientResources);
final BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000);
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(),
keyspaceNotificationDispatchQueue);
final BlockingQueue<Runnable> receiptSenderQueue = new LinkedBlockingQueue<>();
Metrics.gaugeCollectionSize(name(getClass(), "receiptSenderQueue"), Collections.emptyList(), receiptSenderQueue);
final BlockingQueue<Runnable> fcmSenderQueue = new LinkedBlockingQueue<>();
@ -474,14 +470,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build();
ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build();
ExecutorService keyspaceNotificationDispatchExecutor = ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
environment.lifecycle()
.executorService(name(getClass(), "keyspaceNotification-%d"))
.maxThreads(16)
.workQueue(keyspaceNotificationDispatchQueue)
.build(),
MetricsUtil.name(getClass(), "keyspaceNotificationExecutor"),
MetricsUtil.PREFIX);
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))
.maxThreads(1).minThreads(1).build();
ExecutorService fcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d"))
@ -611,8 +599,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
storageServiceExecutor, storageServiceRetryExecutor, config.getSecureStorageServiceConfiguration());
PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor,
messageDeliveryScheduler, messageDeletionAsyncExecutor, clock, dynamicConfigurationManager);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler,
messageDeletionAsyncExecutor, clock, dynamicConfigurationManager);
ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases,
recurringJobExecutor,
config.getClientReleaseConfiguration().refreshInterval(),
@ -733,7 +721,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(pushNotificationScheduler);
environment.lifecycle().manage(provisioningManager);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(pubSubClientEventManager);
environment.lifecycle().manage(currencyManager);
environment.lifecycle().manage(registrationServiceClient);

View File

@ -19,7 +19,7 @@ public interface ClientEventListener {
/**
* Indicates that messages for the client have been persisted from short-term storage to long-term storage.
*/
void handleMessagesPersistedPubSub();
void handleMessagesPersisted();
/**
* Indicates that the client's presence has been displaced and the listener should close the client's underlying

View File

@ -371,7 +371,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
case DISCONNECT_REQUESTED -> listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(false));
case MESSAGES_PERSISTED -> listenerEventExecutor.execute(listener::handleMessagesPersistedPubSub);
case MESSAGES_PERSISTED -> listenerEventExecutor.execute(listener::handleMessagesPersisted);
default -> logger.warn("Unexpected client event type: {}", clientEvent.getClass());
}

View File

@ -1,26 +0,0 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
/**
* A message availability listener is notified when new messages are available for a specific device for a specific
* account. Availability listeners are also notified when messages are moved from the message cache to long-term storage
* as an optimization hint to implementing classes.
*/
public interface MessageAvailabilityListener {
/**
* @return whether the listener is still active. {@code false} indicates the listener can no longer handle messages
* and may be discarded
*/
boolean handleNewMessagesAvailable();
/**
* @return whether the listener is still active. {@code false} indicates the listener can no longer handle messages
* and may be discarded
*/
boolean handleMessagesPersisted();
}

View File

@ -10,12 +10,8 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.dropwizard.lifecycle.Managed;
import io.lettuce.core.ZAddArgs;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
@ -28,18 +24,13 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.signal.libsignal.protocol.ServiceId;
@ -51,11 +42,9 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -118,13 +107,11 @@ import reactor.core.scheduler.Schedulers;
* @see MessagesCacheRemoveRecipientViewFromMrmDataScript
* @see MessagesCacheRemoveQueueScript
*/
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
public class MessagesCache {
private final FaultTolerantRedisClusterClient redisCluster;
private final FaultTolerantPubSubClusterConnection<String, String> pubSubConnection;
private final Clock clock;
private final ExecutorService notificationExecutorService;
private final Scheduler messageDeliveryScheduler;
private final ExecutorService messageDeletionExecutorService;
// messageDeletionExecutorService wrapped into a reactor Scheduler
@ -140,10 +127,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript;
private final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript;
private final ReentrantLock messageListenersLock = new ReentrantLock();
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"));
private final Timer insertSharedMrmPayloadTimer = Metrics.timer(name(MessagesCache.class, "insertSharedMrmPayload"));
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
@ -151,17 +134,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private final Timer removeByGuidTimer = Metrics.timer(name(MessagesCache.class, "removeByGuid"));
private final Timer removeRecipientViewTimer = Metrics.timer(name(MessagesCache.class, "removeRecipientView"));
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
private final Counter newMessageNotificationCounter = Metrics.counter(
name(MessagesCache.class, "newMessageNotification"));
private final Counter queuePersistedNotificationCounter = Metrics.counter(
name(MessagesCache.class, "queuePersisted"));
private final Counter staleEphemeralMessagesCounter = Metrics.counter(
name(MessagesCache.class, "staleEphemeralMessages"));
private final Counter messageAvailabilityListenerRemovedAfterAddCounter = Metrics.counter(
name(MessagesCache.class, "messageAvailabilityListenerRemovedAfterAdd"));
private final Counter prunedStaleSubscriptionCounter = Metrics.counter(
name(MessagesCache.class, "prunedStaleSubscription"));
private final Counter mrmContentRetrievedCounter = Metrics.counter(name(MessagesCache.class, "mrmViewRetrieved"));
private final Counter sharedMrmDataKeyRemovedCounter = Metrics.counter(
name(MessagesCache.class, "sharedMrmKeyRemoved"));
@ -169,9 +143,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::";
private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::";
private static final String MRM_VIEWS_EXPERIMENT_NAME = "mrmViews";
@VisibleForTesting
@ -184,13 +155,15 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final ExecutorService notificationExecutorService,
final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock,
public MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService,
final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager)
throws IOException {
this(
redisCluster,
notificationExecutorService,
messageDeliveryScheduler,
messageDeletionExecutorService,
clock,
@ -206,8 +179,9 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
@VisibleForTesting
MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final ExecutorService notificationExecutorService,
final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock,
MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService, final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final MessagesCacheInsertScript insertScript,
final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript,
@ -218,10 +192,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
throws IOException {
this.redisCluster = redisCluster;
this.pubSubConnection = redisCluster.createPubSubConnection();
this.clock = clock;
this.notificationExecutorService = notificationExecutorService;
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.messageDeletionExecutorService = messageDeletionExecutorService;
this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion");
@ -237,34 +209,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
this.removeRecipientViewFromMrmDataScript = removeRecipientViewFromMrmDataScript;
}
@Override
public void start() {
pubSubConnection.usePubSubConnection(connection -> connection.addListener(this));
pubSubConnection.subscribeToClusterTopologyChangedEvents(this::resubscribeAll);
}
@Override
public void stop() {
pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe());
}
private void resubscribeAll(final ClusterTopologyChangedEvent event) {
final Set<String> queueNames;
messageListenersLock.lock();
try {
queueNames = new HashSet<>(messageListenersByQueueName.keySet());
} finally {
messageListenersLock.unlock();
}
for (final String queueName : queueNames) {
// avoid overwhelming a newly recovered node by processing synchronously, rather than using CompletableFuture.allOf()
subscribeForKeyspaceNotifications(queueName).join();
}
}
public long insert(final UUID guid, final UUID destinationUuid, final byte destinationDevice,
final MessageProtos.Envelope message) {
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
@ -642,146 +586,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
}
public void addMessageAvailabilityListener(final UUID destinationUuid, final byte deviceId,
final MessageAvailabilityListener listener) {
final String queueName = getQueueName(destinationUuid, deviceId);
final CompletableFuture<Void> subscribeFuture;
messageListenersLock.lock();
try {
messageListenersByQueueName.put(queueName, listener);
queueNamesByMessageListener.put(listener, queueName);
// Submit to the Redis queue while holding the lock, but dont wait until exiting
subscribeFuture = subscribeForKeyspaceNotifications(queueName);
} finally {
messageListenersLock.unlock();
}
subscribeFuture.join();
}
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
@Nullable final String queueName;
messageListenersLock.lock();
try {
queueName = queueNamesByMessageListener.get(listener);
} finally {
messageListenersLock.unlock();
}
if (queueName != null) {
final CompletableFuture<Void> unsubscribeFuture;
messageListenersLock.lock();
try {
queueNamesByMessageListener.remove(listener);
if (messageListenersByQueueName.remove(queueName, listener)) {
// Submit to the Redis queue holding the lock, but dont wait until exiting
unsubscribeFuture = unsubscribeFromKeyspaceNotifications(queueName);
} else {
messageAvailabilityListenerRemovedAfterAddCounter.increment();
unsubscribeFuture = CompletableFuture.completedFuture(null);
}
} finally {
messageListenersLock.unlock();
}
unsubscribeFuture.join();
}
}
private void pruneStaleSubscription(final String channel) {
unsubscribeFromKeyspaceNotifications(getQueueNameFromKeyspaceChannel(channel))
.thenRun(prunedStaleSubscriptionCounter::increment);
}
private CompletableFuture<Void> subscribeForKeyspaceNotifications(final String queueName) {
final int slot = SlotHash.getSlot(queueName);
return pubSubConnection.withPubSubConnection(
connection -> connection.async()
.nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot))
.commands()
.subscribe(getKeyspaceChannels(queueName))).toCompletableFuture()
.thenRun(Util.NOOP);
}
private CompletableFuture<Void> unsubscribeFromKeyspaceNotifications(final String queueName) {
final int slot = SlotHash.getSlot(queueName);
return pubSubConnection.withPubSubConnection(
connection -> connection.async()
.nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot))
.commands()
.unsubscribe(getKeyspaceChannels(queueName)))
.toCompletableFuture()
.thenRun(Util.NOOP);
}
private static String[] getKeyspaceChannels(final String queueName) {
return new String[]{
QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}",
PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}"
};
}
@Override
public void message(final RedisClusterNode node, final String channel, final String message) {
pubSubMessageCounter.increment();
if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) {
newMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresentOrElse(listener -> {
if (!listener.handleNewMessagesAvailable()) {
removeMessageAvailabilityListener(listener);
}
}, () -> pruneStaleSubscription(channel));
} catch (final Exception e) {
logger.warn("Unexpected error handling new message", e);
}
});
} else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) {
queuePersistedNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresentOrElse(listener -> {
if (!listener.handleMessagesPersisted()) {
removeMessageAvailabilityListener(listener);
}
}, () -> pruneStaleSubscription(channel));
} catch (final Exception e) {
logger.warn("Unexpected error handling messages persisted", e);
}
});
}
}
private Optional<MessageAvailabilityListener> findListener(final String keyspaceChannel) {
final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel);
messageListenersLock.lock();
try {
return Optional.ofNullable(messageListenersByQueueName.get(queueName));
} finally {
messageListenersLock.unlock();
}
}
@VisibleForTesting
static String getQueueName(final UUID accountUuid, final byte deviceId) {
return accountUuid + "::" + deviceId;
}
@VisibleForTesting
static String getQueueNameFromKeyspaceChannel(final String channel) {
final int startOfHashTag = channel.indexOf('{');
final int endOfHashTag = channel.lastIndexOf('}');
return channel.substring(startOfHashTag + 1, endOfHashTag);
}
static byte[] getMessageQueueKey(final UUID accountUuid, final byte deviceId) {
return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
}

View File

@ -24,7 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.observability.micrometer.Micrometer;
@ -128,10 +127,6 @@ public class MessagesManager {
.tap(Micrometer.metrics(Metrics.globalRegistry));
}
public Mono<Long> getEarliestUndeliveredTimestampForDevice(UUID destinationUuid, Device destinationDevice) {
return Mono.from(messagesDynamoDb.load(destinationUuid, destinationDevice, 1)).map(Envelope::getServerTimestamp);
}
public CompletableFuture<Void> clear(UUID destinationUuid) {
return messagesCache.clear(destinationUuid);
}
@ -190,17 +185,6 @@ public class MessagesManager {
return messagesRemovedFromCache;
}
public void addMessageAvailabilityListener(
final UUID destinationUuid,
final byte destinationDeviceId,
final MessageAvailabilityListener listener) {
messagesCache.addMessageAvailabilityListener(destinationUuid, destinationDeviceId, listener);
}
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
messagesCache.removeMessageAvailabilityListener(listener);
}
/**
* Inserts the shared multi-recipient message payload to storage.
*
@ -211,15 +195,4 @@ public class MessagesManager {
final SealedSenderMultiRecipientMessage sealedSenderMultiRecipientMessage) {
return messagesCache.insertSharedMultiRecipientMessagePayload(sealedSenderMultiRecipientMessage);
}
/**
* Removes the recipient's view from shared MRM data if necessary
*/
public void removeRecipientViewFromMrmData(final byte destinationDeviceId, final Envelope message) {
if (message.hasSharedMrmKey()) {
messagesCache.removeRecipientViewFromMrmData(List.of(message.getSharedMrmKey().toByteArray()),
ServiceIdentifier.valueOf(message.getDestinationServiceId()),
destinationDeviceId);
}
}
}

View File

@ -19,7 +19,6 @@ import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.websocket.session.WebSocketSessionContext;
@ -109,23 +108,12 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
pubSubClientEventManager.handleClientDisconnected(auth.getAccount().getUuid(),
auth.getAuthenticatedDevice().getId());
// Next, we stop listening for inbound messages. If a message arrives after this call, the websocket connection
// will not be notified and will not change its state, but that's okay because it has already closed and
// attempts to deliver mesages via this connection will not succeed.
RedisOperation.unchecked(() -> messagesManager.removeMessageAvailabilityListener(connection));
// Finally, stop trying to deliver messages and send a push notification if the connection is aware of any
// undelivered messages.
connection.stop();
});
try {
// Once we add this connection as a message availability listener, it will be notified any time a new message
// arrives in the message cache. This updates the connection's "may have messages" state. It's important that
// we do this first because we want to make sure we're accurately tracking message availability in the
// connection's internal state.
messagesManager.addMessageAvailabilityListener(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection);
// Once we "start" the websocket connection, we'll cancel any scheduled "you may have new messages" push
// notifications and begin delivering any stored messages for the connected device. We have not yet declared the
// client as "present" yet. If a message arrives at this point, we will update the message availability state

View File

@ -51,7 +51,6 @@ import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.HeaderUtils;
import org.whispersystems.websocket.WebSocketClient;
@ -63,7 +62,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class WebSocketConnection implements MessageAvailabilityListener, ClientEventListener {
public class WebSocketConnection implements ClientEventListener {
private static final DistributionSummary messageTime = Metrics.summary(
name(MessageController.class, "messageDeliveryDuration"));
@ -81,8 +80,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, ClientE
private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name(WebSocketConnection.class,
"clientNonSuccessResponse");
private static final String CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME = name(WebSocketConnection.class,
"messageAvailableAfterClientClosed");
private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class,
"sendMessages");
private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class,
@ -460,21 +457,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, ClientE
}
}
@Override
public boolean handleNewMessagesAvailable() {
if (!client.isOpen()) {
// The client may become closed without successful removal of references to the `MessageAvailabilityListener`
Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment();
return false;
}
Metrics.counter(MESSAGE_AVAILABLE_COUNTER_NAME,
PRESENCE_MANAGER_TAG, "legacy")
.increment();
return true;
}
@Override
public void handleNewMessageAvailable() {
Metrics.counter(MESSAGE_AVAILABLE_COUNTER_NAME,
@ -487,22 +469,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, ClientE
}
@Override
public boolean handleMessagesPersisted() {
if (!client.isOpen()) {
// The client may become without successful removal of references to the `MessageAvailabilityListener`
Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment();
return false;
}
Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME,
PRESENCE_MANAGER_TAG, "legacy")
.increment();
return true;
}
@Override
public void handleMessagesPersistedPubSub() {
public void handleMessagesPersisted() {
Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME,
PRESENCE_MANAGER_TAG, "pubsub")
.increment();

View File

@ -119,8 +119,6 @@ record CommandDependencies(
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
environment.lifecycle().executorService("messageDelivery").minThreads(4).maxThreads(4).build());
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle()
.executorService(name(name, "keyspaceNotification-%d")).minThreads(4).maxThreads(4).build();
ExecutorService messageDeletionExecutor = environment.lifecycle()
.executorService(name(name, "messageDeletion-%d")).minThreads(4).maxThreads(4).build();
ExecutorService secureValueRecoveryServiceExecutor = environment.lifecycle()
@ -209,7 +207,7 @@ record CommandDependencies(
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration());
PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor);
MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor,
MessagesCache messagesCache = new MessagesCache(messagesCluster,
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), dynamicConfigurationManager);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient,
@ -262,7 +260,6 @@ record CommandDependencies(
Clock.systemUTC());
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(pubSubClientEventManager);
environment.lifecycle().manage(new ManagedAwsCrt());

View File

@ -68,7 +68,6 @@ public class MessagePersisterServiceCommand extends ServerCommand<WhisperServerC
Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()),
namespace.getInt(WORKER_COUNT));
environment.lifecycle().manage(deps.messagesCache());
environment.lifecycle().manage(messagePersister);
MetricsUtil.registerSystemResourceMetrics(environment);

View File

@ -22,9 +22,6 @@ public class LocalFaultTolerantRedisClusterFactory implements FaultTolerantRedis
try {
redisClusterExtension.beforeAll(null);
redisClusterExtension.beforeEach(null);
redisClusterExtension.getRedisCluster().useCluster(connection ->
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"));
} catch (final Exception e) {
throw new RuntimeException(e);
}

View File

@ -59,7 +59,7 @@ class PubSubClientEventManagerTest {
}
@Override
public void handleMessagesPersistedPubSub() {
public void handleMessagesPersisted() {
}
@Override
@ -174,7 +174,7 @@ class PubSubClientEventManagerTest {
localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() {
@Override
public void handleMessagesPersistedPubSub() {
public void handleMessagesPersisted() {
messagesPersistedLatch.countDown();
}
}).toCompletableFuture().join();

View File

@ -9,7 +9,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.protobuf.ByteString;
@ -33,6 +32,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.ClientEventListener;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
@ -50,9 +50,9 @@ class MessagePersisterIntegrationTest {
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private ExecutorService notificationExecutorService;
private Scheduler messageDeliveryScheduler;
private ExecutorService messageDeletionExecutorService;
private ExecutorService clientEventExecutorService;
private MessagesCache messagesCache;
private MessagesManager messagesManager;
private PubSubClientEventManager pubSubClientEventManager;
@ -65,7 +65,6 @@ class MessagePersisterIntegrationTest {
void setUp() throws Exception {
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> {
connection.sync().flushall();
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz");
});
@SuppressWarnings("unchecked") final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
@ -80,12 +79,14 @@ class MessagePersisterIntegrationTest {
messageDeletionExecutorService);
final AccountsManager accountsManager = mock(AccountsManager.class);
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService,
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC(), dynamicConfigurationManager);
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
messageDeletionExecutorService);
pubSubClientEventManager = mock(PubSubClientEventManager.class);
clientEventExecutorService = Executors.newVirtualThreadPerTaskExecutor();
pubSubClientEventManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutorService);
pubSubClientEventManager.start();
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
pubSubClientEventManager, dynamicConfigurationManager, PERSIST_DELAY, 1);
@ -100,19 +101,19 @@ class MessagePersisterIntegrationTest {
when(account.getDevice(Device.PRIMARY_ID)).thenReturn(Optional.of(DevicesHelper.createDevice(Device.PRIMARY_ID)));
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
messagesCache.start();
}
@AfterEach
void tearDown() throws Exception {
notificationExecutorService.shutdown();
notificationExecutorService.awaitTermination(15, TimeUnit.SECONDS);
messageDeletionExecutorService.shutdown();
messageDeletionExecutorService.awaitTermination(15, TimeUnit.SECONDS);
clientEventExecutorService.shutdown();
clientEventExecutorService.awaitTermination(15, TimeUnit.SECONDS);
messageDeliveryScheduler.dispose();
pubSubClientEventManager.stop();
}
@Test
@ -141,21 +142,22 @@ class MessagePersisterIntegrationTest {
final AtomicBoolean messagesPersisted = new AtomicBoolean(false);
messagesManager.addMessageAvailabilityListener(account.getUuid(), Device.PRIMARY_ID,
new MessageAvailabilityListener() {
pubSubClientEventManager.handleClientConnected(account.getUuid(), Device.PRIMARY_ID, new ClientEventListener() {
@Override
public boolean handleNewMessagesAvailable() {
return true;
public void handleNewMessageAvailable() {
}
@Override
public boolean handleMessagesPersisted() {
public void handleMessagesPersisted() {
synchronized (messagesPersisted) {
messagesPersisted.set(true);
messagesPersisted.notifyAll();
return true;
}
}
@Override
public void handleConnectionDisplaced(final boolean connectedElsewhere) {
}
});
messagePersister.start();
@ -183,8 +185,6 @@ class MessagePersisterIntegrationTest {
.toList();
assertEquals(expectedMessages, persistedMessages);
verify(pubSubClientEventManager).handleMessagesPersisted(account.getUuid(), Device.PRIMARY_ID);
});
}

View File

@ -100,7 +100,7 @@ class MessagePersisterTest {
sharedExecutorService = Executors.newSingleThreadExecutor();
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService,
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager);
pubSubClientEventManager = mock(PubSubClientEventManager.class);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, pubSubClientEventManager,

View File

@ -50,9 +50,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.RandomStringUtils;
@ -105,12 +103,6 @@ class MessagesCacheTest {
@BeforeEach
void setUp() throws Exception {
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> {
connection.sync().flushall();
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz");
});
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfiguration.getMessagesConfiguration()).thenReturn(new DynamicMessagesConfiguration(true, true));
dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
@ -119,16 +111,12 @@ class MessagesCacheTest {
sharedExecutorService = Executors.newSingleThreadExecutor();
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService,
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager);
messagesCache.start();
}
@AfterEach
void tearDown() throws Exception {
messagesCache.stop();
sharedExecutorService.shutdown();
sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS);
@ -303,8 +291,7 @@ class MessagesCacheTest {
}
final MessagesCache messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, cacheClock,
dynamicConfigurationManager);
messageDeliveryScheduler, sharedExecutorService, cacheClock, dynamicConfigurationManager);
final List<MessageProtos.Envelope> actualMessages = Flux.from(
messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID))
@ -394,13 +381,6 @@ class MessagesCacheTest {
StandardCharsets.UTF_8)));
}
@Test
void testGetQueueNameFromKeyspaceChannel() {
assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7",
MessagesCache.getQueueNameFromKeyspaceChannel(
"__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}"));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testGetQueuesToPersist(final boolean sealedSender) {
@ -415,152 +395,8 @@ class MessagesCacheTest {
final List<String> queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100);
assertEquals(1, queues.size());
assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.get(0)));
assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.get(0)));
}
@Test
void testNotifyListenerNewMessage() {
final AtomicBoolean notified = new AtomicBoolean(false);
final UUID messageGuid = UUID.randomUUID();
final MessageAvailabilityListener listener = new MessageAvailabilityListener() {
@Override
public boolean handleNewMessagesAvailable() {
synchronized (notified) {
notified.set(true);
notified.notifyAll();
return true;
}
}
@Override
public boolean handleMessagesPersisted() {
return true;
}
};
assertTimeoutPreemptively(Duration.ofSeconds(5), () -> {
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID,
generateRandomMessage(messageGuid, true));
synchronized (notified) {
while (!notified.get()) {
notified.wait();
}
}
assertTrue(notified.get());
});
}
@Test
void testNotifyListenerPersisted() {
final AtomicBoolean notified = new AtomicBoolean(false);
final MessageAvailabilityListener listener = new MessageAvailabilityListener() {
@Override
public boolean handleNewMessagesAvailable() {
return true;
}
@Override
public boolean handleMessagesPersisted() {
synchronized (notified) {
notified.set(true);
notified.notifyAll();
return true;
}
}
};
assertTimeoutPreemptively(Duration.ofSeconds(5), () -> {
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
messagesCache.lockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID);
messagesCache.unlockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID);
synchronized (notified) {
while (!notified.get()) {
notified.wait();
}
}
assertTrue(notified.get());
});
}
/**
* Helper class that implements {@link MessageAvailabilityListener#handleNewMessagesAvailable()} by always returning
* {@code false}. Its {@code counter} field tracks how many times {@code handleNewMessagesAvailable} has been
* called.
* <p>
* It uses a {@link CompletableFuture} to signal that it has received a messages available callback for the first
* time.
*/
private static class NewMessagesAvailabilityClosedListener implements MessageAvailabilityListener {
private int counter;
private final Consumer<Integer> messageHandledCallback;
private final CompletableFuture<Void> firstMessageHandled = new CompletableFuture<>();
private NewMessagesAvailabilityClosedListener(final Consumer<Integer> messageHandledCallback) {
this.messageHandledCallback = messageHandledCallback;
}
@Override
public boolean handleNewMessagesAvailable() {
counter++;
messageHandledCallback.accept(counter);
firstMessageHandled.complete(null);
return false;
}
@Override
public boolean handleMessagesPersisted() {
return true;
}
}
@Test
void testAvailabilityListenerResponses() {
final NewMessagesAvailabilityClosedListener listener1 = new NewMessagesAvailabilityClosedListener(
count -> assertEquals(1, count));
final NewMessagesAvailabilityClosedListener listener2 = new NewMessagesAvailabilityClosedListener(
count -> assertEquals(1, count));
assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener1);
final UUID messageGuid1 = UUID.randomUUID();
messagesCache.insert(messageGuid1, DESTINATION_UUID, DESTINATION_DEVICE_ID,
generateRandomMessage(messageGuid1, true));
listener1.firstMessageHandled.get();
// Avoid a race condition by blocking on the message handled future *and* the current notification executor task
// the notification executor task includes unsubscribing `listener1`, and, if we dont wait, sometimes
// `listener2` will get subscribed before `listener1` is cleaned up
sharedExecutorService.submit(() -> listener1.firstMessageHandled.get()).get();
final UUID messageGuid2 = UUID.randomUUID();
messagesCache.insert(messageGuid2, DESTINATION_UUID, DESTINATION_DEVICE_ID,
generateRandomMessage(messageGuid2, true));
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener2);
final UUID messageGuid3 = UUID.randomUUID();
messagesCache.insert(messageGuid3, DESTINATION_UUID, DESTINATION_DEVICE_ID,
generateRandomMessage(messageGuid3, true));
listener2.firstMessageHandled.get();
});
assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.getFirst()));
assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.getFirst()));
}
@ParameterizedTest
@ -621,7 +457,7 @@ class MessagesCacheTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testGetMessagesToPersist(final boolean sharedMrmKeyPresent) throws Exception {
void testGetMessagesToPersist(final boolean sharedMrmKeyPresent) {
final UUID destinationUuid = UUID.randomUUID();
final byte deviceId = 1;
@ -697,7 +533,7 @@ class MessagesCacheTest {
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(mockCluster, mock(ExecutorService.class), messageDeliveryScheduler,
messagesCache = new MessagesCache(mockCluster, messageDeliveryScheduler,
Executors.newSingleThreadExecutor(), Clock.systemUTC(), mock(DynamicConfigurationManager.class));
}

View File

@ -98,7 +98,7 @@ class WebSocketConnectionIntegrationTest {
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService,
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager);
messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(),
DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(7),

View File

@ -769,7 +769,7 @@ class WebSocketConnectionTest {
// whenComplete method will get called immediately on THIS thread, so we don't need to synchronize or wait for
// anything.
connection.processStoredMessages();
connection.handleMessagesPersistedPubSub();
connection.handleMessagesPersisted();
verify(messagesManager, times(2)).getMessagesForDeviceReactive(account.getUuid(), device, false);
}