.editorconfig formatting

This commit is contained in:
Chris Eager 2021-08-13 10:20:33 -05:00 committed by Chris Eager
parent 0cde06557d
commit 0b7c3ad745
1 changed files with 219 additions and 206 deletions

View File

@ -48,240 +48,253 @@ import org.whispersystems.textsecuregcm.util.Constants;
*/ */
public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, String> implements Managed { public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, String> implements Managed {
private final String managerId = UUID.randomUUID().toString(); private final String managerId = UUID.randomUUID().toString();
private final String connectedClientSetKey = getConnectedClientSetKey(managerId); private final String connectedClientSetKey = getConnectedClientSetKey(managerId);
private final FaultTolerantRedisCluster presenceCluster; private final FaultTolerantRedisCluster presenceCluster;
private final FaultTolerantPubSubConnection<String, String> pubSubConnection; private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
private final ClusterLuaScript clearPresenceScript; private final ClusterLuaScript clearPresenceScript;
private final ExecutorService keyspaceNotificationExecutorService; private final ExecutorService keyspaceNotificationExecutorService;
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> pruneMissingPeersFuture; private ScheduledFuture<?> pruneMissingPeersFuture;
private final Map<String, DisplacedPresenceListener> displacementListenersByPresenceKey = new ConcurrentHashMap<>(); private final Map<String, DisplacedPresenceListener> displacementListenersByPresenceKey = new ConcurrentHashMap<>();
private final Timer checkPresenceTimer; private final Timer checkPresenceTimer;
private final Timer setPresenceTimer; private final Timer setPresenceTimer;
private final Timer clearPresenceTimer; private final Timer clearPresenceTimer;
private final Timer prunePeersTimer; private final Timer prunePeersTimer;
private final Meter pruneClientMeter; private final Meter pruneClientMeter;
private final Meter remoteDisplacementMeter; private final Meter remoteDisplacementMeter;
private final Meter pubSubMessageMeter; private final Meter pubSubMessageMeter;
private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int)Duration.ofSeconds(30).toSeconds(); private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int) Duration.ofSeconds(30).toSeconds();
static final String MANAGER_SET_KEY = "presence::managers"; static final String MANAGER_SET_KEY = "presence::managers";
private static final Logger log = LoggerFactory.getLogger(ClientPresenceManager.class); private static final Logger log = LoggerFactory.getLogger(ClientPresenceManager.class);
public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService, final ExecutorService keyspaceNotificationExecutorService) throws IOException { public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster,
this.presenceCluster = presenceCluster; final ScheduledExecutorService scheduledExecutorService,
this.pubSubConnection = this.presenceCluster.createPubSubConnection(); final ExecutorService keyspaceNotificationExecutorService) throws IOException {
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER); this.presenceCluster = presenceCluster;
this.scheduledExecutorService = scheduledExecutorService; this.pubSubConnection = this.presenceCluster.createPubSubConnection();
this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService; this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua",
ScriptOutputType.INTEGER);
this.scheduledExecutorService = scheduledExecutorService;
this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService;
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size); metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size);
this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence")); this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence"));
this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence")); this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence"));
this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence")); this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence"));
this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers")); this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers"));
this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient")); this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient"));
this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement")); this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement"));
this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage")); this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage"));
}
@VisibleForTesting
FaultTolerantPubSubConnection<String, String> getPubSubConnection() {
return pubSubConnection;
}
@Override
public void start() {
pubSubConnection.usePubSubConnection(connection -> {
connection.addListener(this);
connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribe(event -> resubscribeAll());
final String presenceChannel = getManagerPresenceChannel(managerId);
final int slot = SlotHash.getSlot(presenceChannel);
connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot))
.commands()
.subscribe(presenceChannel);
});
presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId));
pruneMissingPeersFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
pruneMissingPeers();
} catch (final Throwable t) {
log.warn("Failed to prune missing peers", t);
}
}, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
@Override
public void stop() {
pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this));
if (pruneMissingPeersFuture != null) {
pruneMissingPeersFuture.cancel(false);
} }
@VisibleForTesting for (final String presenceKey : displacementListenersByPresenceKey.keySet()) {
FaultTolerantPubSubConnection<String, String> getPubSubConnection() { clearPresence(presenceKey);
return pubSubConnection;
} }
@Override presenceCluster.useCluster(connection -> {
public void start() { connection.sync().srem(MANAGER_SET_KEY, managerId);
pubSubConnection.usePubSubConnection(connection -> { connection.sync().del(getConnectedClientSetKey(managerId));
connection.addListener(this); });
connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribe(event -> resubscribeAll());
final String presenceChannel = getManagerPresenceChannel(managerId); pubSubConnection.usePubSubConnection(
final int slot = SlotHash.getSlot(presenceChannel); connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId)));
}
connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)).commands().subscribe(presenceChannel); public void setPresent(final UUID accountUuid, final long deviceId,
}); final DisplacedPresenceListener displacementListener) {
try (final Timer.Context ignored = setPresenceTimer.time()) {
final String presenceKey = getPresenceKey(accountUuid, deviceId);
presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); displacePresence(presenceKey);
pruneMissingPeersFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { displacementListenersByPresenceKey.put(presenceKey, displacementListener);
try {
pruneMissingPeers(); presenceCluster.useCluster(connection -> {
} catch (final Throwable t) { final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
log.warn("Failed to prune missing peers", t);
} commands.sadd(connectedClientSetKey, presenceKey);
}, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS); commands.set(presenceKey, managerId);
});
subscribeForRemotePresenceChanges(presenceKey);
}
}
private void displacePresence(final String presenceKey) {
final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey);
if (displacementListener != null) {
displacementListener.handleDisplacement();
} }
@Override clearPresence(presenceKey);
public void stop() { }
pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this));
if (pruneMissingPeersFuture != null) { public boolean isPresent(final UUID accountUuid, final long deviceId) {
pruneMissingPeersFuture.cancel(false); try (final Timer.Context ignored = checkPresenceTimer.time()) {
return presenceCluster.withCluster(connection ->
connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1;
}
}
public boolean isLocallyPresent(final UUID accountUuid, final long deviceId) {
return displacementListenersByPresenceKey.containsKey(getPresenceKey(accountUuid, deviceId));
}
public boolean clearPresence(final UUID accountUuid, final long deviceId) {
return clearPresence(getPresenceKey(accountUuid, deviceId));
}
private boolean clearPresence(final String presenceKey) {
try (final Timer.Context ignored = clearPresenceTimer.time()) {
displacementListenersByPresenceKey.remove(presenceKey);
unsubscribeFromRemotePresenceChanges(presenceKey);
final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null;
presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey));
return removed;
}
}
private void subscribeForRemotePresenceChanges(final String presenceKey) {
final int slot = SlotHash.getSlot(presenceKey);
pubSubConnection.usePubSubConnection(
connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot))
.commands()
.subscribe(getKeyspaceNotificationChannel(presenceKey)));
}
private void resubscribeAll() {
for (final String presenceKey : displacementListenersByPresenceKey.keySet()) {
subscribeForRemotePresenceChanges(presenceKey);
}
}
private void unsubscribeFromRemotePresenceChanges(final String presenceKey) {
pubSubConnection.usePubSubConnection(
connection -> connection.sync().upstream().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey)));
}
void pruneMissingPeers() {
try (final Timer.Context ignored = prunePeersTimer.time()) {
final Set<String> peerIds = presenceCluster.withCluster(
connection -> connection.sync().smembers(MANAGER_SET_KEY));
peerIds.remove(managerId);
for (final String peerId : peerIds) {
final boolean peerMissing = presenceCluster.withCluster(
connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0);
if (peerMissing) {
log.debug("Presence manager {} did not respond to ping", peerId);
final String connectedClientsKey = getConnectedClientSetKey(peerId);
String presenceKey;
while ((presenceKey = presenceCluster.withCluster(connection -> connection.sync().spop(connectedClientsKey)))
!= null) {
clearPresenceScript.execute(List.of(presenceKey), List.of(peerId));
pruneClientMeter.mark();
}
presenceCluster.useCluster(connection -> {
connection.sync().del(connectedClientsKey);
connection.sync().srem(MANAGER_SET_KEY, peerId);
});
} }
}
}
}
for (final String presenceKey : displacementListenersByPresenceKey.keySet()) { @Override
clearPresence(presenceKey); public void message(final RedisClusterNode node, final String channel, final String message) {
pubSubMessageMeter.mark();
if ("set".equals(message) && channel.startsWith("__keyspace@0__:presence::{")) {
// Another process has overwritten this presence key, which means the client has connected to another host.
// At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making
// synchronous Lettuce calls to avoid deadlocking.
keyspaceNotificationExecutorService.execute(() -> {
try {
displacePresence(channel.substring("__keyspace@0__:".length()));
remoteDisplacementMeter.mark();
} catch (final Exception e) {
log.warn("Error displacing presence", e);
} }
});
presenceCluster.useCluster(connection -> {
connection.sync().srem(MANAGER_SET_KEY, managerId);
connection.sync().del(getConnectedClientSetKey(managerId));
});
pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId)));
} }
}
public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) { @VisibleForTesting
try (final Timer.Context ignored = setPresenceTimer.time()) { static String getPresenceKey(final UUID accountUuid, final long deviceId) {
final String presenceKey = getPresenceKey(accountUuid, deviceId); return "presence::{" + accountUuid.toString() + "::" + deviceId + "}";
}
displacePresence(presenceKey); private static String getKeyspaceNotificationChannel(final String presenceKey) {
return "__keyspace@0__:" + presenceKey;
}
displacementListenersByPresenceKey.put(presenceKey, displacementListener); @VisibleForTesting
static String getConnectedClientSetKey(final String managerId) {
return "presence::clients::" + managerId;
}
presenceCluster.useCluster(connection -> { @VisibleForTesting
final RedisAdvancedClusterCommands<String, String> commands = connection.sync(); static String getManagerPresenceChannel(final String managerId) {
return "presence::manager::" + managerId;
commands.sadd(connectedClientSetKey, presenceKey); }
commands.set(presenceKey, managerId);
});
subscribeForRemotePresenceChanges(presenceKey);
}
}
private void displacePresence(final String presenceKey) {
final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey);
if (displacementListener != null) {
displacementListener.handleDisplacement();
}
clearPresence(presenceKey);
}
public boolean isPresent(final UUID accountUuid, final long deviceId) {
try (final Timer.Context ignored = checkPresenceTimer.time()) {
return presenceCluster.withCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1;
}
}
public boolean isLocallyPresent(final UUID accountUuid, final long deviceId) {
return displacementListenersByPresenceKey.containsKey(getPresenceKey(accountUuid, deviceId));
}
public boolean clearPresence(final UUID accountUuid, final long deviceId) {
return clearPresence(getPresenceKey(accountUuid, deviceId));
}
private boolean clearPresence(final String presenceKey) {
try (final Timer.Context ignored = clearPresenceTimer.time()) {
displacementListenersByPresenceKey.remove(presenceKey);
unsubscribeFromRemotePresenceChanges(presenceKey);
final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null;
presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey));
return removed;
}
}
private void subscribeForRemotePresenceChanges(final String presenceKey) {
final int slot = SlotHash.getSlot(presenceKey);
pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot))
.commands()
.subscribe(getKeyspaceNotificationChannel(presenceKey)));
}
private void resubscribeAll() {
for (final String presenceKey : displacementListenersByPresenceKey.keySet()) {
subscribeForRemotePresenceChanges(presenceKey);
}
}
private void unsubscribeFromRemotePresenceChanges(final String presenceKey) {
pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey)));
}
void pruneMissingPeers() {
try (final Timer.Context ignored = prunePeersTimer.time()) {
final Set<String> peerIds = presenceCluster.withCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY));
peerIds.remove(managerId);
for (final String peerId : peerIds) {
final boolean peerMissing = presenceCluster.withCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0);
if (peerMissing) {
log.debug("Presence manager {} did not respond to ping", peerId);
final String connectedClientsKey = getConnectedClientSetKey(peerId);
String presenceKey;
while ((presenceKey = presenceCluster.withCluster(connection -> connection.sync().spop(connectedClientsKey))) != null) {
clearPresenceScript.execute(List.of(presenceKey), List.of(peerId));
pruneClientMeter.mark();
}
presenceCluster.useCluster(connection -> {
connection.sync().del(connectedClientsKey);
connection.sync().srem(MANAGER_SET_KEY, peerId);
});
}
}
}
}
@Override
public void message(final RedisClusterNode node, final String channel, final String message) {
pubSubMessageMeter.mark();
if ("set".equals(message) && channel.startsWith("__keyspace@0__:presence::{")) {
// Another process has overwritten this presence key, which means the client has connected to another host.
// At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making
// synchronous Lettuce calls to avoid deadlocking.
keyspaceNotificationExecutorService.execute(() -> {
try {
displacePresence(channel.substring("__keyspace@0__:".length()));
remoteDisplacementMeter.mark();
} catch (final Exception e) {
log.warn("Error displacing presence", e);
}
});
}
}
@VisibleForTesting
static String getPresenceKey(final UUID accountUuid, final long deviceId) {
return "presence::{" + accountUuid.toString() + "::" + deviceId + "}";
}
private static String getKeyspaceNotificationChannel(final String presenceKey) {
return "__keyspace@0__:" + presenceKey;
}
@VisibleForTesting
static String getConnectedClientSetKey(final String managerId) {
return "presence::clients::" + managerId;
}
@VisibleForTesting
static String getManagerPresenceChannel(final String managerId) {
return "presence::manager::" + managerId;
}
} }