Use expire NX on cardinality estimator key

This commit is contained in:
Ameya Lokare 2025-04-03 09:21:56 -07:00 committed by Jon Chambers
parent 2efe687b4b
commit d83d826236
1 changed files with 12 additions and 22 deletions

View File

@ -15,12 +15,14 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
import static io.lettuce.core.ExpireArgs.Builder.nx;
/** /**
* Estimate the number of unique items seen over a configurable period and update a metric * Estimate the number of unique items seen over a configurable period and update a metric
*/ */
public class CardinalityEstimator { public class CardinalityEstimator {
private volatile double uniqueElementCount; private volatile long uniqueElementCount;
private final FaultTolerantRedisClusterClient redisCluster; private final FaultTolerantRedisClusterClient redisCluster;
private final String hllName; private final String hllName;
private final Duration period; private final Duration period;
@ -36,41 +38,29 @@ public class CardinalityEstimator {
obj -> obj.uniqueElementCount); obj -> obj.uniqueElementCount);
} }
public void add(String element) { public void add(final String element) {
addAsync(element).toCompletableFuture().join(); addAsync(element).toCompletableFuture().join();
} }
public CompletionStage<Void> addAsync(String element) { public CompletionStage<Void> addAsync(final String element) {
return redisCluster.withCluster(connection -> connection.async() return redisCluster.withCluster(connection -> connection.async()
.pfadd(hllName, element) .pfadd(hllName, element)
.thenCompose(modCount -> { .thenCompose(modCount -> {
if (modCount == 0) { if (modCount == 0) {
return CompletableFuture.completedFuture(false); // The hll hasn't changed - return our current view of cardinality
return CompletableFuture.completedFuture(uniqueElementCount);
} }
// The hll changed - update our local view of the cardinality, and return connection.async().pfcount(hllName);
// initialize the TTL if required
return connection.async()
.pfcount(hllName)
.thenCompose(count -> {
uniqueElementCount = count;
// check if this is a new hll with no TTL set
return connection.async().ttl(hllName).thenApply(ttl -> ttl == -1);
});
}) })
.thenCompose(isNewHll -> { .thenCompose(newUniqueElementCount -> {
if (!isNewHll) { uniqueElementCount = newUniqueElementCount;
return CompletableFuture.completedFuture(null); return connection.async().expire(hllName, period, nx()).thenRun(Util.NOOP);
}
// If this is a new hll, we need to set the TTL. This could be
// a single atomic op in redis 7.x with EXPIRE NX
return connection.async().expire(hllName, period).thenRun(Util.NOOP);
})); }));
} }
@VisibleForTesting @VisibleForTesting
long estimate() { long estimate() {
return (long) this.uniqueElementCount; return this.uniqueElementCount;
} }
} }