Pass the reason for displacement to presence displacement listeners

This commit is contained in:
Jon Chambers 2022-03-09 16:54:43 -05:00 committed by Jon Chambers
parent 1dd7d33e23
commit 1ba00a66eb
4 changed files with 16 additions and 16 deletions

View File

@ -158,7 +158,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
try (final Timer.Context ignored = setPresenceTimer.time()) {
final String presenceKey = getPresenceKey(accountUuid, deviceId);
displacePresence(presenceKey);
displacePresence(presenceKey, true);
displacementListenersByPresenceKey.put(presenceKey, displacementListener);
@ -182,7 +182,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
final String presenceKey = getPresenceKey(accountUuid, deviceId);
if (isLocallyPresent(accountUuid, deviceId)) {
displacePresence(presenceKey);
displacePresence(presenceKey, false);
}
// If connected locally, we still need to clean up the presence key.
@ -190,11 +190,11 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
presenceCluster.useCluster(connection -> connection.sync().del(presenceKey));
}
private void displacePresence(final String presenceKey) {
private void displacePresence(final String presenceKey, final boolean connectedElsewhere) {
final DisplacedPresenceListener displacementListener = displacementListenersByPresenceKey.get(presenceKey);
if (displacementListener != null) {
displacementListener.handleDisplacement();
displacementListener.handleDisplacement(connectedElsewhere);
}
clearPresence(presenceKey);
@ -287,12 +287,13 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
if ("set".equals(message) || "del".equals(message)) {
// for "set", another process has overwritten this presence key, which means the client has connected to another host.
// for "del", another process has indicated the client should be disconnected
final boolean connectedElsewhere = "set".equals(message);
// At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making
// synchronous Lettuce calls to avoid deadlocking.
keyspaceNotificationExecutorService.execute(() -> {
try {
displacePresence(channel.substring("__keyspace@0__:".length()));
displacePresence(channel.substring("__keyspace@0__:".length()), connectedElsewhere);
remoteDisplacementMeter.mark();
} catch (final Exception e) {
log.warn("Error displacing presence", e);

View File

@ -12,5 +12,5 @@ package org.whispersystems.textsecuregcm.push;
@FunctionalInterface
public interface DisplacedPresenceListener {
void handleDisplacement();
void handleDisplacement(boolean connectedElsewhere);
}

View File

@ -382,7 +382,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
}
@Override
public void handleDisplacement() {
public void handleDisplacement(final boolean connectedElsewhere) {
Metrics.counter(DISPLACEMENT_COUNTER_NAME, List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()))).increment();
try {

View File

@ -39,7 +39,7 @@ class ClientPresenceManagerTest {
private ScheduledExecutorService presenceRenewalExecutorService;
private ClientPresenceManager clientPresenceManager;
private static final DisplacedPresenceListener NO_OP = () -> {
private static final DisplacedPresenceListener NO_OP = connectedElsewhere -> {
};
@BeforeEach
@ -94,7 +94,7 @@ class ClientPresenceManagerTest {
final long deviceId = 1;
final AtomicInteger displacementCounter = new AtomicInteger(0);
final DisplacedPresenceListener displacementListener = displacementCounter::incrementAndGet;
final DisplacedPresenceListener displacementListener = connectedElsewhere -> displacementCounter.incrementAndGet();
clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener);
@ -114,7 +114,7 @@ class ClientPresenceManagerTest {
clientPresenceManager.start();
clientPresenceManager.setPresent(accountUuid, deviceId, () -> displaced.complete(null));
clientPresenceManager.setPresent(accountUuid, deviceId, connectedElsewhere -> displaced.complete(null));
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(
connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
@ -132,7 +132,7 @@ class ClientPresenceManagerTest {
clientPresenceManager.start();
clientPresenceManager.setPresent(accountUuid, deviceId, () -> displaced.complete(null));
clientPresenceManager.setPresent(accountUuid, deviceId, connectedElsewhere -> displaced.complete(null));
clientPresenceManager.getPubSubConnection()
.usePubSubConnection(connection -> connection.getResources().eventBus()
@ -336,11 +336,10 @@ class ClientPresenceManagerTest {
final long deviceId = 1L;
final CompletableFuture<?> displaced = new CompletableFuture<>();
final DisplacedPresenceListener listener1 = () -> displaced.complete(null);
final DisplacedPresenceListener listener1 = connectedElsewhere -> displaced.complete(null);
server1.setPresent(uuid1, deviceId, listener1);
server2.setPresent(uuid1, deviceId, () -> {
});
server2.setPresent(uuid1, deviceId, connectedElsewhere -> {});
assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join);
}
@ -351,7 +350,7 @@ class ClientPresenceManagerTest {
final long deviceId = 1L;
final CompletableFuture<?> displaced = new CompletableFuture<>();
final DisplacedPresenceListener listener1 = () -> displaced.complete(null);
final DisplacedPresenceListener listener1 = connectedElsewhere -> displaced.complete(null);
server1.setPresent(uuid1, deviceId, listener1);
server1.disconnectPresence(uuid1, deviceId);
@ -365,7 +364,7 @@ class ClientPresenceManagerTest {
final long deviceId = 1L;
final CompletableFuture<?> displaced = new CompletableFuture<>();
final DisplacedPresenceListener listener1 = () -> displaced.complete(null);
final DisplacedPresenceListener listener1 = connectedElsewhere -> displaced.complete(null);
server1.setPresent(uuid1, deviceId, listener1);
server2.disconnectPresence(uuid1, deviceId);