Disarm the client presence manager experiment.
This commit is contained in:
parent
f1a74b5939
commit
bb6045c1d0
|
@ -104,6 +104,7 @@ import org.whispersystems.textsecuregcm.push.APNSender;
|
|||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.push.GCMSender;
|
||||
import org.whispersystems.textsecuregcm.push.NoopClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.push.WebsocketSender;
|
||||
|
@ -337,11 +338,12 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration());
|
||||
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration().getUrls(), config.getMetricsClusterConfiguration().getTimeout(), config.getMetricsClusterConfiguration().getCircuitBreakerConfiguration());
|
||||
|
||||
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
|
||||
// ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
|
||||
ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
||||
ExecutorService messageCacheClusterExperimentExecutor = environment.lifecycle().executorService("messages_cache_experiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
||||
ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
||||
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor);
|
||||
// ClientPresenceManager clientPresenceManager = new KeyspaceNotificationClientPresenceManager(messagesCacheCluster, clientPresenceExecutor);
|
||||
ClientPresenceManager clientPresenceManager = new NoopClientPresenceManager();
|
||||
|
||||
DirectoryManager directory = new DirectoryManager(directoryClient);
|
||||
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
|
||||
|
@ -404,7 +406,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
environment.lifecycle().manage(accountDatabaseCrawler);
|
||||
environment.lifecycle().manage(remoteConfigsManager);
|
||||
environment.lifecycle().manage(clusterMessagePersister);
|
||||
environment.lifecycle().manage(clientPresenceManager);
|
||||
// environment.lifecycle().manage(clientPresenceManager);
|
||||
|
||||
AWSCredentials credentials = new BasicAWSCredentials(config.getCdnConfiguration().getAccessKey(), config.getCdnConfiguration().getAccessSecret());
|
||||
AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
|
||||
|
|
|
@ -1,264 +1,11 @@
|
|||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
import io.lettuce.core.ScriptOutputType;
|
||||
import io.lettuce.core.cluster.SlotHash;
|
||||
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
|
||||
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
|
||||
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
public interface ClientPresenceManager {
|
||||
void setPresent(UUID accountUuid, long deviceId, DisplacedPresenceListener displacementListener);
|
||||
|
||||
/**
|
||||
* The client presence manager keeps track of which clients are actively connected and "present" to receive messages.
|
||||
* Only one client per account/device may be present at a time; if a second client for the same account/device declares
|
||||
* its presence, the previous client is displaced.
|
||||
*/
|
||||
public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, String> implements Managed {
|
||||
boolean isPresent(UUID accountUuid, long deviceId);
|
||||
|
||||
private final String managerId = UUID.randomUUID().toString();
|
||||
private final String connectedClientSetKey = getConnectedClientSetKey(managerId);
|
||||
|
||||
private final FaultTolerantRedisCluster presenceCluster;
|
||||
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
|
||||
|
||||
private final ClusterLuaScript clearPresenceScript;
|
||||
|
||||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
private ScheduledFuture<?> pruneMissingPeersFuture;
|
||||
|
||||
private final Map<String, DisplacedPresenceListener> displacementListenersByPresenceKey = new ConcurrentHashMap<>();
|
||||
|
||||
private final Timer checkPresenceTimer;
|
||||
private final Timer setPresenceTimer;
|
||||
private final Timer clearPresenceTimer;
|
||||
private final Timer prunePeersTimer;
|
||||
private final Meter pruneClientMeter;
|
||||
private final Meter remoteDisplacementMeter;
|
||||
|
||||
private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int)Duration.ofMinutes(3).toSeconds();
|
||||
|
||||
static final String MANAGER_SET_KEY = "presence::managers";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ClientPresenceManager.class);
|
||||
|
||||
public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService) throws IOException {
|
||||
this.presenceCluster = presenceCluster;
|
||||
this.pubSubConnection = this.presenceCluster.createPubSubConnection();
|
||||
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER);
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
|
||||
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size);
|
||||
|
||||
this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence"));
|
||||
this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence"));
|
||||
this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence"));
|
||||
this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers"));
|
||||
this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient"));
|
||||
this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement"));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
FaultTolerantPubSubConnection<String, String> getPubSubConnection() {
|
||||
return pubSubConnection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
pubSubConnection.usePubSubConnection(connection -> {
|
||||
connection.addListener(this);
|
||||
connection.getResources().eventBus().get()
|
||||
.filter(event -> event instanceof ClusterTopologyChangedEvent)
|
||||
.handle((event, sink) -> {
|
||||
resubscribeAll();
|
||||
sink.next(event);
|
||||
});
|
||||
|
||||
final String presenceChannel = getManagerPresenceChannel(managerId);
|
||||
final int slot = SlotHash.getSlot(presenceChannel);
|
||||
|
||||
connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)).commands().subscribe(presenceChannel);
|
||||
});
|
||||
|
||||
presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId));
|
||||
|
||||
pruneMissingPeersFuture = scheduledExecutorService.scheduleAtFixedRate(this::pruneMissingPeers, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this));
|
||||
|
||||
if (pruneMissingPeersFuture != null) {
|
||||
pruneMissingPeersFuture.cancel(false);
|
||||
}
|
||||
|
||||
for (final String presenceKey : displacementListenersByPresenceKey.keySet()) {
|
||||
clearPresence(presenceKey);
|
||||
}
|
||||
|
||||
presenceCluster.useCluster(connection -> {
|
||||
connection.sync().srem(MANAGER_SET_KEY, managerId);
|
||||
connection.sync().del(getConnectedClientSetKey(managerId));
|
||||
});
|
||||
|
||||
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId)));
|
||||
}
|
||||
|
||||
public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) {
|
||||
try (final Timer.Context ignored = setPresenceTimer.time()) {
|
||||
final String presenceKey = getPresenceKey(accountUuid, deviceId);
|
||||
|
||||
displacePresence(presenceKey);
|
||||
|
||||
displacementListenersByPresenceKey.put(presenceKey, displacementListener);
|
||||
|
||||
presenceCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
commands.set(presenceKey, managerId);
|
||||
commands.sadd(connectedClientSetKey, presenceKey);
|
||||
});
|
||||
|
||||
subscribeForRemotePresenceChanges(presenceKey);
|
||||
}
|
||||
}
|
||||
|
||||
private void displacePresence(final String presenceKey) {
|
||||
final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey);
|
||||
|
||||
if (displacementListener != null) {
|
||||
displacementListener.handleDisplacement();
|
||||
}
|
||||
|
||||
clearPresence(presenceKey);
|
||||
}
|
||||
|
||||
public boolean isPresent(final UUID accountUuid, final long deviceId) {
|
||||
try (final Timer.Context ignored = checkPresenceTimer.time()) {
|
||||
return presenceCluster.withCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean clearPresence(final UUID accountUuid, final long deviceId) {
|
||||
return clearPresence(getPresenceKey(accountUuid, deviceId));
|
||||
}
|
||||
|
||||
private boolean clearPresence(final String presenceKey) {
|
||||
try (final Timer.Context ignored = clearPresenceTimer.time()) {
|
||||
displacementListenersByPresenceKey.remove(presenceKey);
|
||||
unsubscribeFromRemotePresenceChanges(presenceKey);
|
||||
|
||||
final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null;
|
||||
presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey));
|
||||
|
||||
return removed;
|
||||
}
|
||||
}
|
||||
|
||||
private void subscribeForRemotePresenceChanges(final String presenceKey) {
|
||||
final int slot = SlotHash.getSlot(presenceKey);
|
||||
|
||||
pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot))
|
||||
.commands()
|
||||
.subscribe(getKeyspaceNotificationChannel(presenceKey)));
|
||||
}
|
||||
|
||||
private void resubscribeAll() {
|
||||
for (final String presenceKey : displacementListenersByPresenceKey.keySet()) {
|
||||
subscribeForRemotePresenceChanges(presenceKey);
|
||||
}
|
||||
}
|
||||
|
||||
private void unsubscribeFromRemotePresenceChanges(final String presenceKey) {
|
||||
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey)));
|
||||
}
|
||||
|
||||
void pruneMissingPeers() {
|
||||
try (final Timer.Context ignored = prunePeersTimer.time()) {
|
||||
final Set<String> peerIds = presenceCluster.withCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY));
|
||||
peerIds.remove(managerId);
|
||||
|
||||
for (final String peerId : peerIds) {
|
||||
final boolean peerMissing = presenceCluster.withCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0);
|
||||
|
||||
if (peerMissing) {
|
||||
log.debug("Presence manager {} did not respond to ping", peerId);
|
||||
|
||||
final String connectedClientsKey = getConnectedClientSetKey(peerId);
|
||||
|
||||
presenceCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
String presenceKey;
|
||||
|
||||
while ((presenceKey = commands.spop(connectedClientsKey)) != null) {
|
||||
clearPresenceScript.execute(List.of(presenceKey), List.of(peerId));
|
||||
pruneClientMeter.mark();
|
||||
}
|
||||
|
||||
commands.del(connectedClientsKey);
|
||||
commands.srem(MANAGER_SET_KEY, peerId);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void message(final RedisClusterNode node, final String channel, final String message) {
|
||||
if ("set".equals(message) && channel.startsWith("__keyspace@0__:presence::{")) {
|
||||
// Another process has overwritten this presence key, which means the client has connected to another host.
|
||||
// At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making
|
||||
// synchronous Lettuce calls to avoid deadlocking.
|
||||
scheduledExecutorService.execute(() -> {
|
||||
displacePresence(channel.substring("__keyspace@0__:".length()));
|
||||
remoteDisplacementMeter.mark();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getPresenceKey(final UUID accountUuid, final long deviceId) {
|
||||
return "presence::{" + accountUuid.toString() + "::" + deviceId + "}";
|
||||
}
|
||||
|
||||
private static String getKeyspaceNotificationChannel(final String presenceKey) {
|
||||
return "__keyspace@0__:" + presenceKey;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getConnectedClientSetKey(final String managerId) {
|
||||
return "presence::clients::" + managerId;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getManagerPresenceChannel(final String managerId) {
|
||||
return "presence::manager::" + managerId;
|
||||
}
|
||||
boolean clearPresence(UUID accountUuid, long deviceId);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,267 @@
|
|||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
import io.lettuce.core.ScriptOutputType;
|
||||
import io.lettuce.core.cluster.SlotHash;
|
||||
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
|
||||
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
|
||||
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
/**
|
||||
* The client presence manager keeps track of which clients are actively connected and "present" to receive messages.
|
||||
* Only one client per account/device may be present at a time; if a second client for the same account/device declares
|
||||
* its presence, the previous client is displaced.
|
||||
*/
|
||||
public class KeyspaceNotificationClientPresenceManager extends RedisClusterPubSubAdapter<String, String> implements Managed, ClientPresenceManager {
|
||||
|
||||
private final String managerId = UUID.randomUUID().toString();
|
||||
private final String connectedClientSetKey = getConnectedClientSetKey(managerId);
|
||||
|
||||
private final FaultTolerantRedisCluster presenceCluster;
|
||||
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
|
||||
|
||||
private final ClusterLuaScript clearPresenceScript;
|
||||
|
||||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
private ScheduledFuture<?> pruneMissingPeersFuture;
|
||||
|
||||
private final Map<String, DisplacedPresenceListener> displacementListenersByPresenceKey = new ConcurrentHashMap<>();
|
||||
|
||||
private final Timer checkPresenceTimer;
|
||||
private final Timer setPresenceTimer;
|
||||
private final Timer clearPresenceTimer;
|
||||
private final Timer prunePeersTimer;
|
||||
private final Meter pruneClientMeter;
|
||||
private final Meter remoteDisplacementMeter;
|
||||
|
||||
private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int)Duration.ofMinutes(3).toSeconds();
|
||||
|
||||
static final String MANAGER_SET_KEY = "presence::managers";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(KeyspaceNotificationClientPresenceManager.class);
|
||||
|
||||
public KeyspaceNotificationClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService) throws IOException {
|
||||
this.presenceCluster = presenceCluster;
|
||||
this.pubSubConnection = this.presenceCluster.createPubSubConnection();
|
||||
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER);
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
|
||||
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size);
|
||||
|
||||
this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence"));
|
||||
this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence"));
|
||||
this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence"));
|
||||
this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers"));
|
||||
this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient"));
|
||||
this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement"));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
FaultTolerantPubSubConnection<String, String> getPubSubConnection() {
|
||||
return pubSubConnection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
pubSubConnection.usePubSubConnection(connection -> {
|
||||
connection.addListener(this);
|
||||
connection.getResources().eventBus().get()
|
||||
.filter(event -> event instanceof ClusterTopologyChangedEvent)
|
||||
.handle((event, sink) -> {
|
||||
resubscribeAll();
|
||||
sink.next(event);
|
||||
});
|
||||
|
||||
final String presenceChannel = getManagerPresenceChannel(managerId);
|
||||
final int slot = SlotHash.getSlot(presenceChannel);
|
||||
|
||||
connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)).commands().subscribe(presenceChannel);
|
||||
});
|
||||
|
||||
presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId));
|
||||
|
||||
pruneMissingPeersFuture = scheduledExecutorService.scheduleAtFixedRate(this::pruneMissingPeers, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this));
|
||||
|
||||
if (pruneMissingPeersFuture != null) {
|
||||
pruneMissingPeersFuture.cancel(false);
|
||||
}
|
||||
|
||||
for (final String presenceKey : displacementListenersByPresenceKey.keySet()) {
|
||||
clearPresence(presenceKey);
|
||||
}
|
||||
|
||||
presenceCluster.useCluster(connection -> {
|
||||
connection.sync().srem(MANAGER_SET_KEY, managerId);
|
||||
connection.sync().del(getConnectedClientSetKey(managerId));
|
||||
});
|
||||
|
||||
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) {
|
||||
try (final Timer.Context ignored = setPresenceTimer.time()) {
|
||||
final String presenceKey = getPresenceKey(accountUuid, deviceId);
|
||||
|
||||
displacePresence(presenceKey);
|
||||
|
||||
displacementListenersByPresenceKey.put(presenceKey, displacementListener);
|
||||
|
||||
presenceCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
commands.set(presenceKey, managerId);
|
||||
commands.sadd(connectedClientSetKey, presenceKey);
|
||||
});
|
||||
|
||||
subscribeForRemotePresenceChanges(presenceKey);
|
||||
}
|
||||
}
|
||||
|
||||
private void displacePresence(final String presenceKey) {
|
||||
final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey);
|
||||
|
||||
if (displacementListener != null) {
|
||||
displacementListener.handleDisplacement();
|
||||
}
|
||||
|
||||
clearPresence(presenceKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPresent(final UUID accountUuid, final long deviceId) {
|
||||
try (final Timer.Context ignored = checkPresenceTimer.time()) {
|
||||
return presenceCluster.withCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean clearPresence(final UUID accountUuid, final long deviceId) {
|
||||
return clearPresence(getPresenceKey(accountUuid, deviceId));
|
||||
}
|
||||
|
||||
private boolean clearPresence(final String presenceKey) {
|
||||
try (final Timer.Context ignored = clearPresenceTimer.time()) {
|
||||
displacementListenersByPresenceKey.remove(presenceKey);
|
||||
unsubscribeFromRemotePresenceChanges(presenceKey);
|
||||
|
||||
final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null;
|
||||
presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey));
|
||||
|
||||
return removed;
|
||||
}
|
||||
}
|
||||
|
||||
private void subscribeForRemotePresenceChanges(final String presenceKey) {
|
||||
final int slot = SlotHash.getSlot(presenceKey);
|
||||
|
||||
pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot))
|
||||
.commands()
|
||||
.subscribe(getKeyspaceNotificationChannel(presenceKey)));
|
||||
}
|
||||
|
||||
private void resubscribeAll() {
|
||||
for (final String presenceKey : displacementListenersByPresenceKey.keySet()) {
|
||||
subscribeForRemotePresenceChanges(presenceKey);
|
||||
}
|
||||
}
|
||||
|
||||
private void unsubscribeFromRemotePresenceChanges(final String presenceKey) {
|
||||
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey)));
|
||||
}
|
||||
|
||||
void pruneMissingPeers() {
|
||||
try (final Timer.Context ignored = prunePeersTimer.time()) {
|
||||
final Set<String> peerIds = presenceCluster.withCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY));
|
||||
peerIds.remove(managerId);
|
||||
|
||||
for (final String peerId : peerIds) {
|
||||
final boolean peerMissing = presenceCluster.withCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0);
|
||||
|
||||
if (peerMissing) {
|
||||
log.debug("Presence manager {} did not respond to ping", peerId);
|
||||
|
||||
final String connectedClientsKey = getConnectedClientSetKey(peerId);
|
||||
|
||||
presenceCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
String presenceKey;
|
||||
|
||||
while ((presenceKey = commands.spop(connectedClientsKey)) != null) {
|
||||
clearPresenceScript.execute(List.of(presenceKey), List.of(peerId));
|
||||
pruneClientMeter.mark();
|
||||
}
|
||||
|
||||
commands.del(connectedClientsKey);
|
||||
commands.srem(MANAGER_SET_KEY, peerId);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void message(final RedisClusterNode node, final String channel, final String message) {
|
||||
if ("set".equals(message) && channel.startsWith("__keyspace@0__:presence::{")) {
|
||||
// Another process has overwritten this presence key, which means the client has connected to another host.
|
||||
// At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making
|
||||
// synchronous Lettuce calls to avoid deadlocking.
|
||||
scheduledExecutorService.execute(() -> {
|
||||
displacePresence(channel.substring("__keyspace@0__:".length()));
|
||||
remoteDisplacementMeter.mark();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getPresenceKey(final UUID accountUuid, final long deviceId) {
|
||||
return "presence::{" + accountUuid.toString() + "::" + deviceId + "}";
|
||||
}
|
||||
|
||||
private static String getKeyspaceNotificationChannel(final String presenceKey) {
|
||||
return "__keyspace@0__:" + presenceKey;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getConnectedClientSetKey(final String managerId) {
|
||||
return "presence::clients::" + managerId;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getManagerPresenceChannel(final String managerId) {
|
||||
return "presence::manager::" + managerId;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public class NoopClientPresenceManager implements ClientPresenceManager {
|
||||
|
||||
@Override
|
||||
public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPresent(final UUID accountUuid, final long deviceId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean clearPresence(final UUID accountUuid, final long deviceId) {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -10,7 +10,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||
|
|
|
@ -18,10 +18,10 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
||||
public class KeyspaceNotificationClientPresenceManagerTest extends AbstractRedisClusterTest {
|
||||
|
||||
private ScheduledExecutorService presenceRenewalExecutorService;
|
||||
private ClientPresenceManager clientPresenceManager;
|
||||
private KeyspaceNotificationClientPresenceManager clientPresenceManager;
|
||||
|
||||
private static final DisplacedPresenceListener NO_OP = () -> {};
|
||||
|
||||
|
@ -36,7 +36,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
});
|
||||
|
||||
presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
clientPresenceManager = new ClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService);
|
||||
clientPresenceManager = new KeyspaceNotificationClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -93,7 +93,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
}
|
||||
});
|
||||
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(KeyspaceNotificationClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
UUID.randomUUID().toString()));
|
||||
|
||||
synchronized (displaced) {
|
||||
|
@ -125,7 +125,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
|
||||
clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.getResources().eventBus().publish(new ClusterTopologyChangedEvent(List.of(), List.of())));
|
||||
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(KeyspaceNotificationClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
UUID.randomUUID().toString()));
|
||||
|
||||
synchronized (displaced) {
|
||||
|
@ -149,7 +149,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId));
|
||||
|
||||
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(KeyspaceNotificationClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
UUID.randomUUID().toString()));
|
||||
|
||||
assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId));
|
||||
|
@ -161,8 +161,8 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
final String missingPeerId = UUID.randomUUID().toString();
|
||||
|
||||
getRedisCluster().useCluster(connection -> {
|
||||
connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId);
|
||||
connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId);
|
||||
connection.sync().sadd(KeyspaceNotificationClientPresenceManager.MANAGER_SET_KEY, presentPeerId);
|
||||
connection.sync().sadd(KeyspaceNotificationClientPresenceManager.MANAGER_SET_KEY, missingPeerId);
|
||||
});
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
|
@ -170,22 +170,22 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
addClientPresence(missingPeerId);
|
||||
}
|
||||
|
||||
clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId)));
|
||||
clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(KeyspaceNotificationClientPresenceManager.getManagerPresenceChannel(presentPeerId)));
|
||||
clientPresenceManager.pruneMissingPeers();
|
||||
|
||||
assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId))));
|
||||
assertTrue(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId)));
|
||||
assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(KeyspaceNotificationClientPresenceManager.getConnectedClientSetKey(presentPeerId))));
|
||||
assertTrue(getRedisCluster().withCluster(connection -> connection.sync().sismember(KeyspaceNotificationClientPresenceManager.MANAGER_SET_KEY, presentPeerId)));
|
||||
|
||||
assertEquals(0, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId))));
|
||||
assertFalse(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId)));
|
||||
assertEquals(0, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(KeyspaceNotificationClientPresenceManager.getConnectedClientSetKey(missingPeerId))));
|
||||
assertFalse(getRedisCluster().withCluster(connection -> connection.sync().sismember(KeyspaceNotificationClientPresenceManager.MANAGER_SET_KEY, missingPeerId)));
|
||||
}
|
||||
|
||||
private void addClientPresence(final String managerId) {
|
||||
final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7);
|
||||
final String clientPresenceKey = KeyspaceNotificationClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7);
|
||||
|
||||
getRedisCluster().useCluster(connection -> {
|
||||
connection.sync().set(clientPresenceKey, managerId);
|
||||
connection.sync().sadd(ClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey);
|
||||
connection.sync().sadd(KeyspaceNotificationClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -206,7 +206,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
final long displacedAccountDeviceId = 7;
|
||||
|
||||
clientPresenceManager.setPresent(displacedAccountUuid, displacedAccountDeviceId, NO_OP);
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId),
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(KeyspaceNotificationClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId),
|
||||
UUID.randomUUID().toString()));
|
||||
|
||||
clientPresenceManager.stop();
|
|
@ -1,7 +1,6 @@
|
|||
package org.whispersystems.textsecuregcm.tests.websocket;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
|
@ -11,7 +10,7 @@ import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
|||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.push.KeyspaceNotificationClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.push.WebsocketSender;
|
||||
|
@ -27,7 +26,6 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
|
|||
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
import org.whispersystems.websocket.WebSocketClient;
|
||||
import org.whispersystems.websocket.auth.WebSocketAuthenticator;
|
||||
import org.whispersystems.websocket.auth.WebSocketAuthenticator.AuthenticationResult;
|
||||
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
||||
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
||||
|
@ -70,7 +68,7 @@ public class WebSocketConnectionTest {
|
|||
public void testCredentials() throws Exception {
|
||||
MessagesManager storedMessages = mock(MessagesManager.class);
|
||||
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
|
||||
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(pushSender, receiptSender, storedMessages, pubSubManager, apnFallbackManager, mock(ClientPresenceManager.class));
|
||||
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(pushSender, receiptSender, storedMessages, pubSubManager, apnFallbackManager, mock(KeyspaceNotificationClientPresenceManager.class));
|
||||
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
|
||||
|
||||
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
|
||||
|
|
Loading…
Reference in New Issue