From 0b7c3ad745ece444cb0ced29890db5eac054b635 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Fri, 13 Aug 2021 10:20:33 -0500 Subject: [PATCH] .editorconfig formatting --- .../push/ClientPresenceManager.java | 425 +++++++++--------- 1 file changed, 219 insertions(+), 206 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index c7201e649..0b8790764 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -48,240 +48,253 @@ import org.whispersystems.textsecuregcm.util.Constants; */ public class ClientPresenceManager extends RedisClusterPubSubAdapter implements Managed { - private final String managerId = UUID.randomUUID().toString(); - private final String connectedClientSetKey = getConnectedClientSetKey(managerId); + private final String managerId = UUID.randomUUID().toString(); + private final String connectedClientSetKey = getConnectedClientSetKey(managerId); - private final FaultTolerantRedisCluster presenceCluster; - private final FaultTolerantPubSubConnection pubSubConnection; + private final FaultTolerantRedisCluster presenceCluster; + private final FaultTolerantPubSubConnection pubSubConnection; - private final ClusterLuaScript clearPresenceScript; + private final ClusterLuaScript clearPresenceScript; - private final ExecutorService keyspaceNotificationExecutorService; - private final ScheduledExecutorService scheduledExecutorService; - private ScheduledFuture pruneMissingPeersFuture; + private final ExecutorService keyspaceNotificationExecutorService; + private final ScheduledExecutorService scheduledExecutorService; + private ScheduledFuture pruneMissingPeersFuture; - private final Map displacementListenersByPresenceKey = new ConcurrentHashMap<>(); + private final Map displacementListenersByPresenceKey = new ConcurrentHashMap<>(); - private final Timer checkPresenceTimer; - private final Timer setPresenceTimer; - private final Timer clearPresenceTimer; - private final Timer prunePeersTimer; - private final Meter pruneClientMeter; - private final Meter remoteDisplacementMeter; - private final Meter pubSubMessageMeter; + private final Timer checkPresenceTimer; + private final Timer setPresenceTimer; + private final Timer clearPresenceTimer; + private final Timer prunePeersTimer; + private final Meter pruneClientMeter; + private final Meter remoteDisplacementMeter; + private final Meter pubSubMessageMeter; - private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int)Duration.ofSeconds(30).toSeconds(); + private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int) Duration.ofSeconds(30).toSeconds(); - static final String MANAGER_SET_KEY = "presence::managers"; + static final String MANAGER_SET_KEY = "presence::managers"; - private static final Logger log = LoggerFactory.getLogger(ClientPresenceManager.class); + private static final Logger log = LoggerFactory.getLogger(ClientPresenceManager.class); - public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService, final ExecutorService keyspaceNotificationExecutorService) throws IOException { - this.presenceCluster = presenceCluster; - this.pubSubConnection = this.presenceCluster.createPubSubConnection(); - this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER); - this.scheduledExecutorService = scheduledExecutorService; - this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService; + public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, + final ScheduledExecutorService scheduledExecutorService, + final ExecutorService keyspaceNotificationExecutorService) throws IOException { + this.presenceCluster = presenceCluster; + this.pubSubConnection = this.presenceCluster.createPubSubConnection(); + this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", + ScriptOutputType.INTEGER); + this.scheduledExecutorService = scheduledExecutorService; + this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService; - final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size); + final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size); - this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence")); - this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence")); - this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence")); - this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers")); - this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient")); - this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement")); - this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage")); + this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence")); + this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence")); + this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence")); + this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers")); + this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient")); + this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement")); + this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage")); + } + + @VisibleForTesting + FaultTolerantPubSubConnection getPubSubConnection() { + return pubSubConnection; + } + + @Override + public void start() { + pubSubConnection.usePubSubConnection(connection -> { + connection.addListener(this); + connection.getResources().eventBus().get() + .filter(event -> event instanceof ClusterTopologyChangedEvent) + .subscribe(event -> resubscribeAll()); + + final String presenceChannel = getManagerPresenceChannel(managerId); + final int slot = SlotHash.getSlot(presenceChannel); + + connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) + .commands() + .subscribe(presenceChannel); + }); + + presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); + + pruneMissingPeersFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { + try { + pruneMissingPeers(); + } catch (final Throwable t) { + log.warn("Failed to prune missing peers", t); + } + }, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + + @Override + public void stop() { + pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this)); + + if (pruneMissingPeersFuture != null) { + pruneMissingPeersFuture.cancel(false); } - @VisibleForTesting - FaultTolerantPubSubConnection getPubSubConnection() { - return pubSubConnection; + for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { + clearPresence(presenceKey); } - @Override - public void start() { - pubSubConnection.usePubSubConnection(connection -> { - connection.addListener(this); - connection.getResources().eventBus().get() - .filter(event -> event instanceof ClusterTopologyChangedEvent) - .subscribe(event -> resubscribeAll()); + presenceCluster.useCluster(connection -> { + connection.sync().srem(MANAGER_SET_KEY, managerId); + connection.sync().del(getConnectedClientSetKey(managerId)); + }); - final String presenceChannel = getManagerPresenceChannel(managerId); - final int slot = SlotHash.getSlot(presenceChannel); + pubSubConnection.usePubSubConnection( + connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId))); + } - connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)).commands().subscribe(presenceChannel); - }); + public void setPresent(final UUID accountUuid, final long deviceId, + final DisplacedPresenceListener displacementListener) { + try (final Timer.Context ignored = setPresenceTimer.time()) { + final String presenceKey = getPresenceKey(accountUuid, deviceId); - presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); + displacePresence(presenceKey); - pruneMissingPeersFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { - try { - pruneMissingPeers(); - } catch (final Throwable t) { - log.warn("Failed to prune missing peers", t); - } - }, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS); + displacementListenersByPresenceKey.put(presenceKey, displacementListener); + + presenceCluster.useCluster(connection -> { + final RedisAdvancedClusterCommands commands = connection.sync(); + + commands.sadd(connectedClientSetKey, presenceKey); + commands.set(presenceKey, managerId); + }); + + subscribeForRemotePresenceChanges(presenceKey); + } + } + + private void displacePresence(final String presenceKey) { + final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey); + + if (displacementListener != null) { + displacementListener.handleDisplacement(); } - @Override - public void stop() { - pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this)); + clearPresence(presenceKey); + } - if (pruneMissingPeersFuture != null) { - pruneMissingPeersFuture.cancel(false); + public boolean isPresent(final UUID accountUuid, final long deviceId) { + try (final Timer.Context ignored = checkPresenceTimer.time()) { + return presenceCluster.withCluster(connection -> + connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1; + } + } + + public boolean isLocallyPresent(final UUID accountUuid, final long deviceId) { + return displacementListenersByPresenceKey.containsKey(getPresenceKey(accountUuid, deviceId)); + } + + public boolean clearPresence(final UUID accountUuid, final long deviceId) { + return clearPresence(getPresenceKey(accountUuid, deviceId)); + } + + private boolean clearPresence(final String presenceKey) { + try (final Timer.Context ignored = clearPresenceTimer.time()) { + displacementListenersByPresenceKey.remove(presenceKey); + unsubscribeFromRemotePresenceChanges(presenceKey); + + final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null; + presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey)); + + return removed; + } + } + + private void subscribeForRemotePresenceChanges(final String presenceKey) { + final int slot = SlotHash.getSlot(presenceKey); + + pubSubConnection.usePubSubConnection( + connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) + .commands() + .subscribe(getKeyspaceNotificationChannel(presenceKey))); + } + + private void resubscribeAll() { + for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { + subscribeForRemotePresenceChanges(presenceKey); + } + } + + private void unsubscribeFromRemotePresenceChanges(final String presenceKey) { + pubSubConnection.usePubSubConnection( + connection -> connection.sync().upstream().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); + } + + void pruneMissingPeers() { + try (final Timer.Context ignored = prunePeersTimer.time()) { + final Set peerIds = presenceCluster.withCluster( + connection -> connection.sync().smembers(MANAGER_SET_KEY)); + peerIds.remove(managerId); + + for (final String peerId : peerIds) { + final boolean peerMissing = presenceCluster.withCluster( + connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0); + + if (peerMissing) { + log.debug("Presence manager {} did not respond to ping", peerId); + + final String connectedClientsKey = getConnectedClientSetKey(peerId); + + String presenceKey; + + while ((presenceKey = presenceCluster.withCluster(connection -> connection.sync().spop(connectedClientsKey))) + != null) { + clearPresenceScript.execute(List.of(presenceKey), List.of(peerId)); + pruneClientMeter.mark(); + } + + presenceCluster.useCluster(connection -> { + connection.sync().del(connectedClientsKey); + connection.sync().srem(MANAGER_SET_KEY, peerId); + }); } + } + } + } - for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { - clearPresence(presenceKey); + @Override + public void message(final RedisClusterNode node, final String channel, final String message) { + pubSubMessageMeter.mark(); + + if ("set".equals(message) && channel.startsWith("__keyspace@0__:presence::{")) { + // Another process has overwritten this presence key, which means the client has connected to another host. + // At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making + // synchronous Lettuce calls to avoid deadlocking. + keyspaceNotificationExecutorService.execute(() -> { + try { + displacePresence(channel.substring("__keyspace@0__:".length())); + remoteDisplacementMeter.mark(); + } catch (final Exception e) { + log.warn("Error displacing presence", e); } - - presenceCluster.useCluster(connection -> { - connection.sync().srem(MANAGER_SET_KEY, managerId); - connection.sync().del(getConnectedClientSetKey(managerId)); - }); - - pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId))); + }); } + } - public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) { - try (final Timer.Context ignored = setPresenceTimer.time()) { - final String presenceKey = getPresenceKey(accountUuid, deviceId); + @VisibleForTesting + static String getPresenceKey(final UUID accountUuid, final long deviceId) { + return "presence::{" + accountUuid.toString() + "::" + deviceId + "}"; + } - displacePresence(presenceKey); + private static String getKeyspaceNotificationChannel(final String presenceKey) { + return "__keyspace@0__:" + presenceKey; + } - displacementListenersByPresenceKey.put(presenceKey, displacementListener); + @VisibleForTesting + static String getConnectedClientSetKey(final String managerId) { + return "presence::clients::" + managerId; + } - presenceCluster.useCluster(connection -> { - final RedisAdvancedClusterCommands commands = connection.sync(); - - commands.sadd(connectedClientSetKey, presenceKey); - commands.set(presenceKey, managerId); - }); - - subscribeForRemotePresenceChanges(presenceKey); - } - } - - private void displacePresence(final String presenceKey) { - final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey); - - if (displacementListener != null) { - displacementListener.handleDisplacement(); - } - - clearPresence(presenceKey); - } - - public boolean isPresent(final UUID accountUuid, final long deviceId) { - try (final Timer.Context ignored = checkPresenceTimer.time()) { - return presenceCluster.withCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1; - } - } - - public boolean isLocallyPresent(final UUID accountUuid, final long deviceId) { - return displacementListenersByPresenceKey.containsKey(getPresenceKey(accountUuid, deviceId)); - } - - public boolean clearPresence(final UUID accountUuid, final long deviceId) { - return clearPresence(getPresenceKey(accountUuid, deviceId)); - } - - private boolean clearPresence(final String presenceKey) { - try (final Timer.Context ignored = clearPresenceTimer.time()) { - displacementListenersByPresenceKey.remove(presenceKey); - unsubscribeFromRemotePresenceChanges(presenceKey); - - final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null; - presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey)); - - return removed; - } - } - - private void subscribeForRemotePresenceChanges(final String presenceKey) { - final int slot = SlotHash.getSlot(presenceKey); - - pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) - .commands() - .subscribe(getKeyspaceNotificationChannel(presenceKey))); - } - - private void resubscribeAll() { - for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { - subscribeForRemotePresenceChanges(presenceKey); - } - } - - private void unsubscribeFromRemotePresenceChanges(final String presenceKey) { - pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); - } - - void pruneMissingPeers() { - try (final Timer.Context ignored = prunePeersTimer.time()) { - final Set peerIds = presenceCluster.withCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY)); - peerIds.remove(managerId); - - for (final String peerId : peerIds) { - final boolean peerMissing = presenceCluster.withCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0); - - if (peerMissing) { - log.debug("Presence manager {} did not respond to ping", peerId); - - final String connectedClientsKey = getConnectedClientSetKey(peerId); - - String presenceKey; - - while ((presenceKey = presenceCluster.withCluster(connection -> connection.sync().spop(connectedClientsKey))) != null) { - clearPresenceScript.execute(List.of(presenceKey), List.of(peerId)); - pruneClientMeter.mark(); - } - - presenceCluster.useCluster(connection -> { - connection.sync().del(connectedClientsKey); - connection.sync().srem(MANAGER_SET_KEY, peerId); - }); - } - } - } - } - - @Override - public void message(final RedisClusterNode node, final String channel, final String message) { - pubSubMessageMeter.mark(); - - if ("set".equals(message) && channel.startsWith("__keyspace@0__:presence::{")) { - // Another process has overwritten this presence key, which means the client has connected to another host. - // At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making - // synchronous Lettuce calls to avoid deadlocking. - keyspaceNotificationExecutorService.execute(() -> { - try { - displacePresence(channel.substring("__keyspace@0__:".length())); - remoteDisplacementMeter.mark(); - } catch (final Exception e) { - log.warn("Error displacing presence", e); - } - }); - } - } - - @VisibleForTesting - static String getPresenceKey(final UUID accountUuid, final long deviceId) { - return "presence::{" + accountUuid.toString() + "::" + deviceId + "}"; - } - - private static String getKeyspaceNotificationChannel(final String presenceKey) { - return "__keyspace@0__:" + presenceKey; - } - - @VisibleForTesting - static String getConnectedClientSetKey(final String managerId) { - return "presence::clients::" + managerId; - } - - @VisibleForTesting - static String getManagerPresenceChannel(final String managerId) { - return "presence::manager::" + managerId; - } + @VisibleForTesting + static String getManagerPresenceChannel(final String managerId) { + return "presence::manager::" + managerId; + } }