Subscribe to remote presence changes before setting the key

This commit is contained in:
Chris Eager 2024-10-11 12:13:45 -05:00 committed by Chris Eager
parent 46227295ff
commit 830a07012b
2 changed files with 62 additions and 18 deletions

View File

@ -13,12 +13,12 @@ import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@ -63,6 +65,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private ScheduledFuture<?> pruneMissingPeersFuture;
private final Map<String, DisplacedPresenceListener> displacementListenersByPresenceKey = new ConcurrentHashMap<>();
private final Map<String, CompletionStage<?>> pendingPresenceSetsByPresenceKey = new ConcurrentHashMap<>();
private final Timer checkPresenceTimer;
private final Timer setPresenceTimer;
@ -167,14 +170,30 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
displacementListenersByPresenceKey.put(presenceKey, displacementListener);
presenceCluster.useCluster(connection -> {
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
final CompletableFuture<Void> presenceFuture = new CompletableFuture<>();
final CompletionStage<?> previousFuture = pendingPresenceSetsByPresenceKey.put(presenceKey, presenceFuture);
if (previousFuture != null) {
log.warn("Unexpected pending presence");
commands.sadd(connectedClientSetKey, presenceKey);
commands.setex(presenceKey, PRESENCE_EXPIRATION_SECONDS, managerId);
});
}
subscribeForRemotePresenceChanges(presenceKey);
presenceCluster.withCluster(connection -> {
final RedisAdvancedClusterAsyncCommands<String, String> commands = connection.async();
commands.sadd(connectedClientSetKey, presenceKey);
return commands.setex(presenceKey, PRESENCE_EXPIRATION_SECONDS, managerId);
}).whenComplete((result, throwable) -> {
if (throwable != null) {
presenceFuture.completeExceptionally(throwable);
} else {
presenceFuture.complete(null);
}
});
presenceFuture.whenComplete(
(ignored, throwable) -> pendingPresenceSetsByPresenceKey.remove(presenceKey, presenceFuture));
});
}
@ -308,19 +327,38 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
if (channel.startsWith("__keyspace@0__:presence::{")) {
if ("set".equals(message) || "del".equals(message)) {
// for "set", another process has overwritten this presence key, which means the client has connected to another host.
// "set" might mean the client has connected to another host, although it might just be our own `set`,
// because we subscribe for changes before setting the key.
// for "del", another process has indicated the client should be disconnected
final boolean connectedElsewhere = "set".equals(message);
final boolean maybeConnectedElsewhere = "set".equals(message);
// At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making
// synchronous Lettuce calls to avoid deadlocking.
keyspaceNotificationExecutorService.execute(() -> {
try {
displacePresence(channel.substring("__keyspace@0__:".length()), connectedElsewhere);
remoteDisplacementMeter.increment();
} catch (final Exception e) {
log.warn("Error displacing presence", e);
}
final String clientPresenceKey = channel.substring("__keyspace@0__:".length());
final CompletionStage<?> pendingConnection = pendingPresenceSetsByPresenceKey.getOrDefault(clientPresenceKey,
CompletableFuture.completedFuture(null));
pendingConnection.thenCompose(ignored -> {
if (maybeConnectedElsewhere) {
return presenceCluster.withCluster(connection -> connection.async().get(clientPresenceKey))
.thenApply(currentManagerId -> !managerId.equals(currentManagerId));
}
return CompletableFuture.completedFuture(true);
})
.exceptionally(ignored -> true)
.thenAcceptAsync(shouldDisplace -> {
if (shouldDisplace) {
try {
displacePresence(clientPresenceKey, maybeConnectedElsewhere);
remoteDisplacementMeter.increment();
} catch (final Exception e) {
log.warn("Error displacing presence", e);
}
}
}, keyspaceNotificationExecutorService);
});
}
}

View File

@ -386,8 +386,8 @@ class ClientPresenceManagerTest {
displaced.join();
}
@RepeatedTest(value = 10, failureThreshold = 1)
void testConcurrentConnection() {
@RepeatedTest(value = 100)
void testConcurrentConnection() throws Exception {
final UUID uuid1 = UUID.randomUUID();
final byte deviceId = 1;
@ -400,7 +400,13 @@ class ClientPresenceManagerTest {
server1Thread.start();
server2Thread.start();
assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join);
displaced.join();
server2Thread.join();
server1Thread.join();
while (server1.isLocallyPresent(uuid1, deviceId) == server2.isLocallyPresent(uuid1, deviceId)) {
Thread.sleep(50);
}
}
}