From dc28d063aa5334baaf2a0ff541b9385b9b3ca5c3 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Mon, 17 Aug 2020 11:23:16 -0400 Subject: [PATCH] Reactivate the explicit client presence experiment. --- .../textsecuregcm/WhisperServerService.java | 8 +- .../push/ClientPresenceManager.java | 265 ++++++++++++++++- ...paceNotificationClientPresenceManager.java | 267 ------------------ .../push/NoopClientPresenceManager.java | 20 -- ...st.java => ClientPresenceManagerTest.java} | 32 +-- .../websocket/WebSocketConnectionTest.java | 4 +- 6 files changed, 282 insertions(+), 314 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/push/KeyspaceNotificationClientPresenceManager.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/push/NoopClientPresenceManager.java rename service/src/test/java/org/whispersystems/textsecuregcm/push/{KeyspaceNotificationClientPresenceManagerTest.java => ClientPresenceManagerTest.java} (80%) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index db67aad47..224b99e57 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -104,7 +104,6 @@ import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.GCMSender; -import org.whispersystems.textsecuregcm.push.NoopClientPresenceManager; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.WebsocketSender; @@ -338,13 +337,12 @@ public class WhisperServerService extends Application(1_000)).build(); ExecutorService messageCacheClusterExperimentExecutor = environment.lifecycle().executorService("messages_cache_experiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build(); ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build(); - // ClientPresenceManager clientPresenceManager = new KeyspaceNotificationClientPresenceManager(messagesCacheCluster, clientPresenceExecutor); - ClientPresenceManager clientPresenceManager = new NoopClientPresenceManager(); + ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor); DirectoryManager directory = new DirectoryManager(directoryClient); DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration()); PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster); @@ -406,7 +404,7 @@ public class WhisperServerService extends Application implements Managed { - boolean clearPresence(UUID accountUuid, long deviceId); + private final String managerId = UUID.randomUUID().toString(); + private final String connectedClientSetKey = getConnectedClientSetKey(managerId); + + private final FaultTolerantRedisCluster presenceCluster; + private final FaultTolerantPubSubConnection pubSubConnection; + + private final ClusterLuaScript clearPresenceScript; + + private final ScheduledExecutorService scheduledExecutorService; + private ScheduledFuture pruneMissingPeersFuture; + + private final Map displacementListenersByPresenceKey = new ConcurrentHashMap<>(); + + private final Timer checkPresenceTimer; + private final Timer setPresenceTimer; + private final Timer clearPresenceTimer; + private final Timer prunePeersTimer; + private final Meter pruneClientMeter; + private final Meter remoteDisplacementMeter; + private final Meter pubSubMessageMeter; + + private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int)Duration.ofMinutes(3).toSeconds(); + + static final String MANAGER_SET_KEY = "presence::managers"; + + private static final Logger log = LoggerFactory.getLogger(ClientPresenceManager.class); + + public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService) throws IOException { + this.presenceCluster = presenceCluster; + this.pubSubConnection = this.presenceCluster.createPubSubConnection(); + this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER); + this.scheduledExecutorService = scheduledExecutorService; + + final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size); + + this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence")); + this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence")); + this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence")); + this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers")); + this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient")); + this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement")); + this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage")); + } + + @VisibleForTesting + FaultTolerantPubSubConnection getPubSubConnection() { + return pubSubConnection; + } + + @Override + public void start() { + pubSubConnection.usePubSubConnection(connection -> { + connection.addListener(this); + connection.getResources().eventBus().get() + .filter(event -> event instanceof ClusterTopologyChangedEvent) + .handle((event, sink) -> { + resubscribeAll(); + sink.next(event); + }); + + final String presenceChannel = getManagerPresenceChannel(managerId); + final int slot = SlotHash.getSlot(presenceChannel); + + connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)).commands().subscribe(presenceChannel); + }); + + presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); + + pruneMissingPeersFuture = scheduledExecutorService.scheduleAtFixedRate(this::pruneMissingPeers, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + + @Override + public void stop() { + pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this)); + + if (pruneMissingPeersFuture != null) { + pruneMissingPeersFuture.cancel(false); + } + + for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { + clearPresence(presenceKey); + } + + presenceCluster.useCluster(connection -> { + connection.sync().srem(MANAGER_SET_KEY, managerId); + connection.sync().del(getConnectedClientSetKey(managerId)); + }); + + pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId))); + } + + public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) { + try (final Timer.Context ignored = setPresenceTimer.time()) { + final String presenceKey = getPresenceKey(accountUuid, deviceId); + + displacePresence(presenceKey); + + displacementListenersByPresenceKey.put(presenceKey, displacementListener); + + presenceCluster.useCluster(connection -> { + final RedisAdvancedClusterCommands commands = connection.sync(); + + commands.set(presenceKey, managerId); + commands.sadd(connectedClientSetKey, presenceKey); + }); + + subscribeForRemotePresenceChanges(presenceKey); + } + } + + private void displacePresence(final String presenceKey) { + final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey); + + if (displacementListener != null) { + displacementListener.handleDisplacement(); + } + + clearPresence(presenceKey); + } + + public boolean isPresent(final UUID accountUuid, final long deviceId) { + try (final Timer.Context ignored = checkPresenceTimer.time()) { + return presenceCluster.withCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1; + } + } + + public boolean clearPresence(final UUID accountUuid, final long deviceId) { + return clearPresence(getPresenceKey(accountUuid, deviceId)); + } + + private boolean clearPresence(final String presenceKey) { + try (final Timer.Context ignored = clearPresenceTimer.time()) { + displacementListenersByPresenceKey.remove(presenceKey); + unsubscribeFromRemotePresenceChanges(presenceKey); + + final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null; + presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey)); + + return removed; + } + } + + private void subscribeForRemotePresenceChanges(final String presenceKey) { + final int slot = SlotHash.getSlot(presenceKey); + + pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) + .commands() + .subscribe(getKeyspaceNotificationChannel(presenceKey))); + } + + private void resubscribeAll() { + for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { + subscribeForRemotePresenceChanges(presenceKey); + } + } + + private void unsubscribeFromRemotePresenceChanges(final String presenceKey) { + pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); + } + + void pruneMissingPeers() { + try (final Timer.Context ignored = prunePeersTimer.time()) { + final Set peerIds = presenceCluster.withCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY)); + peerIds.remove(managerId); + + for (final String peerId : peerIds) { + final boolean peerMissing = presenceCluster.withCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0); + + if (peerMissing) { + log.debug("Presence manager {} did not respond to ping", peerId); + + final String connectedClientsKey = getConnectedClientSetKey(peerId); + + presenceCluster.useCluster(connection -> { + final RedisAdvancedClusterCommands commands = connection.sync(); + + String presenceKey; + + while ((presenceKey = commands.spop(connectedClientsKey)) != null) { + clearPresenceScript.execute(List.of(presenceKey), List.of(peerId)); + pruneClientMeter.mark(); + } + + commands.del(connectedClientsKey); + commands.srem(MANAGER_SET_KEY, peerId); + }); + } + } + } + } + + @Override + public void message(final RedisClusterNode node, final String channel, final String message) { + pubSubMessageMeter.mark(); + + if ("set".equals(message) && channel.startsWith("__keyspace@0__:presence::{")) { + // Another process has overwritten this presence key, which means the client has connected to another host. + // At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making + // synchronous Lettuce calls to avoid deadlocking. + scheduledExecutorService.execute(() -> { + displacePresence(channel.substring("__keyspace@0__:".length())); + remoteDisplacementMeter.mark(); + }); + } + } + + @VisibleForTesting + static String getPresenceKey(final UUID accountUuid, final long deviceId) { + return "presence::{" + accountUuid.toString() + "::" + deviceId + "}"; + } + + private static String getKeyspaceNotificationChannel(final String presenceKey) { + return "__keyspace@0__:" + presenceKey; + } + + @VisibleForTesting + static String getConnectedClientSetKey(final String managerId) { + return "presence::clients::" + managerId; + } + + @VisibleForTesting + static String getManagerPresenceChannel(final String managerId) { + return "presence::manager::" + managerId; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/KeyspaceNotificationClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/KeyspaceNotificationClientPresenceManager.java deleted file mode 100644 index cba51c188..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/KeyspaceNotificationClientPresenceManager.java +++ /dev/null @@ -1,267 +0,0 @@ -package org.whispersystems.textsecuregcm.push; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import com.codahale.metrics.Timer; -import com.google.common.annotations.VisibleForTesting; -import io.dropwizard.lifecycle.Managed; -import io.lettuce.core.ScriptOutputType; -import io.lettuce.core.cluster.SlotHash; -import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; -import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; -import io.lettuce.core.cluster.models.partitions.RedisClusterNode; -import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.util.Constants; - -import java.io.IOException; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import static com.codahale.metrics.MetricRegistry.name; - -/** - * The client presence manager keeps track of which clients are actively connected and "present" to receive messages. - * Only one client per account/device may be present at a time; if a second client for the same account/device declares - * its presence, the previous client is displaced. - */ -public class KeyspaceNotificationClientPresenceManager extends RedisClusterPubSubAdapter implements Managed, ClientPresenceManager { - - private final String managerId = UUID.randomUUID().toString(); - private final String connectedClientSetKey = getConnectedClientSetKey(managerId); - - private final FaultTolerantRedisCluster presenceCluster; - private final FaultTolerantPubSubConnection pubSubConnection; - - private final ClusterLuaScript clearPresenceScript; - - private final ScheduledExecutorService scheduledExecutorService; - private ScheduledFuture pruneMissingPeersFuture; - - private final Map displacementListenersByPresenceKey = new ConcurrentHashMap<>(); - - private final Timer checkPresenceTimer; - private final Timer setPresenceTimer; - private final Timer clearPresenceTimer; - private final Timer prunePeersTimer; - private final Meter pruneClientMeter; - private final Meter remoteDisplacementMeter; - - private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int)Duration.ofMinutes(3).toSeconds(); - - static final String MANAGER_SET_KEY = "presence::managers"; - - private static final Logger log = LoggerFactory.getLogger(KeyspaceNotificationClientPresenceManager.class); - - public KeyspaceNotificationClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService) throws IOException { - this.presenceCluster = presenceCluster; - this.pubSubConnection = this.presenceCluster.createPubSubConnection(); - this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER); - this.scheduledExecutorService = scheduledExecutorService; - - final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size); - - this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence")); - this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence")); - this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence")); - this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers")); - this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient")); - this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement")); - } - - @VisibleForTesting - FaultTolerantPubSubConnection getPubSubConnection() { - return pubSubConnection; - } - - @Override - public void start() { - pubSubConnection.usePubSubConnection(connection -> { - connection.addListener(this); - connection.getResources().eventBus().get() - .filter(event -> event instanceof ClusterTopologyChangedEvent) - .handle((event, sink) -> { - resubscribeAll(); - sink.next(event); - }); - - final String presenceChannel = getManagerPresenceChannel(managerId); - final int slot = SlotHash.getSlot(presenceChannel); - - connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)).commands().subscribe(presenceChannel); - }); - - presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); - - pruneMissingPeersFuture = scheduledExecutorService.scheduleAtFixedRate(this::pruneMissingPeers, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS); - } - - @Override - public void stop() { - pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this)); - - if (pruneMissingPeersFuture != null) { - pruneMissingPeersFuture.cancel(false); - } - - for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { - clearPresence(presenceKey); - } - - presenceCluster.useCluster(connection -> { - connection.sync().srem(MANAGER_SET_KEY, managerId); - connection.sync().del(getConnectedClientSetKey(managerId)); - }); - - pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId))); - } - - @Override - public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) { - try (final Timer.Context ignored = setPresenceTimer.time()) { - final String presenceKey = getPresenceKey(accountUuid, deviceId); - - displacePresence(presenceKey); - - displacementListenersByPresenceKey.put(presenceKey, displacementListener); - - presenceCluster.useCluster(connection -> { - final RedisAdvancedClusterCommands commands = connection.sync(); - - commands.set(presenceKey, managerId); - commands.sadd(connectedClientSetKey, presenceKey); - }); - - subscribeForRemotePresenceChanges(presenceKey); - } - } - - private void displacePresence(final String presenceKey) { - final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey); - - if (displacementListener != null) { - displacementListener.handleDisplacement(); - } - - clearPresence(presenceKey); - } - - @Override - public boolean isPresent(final UUID accountUuid, final long deviceId) { - try (final Timer.Context ignored = checkPresenceTimer.time()) { - return presenceCluster.withCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1; - } - } - - @Override - public boolean clearPresence(final UUID accountUuid, final long deviceId) { - return clearPresence(getPresenceKey(accountUuid, deviceId)); - } - - private boolean clearPresence(final String presenceKey) { - try (final Timer.Context ignored = clearPresenceTimer.time()) { - displacementListenersByPresenceKey.remove(presenceKey); - unsubscribeFromRemotePresenceChanges(presenceKey); - - final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null; - presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey)); - - return removed; - } - } - - private void subscribeForRemotePresenceChanges(final String presenceKey) { - final int slot = SlotHash.getSlot(presenceKey); - - pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) - .commands() - .subscribe(getKeyspaceNotificationChannel(presenceKey))); - } - - private void resubscribeAll() { - for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { - subscribeForRemotePresenceChanges(presenceKey); - } - } - - private void unsubscribeFromRemotePresenceChanges(final String presenceKey) { - pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); - } - - void pruneMissingPeers() { - try (final Timer.Context ignored = prunePeersTimer.time()) { - final Set peerIds = presenceCluster.withCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY)); - peerIds.remove(managerId); - - for (final String peerId : peerIds) { - final boolean peerMissing = presenceCluster.withCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0); - - if (peerMissing) { - log.debug("Presence manager {} did not respond to ping", peerId); - - final String connectedClientsKey = getConnectedClientSetKey(peerId); - - presenceCluster.useCluster(connection -> { - final RedisAdvancedClusterCommands commands = connection.sync(); - - String presenceKey; - - while ((presenceKey = commands.spop(connectedClientsKey)) != null) { - clearPresenceScript.execute(List.of(presenceKey), List.of(peerId)); - pruneClientMeter.mark(); - } - - commands.del(connectedClientsKey); - commands.srem(MANAGER_SET_KEY, peerId); - }); - } - } - } - } - - @Override - public void message(final RedisClusterNode node, final String channel, final String message) { - if ("set".equals(message) && channel.startsWith("__keyspace@0__:presence::{")) { - // Another process has overwritten this presence key, which means the client has connected to another host. - // At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making - // synchronous Lettuce calls to avoid deadlocking. - scheduledExecutorService.execute(() -> { - displacePresence(channel.substring("__keyspace@0__:".length())); - remoteDisplacementMeter.mark(); - }); - } - } - - @VisibleForTesting - static String getPresenceKey(final UUID accountUuid, final long deviceId) { - return "presence::{" + accountUuid.toString() + "::" + deviceId + "}"; - } - - private static String getKeyspaceNotificationChannel(final String presenceKey) { - return "__keyspace@0__:" + presenceKey; - } - - @VisibleForTesting - static String getConnectedClientSetKey(final String managerId) { - return "presence::clients::" + managerId; - } - - @VisibleForTesting - static String getManagerPresenceChannel(final String managerId) { - return "presence::manager::" + managerId; - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/NoopClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/NoopClientPresenceManager.java deleted file mode 100644 index ae9c9c48c..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/NoopClientPresenceManager.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.whispersystems.textsecuregcm.push; - -import java.util.UUID; - -public class NoopClientPresenceManager implements ClientPresenceManager { - - @Override - public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) { - } - - @Override - public boolean isPresent(final UUID accountUuid, final long deviceId) { - return false; - } - - @Override - public boolean clearPresence(final UUID accountUuid, final long deviceId) { - return false; - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/KeyspaceNotificationClientPresenceManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java similarity index 80% rename from service/src/test/java/org/whispersystems/textsecuregcm/push/KeyspaceNotificationClientPresenceManagerTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java index 0b7572a91..55822d68f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/KeyspaceNotificationClientPresenceManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java @@ -18,10 +18,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedisClusterTest { +public class ClientPresenceManagerTest extends AbstractRedisClusterTest { private ScheduledExecutorService presenceRenewalExecutorService; - private KeyspaceNotificationClientPresenceManager clientPresenceManager; + private ClientPresenceManager clientPresenceManager; private static final DisplacedPresenceListener NO_OP = () -> {}; @@ -36,7 +36,7 @@ public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedis }); presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor(); - clientPresenceManager = new KeyspaceNotificationClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService); + clientPresenceManager = new ClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService); } @Override @@ -93,7 +93,7 @@ public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedis } }); - getRedisCluster().useCluster(connection -> connection.sync().set(KeyspaceNotificationClientPresenceManager.getPresenceKey(accountUuid, deviceId), + getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), UUID.randomUUID().toString())); synchronized (displaced) { @@ -125,7 +125,7 @@ public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedis clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.getResources().eventBus().publish(new ClusterTopologyChangedEvent(List.of(), List.of()))); - getRedisCluster().useCluster(connection -> connection.sync().set(KeyspaceNotificationClientPresenceManager.getPresenceKey(accountUuid, deviceId), + getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), UUID.randomUUID().toString())); synchronized (displaced) { @@ -149,7 +149,7 @@ public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedis assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId)); clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); - getRedisCluster().useCluster(connection -> connection.sync().set(KeyspaceNotificationClientPresenceManager.getPresenceKey(accountUuid, deviceId), + getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), UUID.randomUUID().toString())); assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId)); @@ -161,8 +161,8 @@ public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedis final String missingPeerId = UUID.randomUUID().toString(); getRedisCluster().useCluster(connection -> { - connection.sync().sadd(KeyspaceNotificationClientPresenceManager.MANAGER_SET_KEY, presentPeerId); - connection.sync().sadd(KeyspaceNotificationClientPresenceManager.MANAGER_SET_KEY, missingPeerId); + connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId); + connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId); }); for (int i = 0; i < 10; i++) { @@ -170,22 +170,22 @@ public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedis addClientPresence(missingPeerId); } - clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(KeyspaceNotificationClientPresenceManager.getManagerPresenceChannel(presentPeerId))); + clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); clientPresenceManager.pruneMissingPeers(); - assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(KeyspaceNotificationClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); - assertTrue(getRedisCluster().withCluster(connection -> connection.sync().sismember(KeyspaceNotificationClientPresenceManager.MANAGER_SET_KEY, presentPeerId))); + assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); + assertTrue(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId))); - assertEquals(0, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(KeyspaceNotificationClientPresenceManager.getConnectedClientSetKey(missingPeerId)))); - assertFalse(getRedisCluster().withCluster(connection -> connection.sync().sismember(KeyspaceNotificationClientPresenceManager.MANAGER_SET_KEY, missingPeerId))); + assertEquals(0, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId)))); + assertFalse(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId))); } private void addClientPresence(final String managerId) { - final String clientPresenceKey = KeyspaceNotificationClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7); + final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7); getRedisCluster().useCluster(connection -> { connection.sync().set(clientPresenceKey, managerId); - connection.sync().sadd(KeyspaceNotificationClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey); + connection.sync().sadd(ClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey); }); } @@ -206,7 +206,7 @@ public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedis final long displacedAccountDeviceId = 7; clientPresenceManager.setPresent(displacedAccountUuid, displacedAccountDeviceId, NO_OP); - getRedisCluster().useCluster(connection -> connection.sync().set(KeyspaceNotificationClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId), + getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId), UUID.randomUUID().toString())); clientPresenceManager.stop(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java index 397261c9d..4e115b7a7 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java @@ -10,7 +10,7 @@ import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; -import org.whispersystems.textsecuregcm.push.KeyspaceNotificationClientPresenceManager; +import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.WebsocketSender; @@ -68,7 +68,7 @@ public class WebSocketConnectionTest { public void testCredentials() throws Exception { MessagesManager storedMessages = mock(MessagesManager.class); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); - AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(pushSender, receiptSender, storedMessages, pubSubManager, apnFallbackManager, mock(KeyspaceNotificationClientPresenceManager.class)); + AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(pushSender, receiptSender, storedMessages, pubSubManager, apnFallbackManager, mock(ClientPresenceManager.class)); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))