Listen for new messages via keyspace notifications.
This commit is contained in:
parent
2c29f831e8
commit
8d3316ccd6
|
@ -338,6 +338,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration().getUrls(), config.getMetricsClusterConfiguration().getTimeout(), config.getMetricsClusterConfiguration().getCircuitBreakerConfiguration());
|
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration().getUrls(), config.getMetricsClusterConfiguration().getTimeout(), config.getMetricsClusterConfiguration().getCircuitBreakerConfiguration());
|
||||||
|
|
||||||
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
|
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
|
||||||
|
ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).build();
|
||||||
ExecutorService messageCacheClusterExperimentExecutor = environment.lifecycle().executorService("messages_cache_experiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
ExecutorService messageCacheClusterExperimentExecutor = environment.lifecycle().executorService("messages_cache_experiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
||||||
ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
||||||
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor);
|
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor);
|
||||||
|
@ -349,7 +350,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster);
|
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster);
|
||||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||||
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster);
|
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster, messageNotificationExecutor);
|
||||||
MessagesCache messagesCache = new MessagesCache(messagesClient);
|
MessagesCache messagesCache = new MessagesCache(messagesClient);
|
||||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
||||||
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, clusterMessagesCache, pushLatencyManager, messageCacheClusterExperimentExecutor);
|
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, clusterMessagesCache, pushLatencyManager, messageCacheClusterExperimentExecutor);
|
||||||
|
|
|
@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
@ -81,15 +82,9 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
|
RedisClusterUtil.assertKeyspaceNotificationsConfigured(presenceCluster, "K$");
|
||||||
|
|
||||||
presenceCluster.usePubSubConnection(connection -> {
|
presenceCluster.usePubSubConnection(connection -> {
|
||||||
final String configuredKeyspaceNotifications = connection.sync().configGet("notify-keyspace-events").getOrDefault("notify-keyspace-events", "");
|
|
||||||
|
|
||||||
for (final char requiredNotificationType : new char[] {'K', '$'}) {
|
|
||||||
if (configuredKeyspaceNotifications.indexOf(requiredNotificationType) == -1) {
|
|
||||||
throw new IllegalStateException("Required keyspace notification type not configured. Need at least K$, but is actually: " + configuredKeyspaceNotifications);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
connection.addListener(this);
|
connection.addListener(this);
|
||||||
connection.getResources().eventBus().get()
|
connection.getResources().eventBus().get()
|
||||||
.filter(event -> event instanceof ClusterTopologyChangedEvent)
|
.filter(event -> event instanceof ClusterTopologyChangedEvent)
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A message availability listener is notified when new messages are available for a specific device for a specific
|
||||||
|
* account. Availability listeners are also notified when messages are moved from the message cache to long-term storage
|
||||||
|
* as an optimization hint to implementing classes.
|
||||||
|
*/
|
||||||
|
public interface MessageAvailabilityListener {
|
||||||
|
|
||||||
|
void handleNewMessagesAvailable();
|
||||||
|
|
||||||
|
void handleMessagesPersisted();
|
||||||
|
}
|
|
@ -127,4 +127,12 @@ public class MessagesManager {
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(destination, destinationUuid, deviceId, id);
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(destination, destinationUuid, deviceId, id);
|
||||||
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor);
|
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||||
|
clusterMessagesCache.addMessageAvailabilityListener(destinationUuid, deviceId, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
|
||||||
|
clusterMessagesCache.removeMessageAvailabilityListener(listener);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,9 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import io.lettuce.core.ScriptOutputType;
|
import io.lettuce.core.ScriptOutputType;
|
||||||
import io.lettuce.core.cluster.SlotHash;
|
import io.lettuce.core.cluster.SlotHash;
|
||||||
|
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
|
||||||
|
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
|
||||||
|
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -18,15 +21,20 @@ import java.nio.charset.StandardCharsets;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.IdentityHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
public class RedisClusterMessagesCache implements UserMessagesCache {
|
public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String, String> implements UserMessagesCache {
|
||||||
|
|
||||||
private final FaultTolerantRedisCluster redisCluster;
|
private final FaultTolerantRedisCluster redisCluster;
|
||||||
|
private final ExecutorService notificationExecutorService;
|
||||||
|
|
||||||
private final ClusterLuaScript insertScript;
|
private final ClusterLuaScript insertScript;
|
||||||
private final ClusterLuaScript removeByIdScript;
|
private final ClusterLuaScript removeByIdScript;
|
||||||
|
@ -36,9 +44,15 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||||
private final ClusterLuaScript removeQueueScript;
|
private final ClusterLuaScript removeQueueScript;
|
||||||
private final ClusterLuaScript getQueuesToPersistScript;
|
private final ClusterLuaScript getQueuesToPersistScript;
|
||||||
|
|
||||||
|
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
|
||||||
|
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
|
||||||
|
|
||||||
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
|
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
|
||||||
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
|
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
private static final String QUEUE_KEYSPACE_PATTERN = "__keyspace@0__:user_queue::*";
|
||||||
|
private static final String PERSISTING_KEYSPACE_PATTERN = "__keyspace@0__:user_queue_persisting::*";
|
||||||
|
|
||||||
private static final String INSERT_TIMER_NAME = name(RedisClusterMessagesCache.class, "insert");
|
private static final String INSERT_TIMER_NAME = name(RedisClusterMessagesCache.class, "insert");
|
||||||
private static final String REMOVE_TIMER_NAME = name(RedisClusterMessagesCache.class, "remove");
|
private static final String REMOVE_TIMER_NAME = name(RedisClusterMessagesCache.class, "remove");
|
||||||
private static final String GET_TIMER_NAME = name(RedisClusterMessagesCache.class, "get");
|
private static final String GET_TIMER_NAME = name(RedisClusterMessagesCache.class, "get");
|
||||||
|
@ -51,9 +65,10 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagesCache.class);
|
private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagesCache.class);
|
||||||
|
|
||||||
public RedisClusterMessagesCache(final FaultTolerantRedisCluster redisCluster) throws IOException {
|
public RedisClusterMessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService) throws IOException {
|
||||||
|
|
||||||
this.redisCluster = redisCluster;
|
this.redisCluster = redisCluster;
|
||||||
|
this.notificationExecutorService = notificationExecutorService;
|
||||||
|
|
||||||
this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
|
this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
|
||||||
this.removeByIdScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_id.lua", ScriptOutputType.VALUE);
|
this.removeByIdScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_id.lua", ScriptOutputType.VALUE);
|
||||||
|
@ -62,6 +77,24 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||||
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
|
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
|
||||||
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS);
|
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS);
|
||||||
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI);
|
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI);
|
||||||
|
|
||||||
|
RedisClusterUtil.assertKeyspaceNotificationsConfigured(redisCluster, "K$gz");
|
||||||
|
|
||||||
|
redisCluster.usePubSubConnection(connection -> {
|
||||||
|
connection.addListener(this);
|
||||||
|
connection.getResources().eventBus().get()
|
||||||
|
.filter(event -> event instanceof ClusterTopologyChangedEvent)
|
||||||
|
.handle((event, sink) -> {
|
||||||
|
resubscribeAll();
|
||||||
|
sink.next(event);
|
||||||
|
});
|
||||||
|
|
||||||
|
connection.sync().masters().commands().psubscribe(QUEUE_KEYSPACE_PATTERN, PERSISTING_KEYSPACE_PATTERN);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void resubscribeAll() {
|
||||||
|
redisCluster.usePubSubConnection(connection -> connection.sync().masters().commands().psubscribe(QUEUE_KEYSPACE_PATTERN, PERSISTING_KEYSPACE_PATTERN));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -251,6 +284,55 @@ public class RedisClusterMessagesCache implements UserMessagesCache {
|
||||||
redisCluster.useBinaryWriteCluster(connection -> connection.sync().del(getPersistInProgressKey(queue)));
|
redisCluster.useBinaryWriteCluster(connection -> connection.sync().del(getPersistInProgressKey(queue)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||||
|
final String queueName = getQueueName(destinationUuid, deviceId);
|
||||||
|
|
||||||
|
synchronized (messageListenersByQueueName) {
|
||||||
|
messageListenersByQueueName.put(queueName, listener);
|
||||||
|
queueNamesByMessageListener.put(listener, queueName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
|
||||||
|
synchronized (messageListenersByQueueName) {
|
||||||
|
final String queueName = queueNamesByMessageListener.remove(listener);
|
||||||
|
|
||||||
|
if (queueName != null) {
|
||||||
|
messageListenersByQueueName.remove(queueName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void message(final RedisClusterNode node, final String pattern, final String channel, final String message) {
|
||||||
|
if (QUEUE_KEYSPACE_PATTERN.equals(pattern) && "zadd".equals(message)) {
|
||||||
|
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable));
|
||||||
|
} else if (PERSISTING_KEYSPACE_PATTERN.equals(pattern) && "del".equals(message)) {
|
||||||
|
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Optional<MessageAvailabilityListener> findListener(final String keyspaceChannel) {
|
||||||
|
final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel);
|
||||||
|
|
||||||
|
synchronized (messageListenersByQueueName) {
|
||||||
|
return Optional.ofNullable(messageListenersByQueueName.get(queueName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static String getQueueName(final UUID accountUuid, final long deviceId) {
|
||||||
|
return accountUuid + "::" + deviceId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static String getQueueNameFromKeyspaceChannel(final String channel) {
|
||||||
|
final int startOfHashTag = channel.indexOf('{');
|
||||||
|
final int endOfHashTag = channel.lastIndexOf('}');
|
||||||
|
|
||||||
|
return channel.substring(startOfHashTag + 1, endOfHashTag);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static byte[] getMessageQueueKey(final UUID accountUuid, final long deviceId) {
|
static byte[] getMessageQueueKey(final UUID accountUuid, final long deviceId) {
|
||||||
return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package org.whispersystems.textsecuregcm.util;
|
package org.whispersystems.textsecuregcm.util;
|
||||||
|
|
||||||
import io.lettuce.core.cluster.SlotHash;
|
import io.lettuce.core.cluster.SlotHash;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
|
|
||||||
public class RedisClusterUtil {
|
public class RedisClusterUtil {
|
||||||
|
|
||||||
|
@ -21,7 +22,39 @@ public class RedisClusterUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a Redis hash tag that maps to the given cluster slot.
|
||||||
|
*
|
||||||
|
* @param slot the Redis cluster slot for which to retrieve a hash tag
|
||||||
|
*
|
||||||
|
* @return a Redis hash tag that maps to the given cluster slot
|
||||||
|
*
|
||||||
|
* @see <a href="https://redis.io/topics/cluster-spec#keys-hash-tags">Redis Cluster Specification - Keys hash tags</a>
|
||||||
|
*/
|
||||||
public static String getMinimalHashTag(final int slot) {
|
public static String getMinimalHashTag(final int slot) {
|
||||||
return HASHES_BY_SLOT[slot];
|
return HASHES_BY_SLOT[slot];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asserts that a Redis cluster is configured to generate (at least) a specific set of keyspace notification events.
|
||||||
|
*
|
||||||
|
* @param redisCluster the Redis cluster to check for the required keyspace notification configuration
|
||||||
|
* @param requiredKeyspaceNotifications a string representing the required keyspace notification events (e.g. "Kg$lz")
|
||||||
|
*
|
||||||
|
* @throws IllegalStateException if the given Redis cluster is not configured to generate the required keyspace
|
||||||
|
* notification events
|
||||||
|
*
|
||||||
|
* @see <a href="https://redis.io/topics/notifications#configuration">Redis Keyspace Notifications - Configuration</a>
|
||||||
|
*/
|
||||||
|
public static void assertKeyspaceNotificationsConfigured(final FaultTolerantRedisCluster redisCluster, final String requiredKeyspaceNotifications) {
|
||||||
|
final String configuredKeyspaceNotifications = redisCluster.withReadCluster(connection -> connection.sync().configGet("notify-keyspace-events"))
|
||||||
|
.getOrDefault("notify-keyspace-events", "")
|
||||||
|
.replace("A", "g$lshztxe");
|
||||||
|
|
||||||
|
for (final char requiredNotificationType : requiredKeyspaceNotifications.toCharArray()) {
|
||||||
|
if (configuredKeyspaceNotifications.indexOf(requiredNotificationType) == -1) {
|
||||||
|
throw new IllegalStateException(String.format("Required at least \"%s\" for keyspace notifications, but only had \"%s\".", requiredKeyspaceNotifications, configuredKeyspaceNotifications));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||||
openWebsocketCounter.inc();
|
openWebsocketCounter.inc();
|
||||||
RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, device));
|
RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, device));
|
||||||
clientPresenceManager.setPresent(account.getUuid(), device.getId(), explicitDisplacementMeter::mark);
|
clientPresenceManager.setPresent(account.getUuid(), device.getId(), explicitDisplacementMeter::mark);
|
||||||
|
messagesManager.addMessageAvailabilityListener(account.getUuid(), device.getId(), connection);
|
||||||
pubSubManager.publish(address, connectMessage);
|
pubSubManager.publish(address, connectMessage);
|
||||||
pubSubManager.subscribe(address, connection);
|
pubSubManager.subscribe(address, connection);
|
||||||
|
|
||||||
|
@ -85,6 +86,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||||
openWebsocketCounter.dec();
|
openWebsocketCounter.dec();
|
||||||
pubSubManager.unsubscribe(address, connection);
|
pubSubManager.unsubscribe(address, connection);
|
||||||
clientPresenceManager.clearPresence(account.getUuid(), device.getId());
|
clientPresenceManager.clearPresence(account.getUuid(), device.getId());
|
||||||
|
messagesManager.removeMessageAvailabilityListener(connection);
|
||||||
timer.stop();
|
timer.stop();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -15,12 +15,12 @@ import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
|
||||||
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
|
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
||||||
import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener;
|
|
||||||
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
||||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
import org.whispersystems.textsecuregcm.util.TimestampHeaderUtil;
|
import org.whispersystems.textsecuregcm.util.TimestampHeaderUtil;
|
||||||
|
@ -39,12 +39,16 @@ import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
|
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
|
||||||
|
|
||||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||||
public class WebSocketConnection implements DispatchChannel {
|
public class WebSocketConnection implements DispatchChannel, MessageAvailabilityListener {
|
||||||
|
|
||||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
public static final Histogram messageTime = metricRegistry.histogram(name(MessageController.class, "message_delivery_duration"));
|
public static final Histogram messageTime = metricRegistry.histogram(name(MessageController.class, "message_delivery_duration"));
|
||||||
private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message"));
|
private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message"));
|
||||||
private static final Meter pubSubDisplacementMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubDisplacement"));
|
private static final Meter pubSubDisplacementMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubDisplacement"));
|
||||||
|
private static final Meter messageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesAvailable"));
|
||||||
|
private static final Meter messagesPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesPersisted"));
|
||||||
|
private static final Meter pubSubNewMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubNewMessage"));
|
||||||
|
private static final Meter pubSubPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubPersisted"));
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
||||||
|
|
||||||
|
@ -81,9 +85,11 @@ public class WebSocketConnection implements DispatchChannel {
|
||||||
|
|
||||||
switch (pubSubMessage.getType().getNumber()) {
|
switch (pubSubMessage.getType().getNumber()) {
|
||||||
case PubSubMessage.Type.QUERY_DB_VALUE:
|
case PubSubMessage.Type.QUERY_DB_VALUE:
|
||||||
|
pubSubPersistedMeter.mark();
|
||||||
processStoredMessages();
|
processStoredMessages();
|
||||||
break;
|
break;
|
||||||
case PubSubMessage.Type.DELIVER_VALUE:
|
case PubSubMessage.Type.DELIVER_VALUE:
|
||||||
|
pubSubNewMessageMeter.mark();
|
||||||
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty(), false);
|
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty(), false);
|
||||||
break;
|
break;
|
||||||
case PubSubMessage.Type.CONNECTED_VALUE:
|
case PubSubMessage.Type.CONNECTED_VALUE:
|
||||||
|
@ -214,6 +220,16 @@ public class WebSocketConnection implements DispatchChannel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleNewMessagesAvailable() {
|
||||||
|
messageAvailableMeter.mark();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessagesPersisted() {
|
||||||
|
messagesPersistedMeter.mark();
|
||||||
|
}
|
||||||
|
|
||||||
private static class StoredMessageInfo {
|
private static class StoredMessageInfo {
|
||||||
private final long id;
|
private final long id;
|
||||||
private final boolean cached;
|
private final boolean cached;
|
||||||
|
|
|
@ -5,11 +5,14 @@ import junitparams.Parameters;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -20,6 +23,7 @@ public class RedisClusterMessagesCacheTest extends AbstractMessagesCacheTest {
|
||||||
private static final UUID DESTINATION_UUID = UUID.randomUUID();
|
private static final UUID DESTINATION_UUID = UUID.randomUUID();
|
||||||
private static final int DESTINATION_DEVICE_ID = 7;
|
private static final int DESTINATION_DEVICE_ID = 7;
|
||||||
|
|
||||||
|
private ExecutorService notificationExecutorService;
|
||||||
private RedisClusterMessagesCache messagesCache;
|
private RedisClusterMessagesCache messagesCache;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -27,13 +31,18 @@ public class RedisClusterMessagesCacheTest extends AbstractMessagesCacheTest {
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
|
|
||||||
try {
|
getRedisCluster().useWriteCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "K$gz"));
|
||||||
messagesCache = new RedisClusterMessagesCache(getRedisCluster());
|
|
||||||
} catch (final IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
getRedisCluster().useWriteCluster(connection -> connection.sync().flushall());
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
|
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
super.tearDown();
|
||||||
|
|
||||||
|
notificationExecutorService.shutdown();
|
||||||
|
notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -70,6 +79,12 @@ public class RedisClusterMessagesCacheTest extends AbstractMessagesCacheTest {
|
||||||
RedisClusterMessagesCache.getDeviceIdFromQueueName(new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8)));
|
RedisClusterMessagesCache.getDeviceIdFromQueueName(new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetQueueNameFromKeyspaceChannel() {
|
||||||
|
assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7",
|
||||||
|
RedisClusterMessagesCache.getQueueNameFromKeyspaceChannel("__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Parameters({"true", "false"})
|
@Parameters({"true", "false"})
|
||||||
public void testGetQueuesToPersist(final boolean sealedSender) {
|
public void testGetQueuesToPersist(final boolean sealedSender) {
|
||||||
|
@ -86,4 +101,67 @@ public class RedisClusterMessagesCacheTest extends AbstractMessagesCacheTest {
|
||||||
assertEquals(DESTINATION_UUID, RedisClusterMessagesCache.getAccountUuidFromQueueName(queues.get(0)));
|
assertEquals(DESTINATION_UUID, RedisClusterMessagesCache.getAccountUuidFromQueueName(queues.get(0)));
|
||||||
assertEquals(DESTINATION_DEVICE_ID, RedisClusterMessagesCache.getDeviceIdFromQueueName(queues.get(0)));
|
assertEquals(DESTINATION_DEVICE_ID, RedisClusterMessagesCache.getDeviceIdFromQueueName(queues.get(0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5_000L)
|
||||||
|
public void testNotifyListenerNewMessage() throws InterruptedException {
|
||||||
|
final AtomicBoolean notified = new AtomicBoolean(false);
|
||||||
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
|
|
||||||
|
final MessageAvailabilityListener listener = new MessageAvailabilityListener() {
|
||||||
|
@Override
|
||||||
|
public void handleNewMessagesAvailable() {
|
||||||
|
synchronized (notified) {
|
||||||
|
notified.set(true);
|
||||||
|
notified.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessagesPersisted() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
|
||||||
|
messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, true));
|
||||||
|
|
||||||
|
synchronized (notified) {
|
||||||
|
while (!notified.get()) {
|
||||||
|
notified.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(notified.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5_000L)
|
||||||
|
public void testNotifyListenerPersisted() throws InterruptedException {
|
||||||
|
final AtomicBoolean notified = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
final MessageAvailabilityListener listener = new MessageAvailabilityListener() {
|
||||||
|
@Override
|
||||||
|
public void handleNewMessagesAvailable() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessagesPersisted() {
|
||||||
|
synchronized (notified) {
|
||||||
|
notified.set(true);
|
||||||
|
notified.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
|
||||||
|
|
||||||
|
messagesCache.lockQueueForPersistence(RedisClusterMessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID));
|
||||||
|
messagesCache.unlockQueueForPersistence(RedisClusterMessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID));
|
||||||
|
|
||||||
|
synchronized (notified) {
|
||||||
|
while (!notified.get()) {
|
||||||
|
notified.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(notified.get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
package org.whispersystems.textsecuregcm.util;
|
||||||
|
|
||||||
|
import io.lettuce.core.cluster.SlotHash;
|
||||||
|
import io.lettuce.core.cluster.api.sync.Executions;
|
||||||
|
import io.lettuce.core.cluster.api.sync.NodeSelection;
|
||||||
|
import io.lettuce.core.cluster.api.sync.NodeSelectionCommands;
|
||||||
|
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||||
|
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
|
||||||
|
import junitparams.JUnitParamsRunner;
|
||||||
|
import junitparams.Parameters;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
|
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
|
||||||
|
import redis.embedded.Redis;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@RunWith(JUnitParamsRunner.class)
|
||||||
|
public class RedisClusterUtilTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetMinimalHashTag() {
|
||||||
|
for (int slot = 0; slot < SlotHash.SLOT_COUNT; slot++) {
|
||||||
|
assertEquals(slot, SlotHash.getSlot(RedisClusterUtil.getMinimalHashTag(slot)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
@Parameters(method = "argumentsForTestAssertKeyspaceNotificationsConfigured")
|
||||||
|
public void testAssertKeyspaceNotificationsConfigured(final String requiredKeyspaceNotifications, final String configuerdKeyspaceNotifications, final boolean expectException) {
|
||||||
|
final RedisAdvancedClusterCommands<String, String> commands = mock(RedisAdvancedClusterCommands.class);
|
||||||
|
final FaultTolerantRedisCluster redisCluster = RedisClusterHelper.buildMockRedisCluster(commands);
|
||||||
|
|
||||||
|
when(commands.configGet("notify-keyspace-events")).thenReturn(Map.of("notify-keyspace-events", configuerdKeyspaceNotifications));
|
||||||
|
|
||||||
|
if (expectException) {
|
||||||
|
try {
|
||||||
|
RedisClusterUtil.assertKeyspaceNotificationsConfigured(redisCluster, requiredKeyspaceNotifications);
|
||||||
|
fail("Expected IllegalStateException");
|
||||||
|
} catch (final IllegalStateException ignored) {
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
RedisClusterUtil.assertKeyspaceNotificationsConfigured(redisCluster, requiredKeyspaceNotifications);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
private Object argumentsForTestAssertKeyspaceNotificationsConfigured() {
|
||||||
|
return new Object[] {
|
||||||
|
new Object[] { "K$gz", "", true },
|
||||||
|
new Object[] { "K$gz", "K$gz", false },
|
||||||
|
new Object[] { "K$gz", "K$gzl", false },
|
||||||
|
new Object[] { "K$gz", "KA", false },
|
||||||
|
new Object[] { "", "A", false },
|
||||||
|
new Object[] { "", "", false },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue