Add metric for late removal of message availability and displacement listeners

This commit is contained in:
Chris Eager 2023-09-18 16:28:34 -05:00 committed by Chris Eager
parent 0fa8276d2d
commit 0e989419c6
4 changed files with 20 additions and 8 deletions

View File

@ -20,6 +20,7 @@ import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.micrometer.core.instrument.Counter;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
@ -33,8 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import io.micrometer.core.instrument.Metrics;
import java.util.stream.LongStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
@ -75,6 +75,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private final Meter pruneClientMeter; private final Meter pruneClientMeter;
private final Meter remoteDisplacementMeter; private final Meter remoteDisplacementMeter;
private final Meter pubSubMessageMeter; private final Meter pubSubMessageMeter;
private final Counter displacementListenerAlreadyRemovedCounter;
private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int) Duration.ofSeconds(30).toSeconds(); private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int) Duration.ofSeconds(30).toSeconds();
private static final int PRESENCE_EXPIRATION_SECONDS = (int) Duration.ofMinutes(11).toSeconds(); private static final int PRESENCE_EXPIRATION_SECONDS = (int) Duration.ofMinutes(11).toSeconds();
@ -105,6 +106,8 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient")); this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient"));
this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement")); this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement"));
this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage")); this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage"));
this.displacementListenerAlreadyRemovedCounter = Metrics.counter(
name(getClass(), "displacementListenerAlreadyRemoved"));
} }
@VisibleForTesting @VisibleForTesting
@ -230,8 +233,13 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
return displacementListenersByPresenceKey.containsKey(getPresenceKey(accountUuid, deviceId)); return displacementListenersByPresenceKey.containsKey(getPresenceKey(accountUuid, deviceId));
} }
public boolean clearPresence(final UUID accountUuid, final long deviceId) { public boolean clearPresence(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener listener) {
return clearPresence(getPresenceKey(accountUuid, deviceId)); final String presenceKey = getPresenceKey(accountUuid, deviceId);
if (!displacementListenersByPresenceKey.remove(presenceKey, listener)) {
displacementListenerAlreadyRemovedCounter.increment();
}
return clearPresence(presenceKey);
} }
private boolean clearPresence(final String presenceKey) { private boolean clearPresence(final String presenceKey) {

View File

@ -87,6 +87,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
name(MessagesCache.class, "queuePersisted")); name(MessagesCache.class, "queuePersisted"));
private final Counter staleEphemeralMessagesCounter = Metrics.counter( private final Counter staleEphemeralMessagesCounter = Metrics.counter(
name(MessagesCache.class, "staleEphemeralMessages")); name(MessagesCache.class, "staleEphemeralMessages"));
private final Counter messageAvailabilityListenerRemovedAfterAddCounter = Metrics.counter(
name(MessagesCache.class, "messageAvailabilityListenerRemovedAfterAdd"));
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
@ -401,7 +403,9 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
synchronized (messageListenersByQueueName) { synchronized (messageListenersByQueueName) {
queueNamesByMessageListener.remove(listener); queueNamesByMessageListener.remove(listener);
messageListenersByQueueName.remove(queueName); if (!messageListenersByQueueName.remove(queueName, listener)) {
messageAvailabilityListenerRemovedAfterAddCounter.increment();
}
} }
} }
} }

View File

@ -181,7 +181,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
connection.stop(); connection.stop();
RedisOperation.unchecked( RedisOperation.unchecked(
() -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), device.getId())); () -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), device.getId(), connection));
RedisOperation.unchecked(() -> { RedisOperation.unchecked(() -> {
messagesManager.removeMessageAvailabilityListener(connection); messagesManager.removeMessageAvailabilityListener(connection);

View File

@ -161,14 +161,14 @@ class ClientPresenceManagerTest {
assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId)); assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId));
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId)); assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId, NO_OP));
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster( REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(
connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
UUID.randomUUID().toString())); UUID.randomUUID().toString()));
assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId)); assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId, NO_OP));
} }
@Test @Test