Allow presence keys to expire if not periodically renewed
This commit is contained in:
parent
4e131858ca
commit
92d36b725f
|
@ -55,6 +55,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||||
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
|
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
|
||||||
|
|
||||||
private final ClusterLuaScript clearPresenceScript;
|
private final ClusterLuaScript clearPresenceScript;
|
||||||
|
private final ClusterLuaScript renewPresenceScript;
|
||||||
|
|
||||||
private final ExecutorService keyspaceNotificationExecutorService;
|
private final ExecutorService keyspaceNotificationExecutorService;
|
||||||
private final ScheduledExecutorService scheduledExecutorService;
|
private final ScheduledExecutorService scheduledExecutorService;
|
||||||
|
@ -71,6 +72,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||||
private final Meter pubSubMessageMeter;
|
private final Meter pubSubMessageMeter;
|
||||||
|
|
||||||
private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int) Duration.ofSeconds(30).toSeconds();
|
private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int) Duration.ofSeconds(30).toSeconds();
|
||||||
|
private static final int PRESENCE_EXPIRATION_SECONDS = (int) Duration.ofMinutes(11).toSeconds();
|
||||||
|
|
||||||
static final String MANAGER_SET_KEY = "presence::managers";
|
static final String MANAGER_SET_KEY = "presence::managers";
|
||||||
|
|
||||||
|
@ -81,8 +83,8 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||||
final ExecutorService keyspaceNotificationExecutorService) throws IOException {
|
final ExecutorService keyspaceNotificationExecutorService) throws IOException {
|
||||||
this.presenceCluster = presenceCluster;
|
this.presenceCluster = presenceCluster;
|
||||||
this.pubSubConnection = this.presenceCluster.createPubSubConnection();
|
this.pubSubConnection = this.presenceCluster.createPubSubConnection();
|
||||||
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua",
|
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER);
|
||||||
ScriptOutputType.INTEGER);
|
this.renewPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/renew_presence.lua", ScriptOutputType.VALUE);
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
this.scheduledExecutorService = scheduledExecutorService;
|
||||||
this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService;
|
this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService;
|
||||||
|
|
||||||
|
@ -151,8 +153,8 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||||
connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId)));
|
connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPresent(final UUID accountUuid, final long deviceId,
|
public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) {
|
||||||
final DisplacedPresenceListener displacementListener) {
|
|
||||||
try (final Timer.Context ignored = setPresenceTimer.time()) {
|
try (final Timer.Context ignored = setPresenceTimer.time()) {
|
||||||
final String presenceKey = getPresenceKey(accountUuid, deviceId);
|
final String presenceKey = getPresenceKey(accountUuid, deviceId);
|
||||||
|
|
||||||
|
@ -164,13 +166,18 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||||
|
|
||||||
commands.sadd(connectedClientSetKey, presenceKey);
|
commands.sadd(connectedClientSetKey, presenceKey);
|
||||||
commands.set(presenceKey, managerId);
|
commands.setex(presenceKey, PRESENCE_EXPIRATION_SECONDS, managerId);
|
||||||
});
|
});
|
||||||
|
|
||||||
subscribeForRemotePresenceChanges(presenceKey);
|
subscribeForRemotePresenceChanges(presenceKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void renewPresence(final UUID accountUuid, final long deviceId) {
|
||||||
|
renewPresenceScript.execute(List.of(getPresenceKey(accountUuid, deviceId)),
|
||||||
|
List.of(managerId, String.valueOf(PRESENCE_EXPIRATION_SECONDS)));
|
||||||
|
}
|
||||||
|
|
||||||
public void disconnectPresence(final UUID accountUuid, final long deviceId) {
|
public void disconnectPresence(final UUID accountUuid, final long deviceId) {
|
||||||
final String presenceKey = getPresenceKey(accountUuid, deviceId);
|
final String presenceKey = getPresenceKey(accountUuid, deviceId);
|
||||||
|
|
||||||
|
@ -295,6 +302,11 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
String getManagerId() {
|
||||||
|
return managerId;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static String getPresenceKey(final UUID accountUuid, final long deviceId) {
|
static String getPresenceKey(final UUID accountUuid, final long deviceId) {
|
||||||
return "presence::{" + accountUuid.toString() + "::" + deviceId + "}";
|
return "presence::{" + accountUuid.toString() + "::" + deviceId + "}";
|
||||||
|
|
|
@ -12,6 +12,9 @@ import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
||||||
|
@ -33,6 +36,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||||
private static final Timer unauthenticatedDurationTimer = metricRegistry.timer(name(WebSocketConnection.class, "unauthenticated_connection_duration"));
|
private static final Timer unauthenticatedDurationTimer = metricRegistry.timer(name(WebSocketConnection.class, "unauthenticated_connection_duration"));
|
||||||
private static final Counter openWebsocketCounter = metricRegistry.counter(name(WebSocketConnection.class, "open_websockets"));
|
private static final Counter openWebsocketCounter = metricRegistry.counter(name(WebSocketConnection.class, "open_websockets"));
|
||||||
|
|
||||||
|
private static final long RENEW_PRESENCE_INTERVAL_MINUTES = 5;
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(AuthenticatedConnectListener.class);
|
private static final Logger log = LoggerFactory.getLogger(AuthenticatedConnectListener.class);
|
||||||
|
|
||||||
private final ReceiptSender receiptSender;
|
private final ReceiptSender receiptSender;
|
||||||
|
@ -70,12 +75,20 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||||
openWebsocketCounter.inc();
|
openWebsocketCounter.inc();
|
||||||
RedisOperation.unchecked(() -> apnFallbackManager.cancel(auth.getAccount(), device));
|
RedisOperation.unchecked(() -> apnFallbackManager.cancel(auth.getAccount(), device));
|
||||||
|
|
||||||
|
final AtomicReference<ScheduledFuture<?>> renewPresenceFutureReference = new AtomicReference<>();
|
||||||
|
|
||||||
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
|
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onWebSocketClose(WebSocketSessionContext context, int statusCode, String reason) {
|
public void onWebSocketClose(WebSocketSessionContext context, int statusCode, String reason) {
|
||||||
openWebsocketCounter.dec();
|
openWebsocketCounter.dec();
|
||||||
timer.stop();
|
timer.stop();
|
||||||
|
|
||||||
|
final ScheduledFuture<?> renewPresenceFuture = renewPresenceFutureReference.get();
|
||||||
|
|
||||||
|
if (renewPresenceFuture != null) {
|
||||||
|
renewPresenceFuture.cancel(false);
|
||||||
|
}
|
||||||
|
|
||||||
connection.stop();
|
connection.stop();
|
||||||
|
|
||||||
RedisOperation.unchecked(
|
RedisOperation.unchecked(
|
||||||
|
@ -94,6 +107,12 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||||
connection.start();
|
connection.start();
|
||||||
clientPresenceManager.setPresent(auth.getAccount().getUuid(), device.getId(), connection);
|
clientPresenceManager.setPresent(auth.getAccount().getUuid(), device.getId(), connection);
|
||||||
messagesManager.addMessageAvailabilityListener(auth.getAccount().getUuid(), device.getId(), connection);
|
messagesManager.addMessageAvailabilityListener(auth.getAccount().getUuid(), device.getId(), connection);
|
||||||
|
|
||||||
|
renewPresenceFutureReference.set(scheduledExecutorService.scheduleAtFixedRate(() -> RedisOperation.unchecked(() ->
|
||||||
|
clientPresenceManager.renewPresence(auth.getAccount().getUuid(), device.getId())),
|
||||||
|
RENEW_PRESENCE_INTERVAL_MINUTES,
|
||||||
|
RENEW_PRESENCE_INTERVAL_MINUTES,
|
||||||
|
TimeUnit.MINUTES));
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.warn("Failed to initialize websocket", e);
|
log.warn("Failed to initialize websocket", e);
|
||||||
context.getClient().close(1011, "Unexpected error initializing connection");
|
context.getClient().close(1011, "Unexpected error initializing connection");
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
local presenceKey = KEYS[1]
|
||||||
|
local presenceUuid = ARGV[1]
|
||||||
|
local expireSeconds = ARGV[2]
|
||||||
|
|
||||||
|
if redis.call("GET", presenceKey) == presenceUuid then
|
||||||
|
redis.call("EXPIRE", presenceKey, expireSeconds)
|
||||||
|
end
|
|
@ -196,6 +196,64 @@ class ClientPresenceManagerTest {
|
||||||
.sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId)));
|
.sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testInitialPresenceExpiration() {
|
||||||
|
final UUID accountUuid = UUID.randomUUID();
|
||||||
|
final long deviceId = 1;
|
||||||
|
|
||||||
|
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
|
||||||
|
|
||||||
|
{
|
||||||
|
final int ttl = REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster(connection ->
|
||||||
|
connection.sync().ttl(ClientPresenceManager.getPresenceKey(accountUuid, deviceId)).intValue());
|
||||||
|
|
||||||
|
assertTrue(ttl > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testRenewPresence() {
|
||||||
|
final UUID accountUuid = UUID.randomUUID();
|
||||||
|
final long deviceId = 1;
|
||||||
|
|
||||||
|
final String presenceKey = ClientPresenceManager.getPresenceKey(accountUuid, deviceId);
|
||||||
|
|
||||||
|
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection ->
|
||||||
|
connection.sync().set(presenceKey, clientPresenceManager.getManagerId()));
|
||||||
|
|
||||||
|
{
|
||||||
|
final int ttl = REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster(connection ->
|
||||||
|
connection.sync().ttl(presenceKey).intValue());
|
||||||
|
|
||||||
|
assertEquals(-1, ttl);
|
||||||
|
}
|
||||||
|
|
||||||
|
clientPresenceManager.renewPresence(accountUuid, deviceId);
|
||||||
|
|
||||||
|
{
|
||||||
|
final int ttl = REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster(connection ->
|
||||||
|
connection.sync().ttl(presenceKey).intValue());
|
||||||
|
|
||||||
|
assertTrue(ttl > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testExpiredPresence() {
|
||||||
|
final UUID accountUuid = UUID.randomUUID();
|
||||||
|
final long deviceId = 1;
|
||||||
|
|
||||||
|
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
|
||||||
|
|
||||||
|
assertTrue(clientPresenceManager.isPresent(accountUuid, deviceId));
|
||||||
|
|
||||||
|
// Hackily set this key to expire immediately
|
||||||
|
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection ->
|
||||||
|
connection.sync().expire(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), 0));
|
||||||
|
|
||||||
|
assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId));
|
||||||
|
}
|
||||||
|
|
||||||
private void addClientPresence(final String managerId) {
|
private void addClientPresence(final String managerId) {
|
||||||
final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7);
|
final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue