From 0e989419c672942dbc892ae741024c663a1759f9 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Mon, 18 Sep 2023 16:28:34 -0500 Subject: [PATCH] Add metric for late removal of message availability and displacement listeners --- .../push/ClientPresenceManager.java | 16 ++++++++++++---- .../textsecuregcm/storage/MessagesCache.java | 6 +++++- .../websocket/AuthenticatedConnectListener.java | 2 +- .../push/ClientPresenceManagerTest.java | 4 ++-- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 963bb0abc..5015c8da6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -20,6 +20,7 @@ import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; +import io.micrometer.core.instrument.Counter; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -33,8 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.LongStream; +import io.micrometer.core.instrument.Metrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; @@ -75,6 +75,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter imp name(MessagesCache.class, "queuePersisted")); private final Counter staleEphemeralMessagesCounter = Metrics.counter( name(MessagesCache.class, "staleEphemeralMessages")); + private final Counter messageAvailabilityListenerRemovedAfterAddCounter = Metrics.counter( + name(MessagesCache.class, "messageAvailabilityListenerRemovedAfterAdd")); static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); @@ -401,7 +403,9 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp synchronized (messageListenersByQueueName) { queueNamesByMessageListener.remove(listener); - messageListenersByQueueName.remove(queueName); + if (!messageListenersByQueueName.remove(queueName, listener)) { + messageAvailabilityListenerRemovedAfterAddCounter.increment(); + } } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index f33e4f091..c7e7165f5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -181,7 +181,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { connection.stop(); RedisOperation.unchecked( - () -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), device.getId())); + () -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), device.getId(), connection)); RedisOperation.unchecked(() -> { messagesManager.removeMessageAvailabilityListener(connection); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java index 976ea6aaf..8ce5dfaa4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java @@ -161,14 +161,14 @@ class ClientPresenceManagerTest { assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId)); clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); - assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId)); + assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId, NO_OP)); clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster( connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), UUID.randomUUID().toString())); - assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId)); + assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId, NO_OP)); } @Test