diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimator.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimator.java index 62be0fb23..7d586969d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimator.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityEstimator.java @@ -15,12 +15,14 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.util.Util; +import static io.lettuce.core.ExpireArgs.Builder.nx; + /** * Estimate the number of unique items seen over a configurable period and update a metric */ public class CardinalityEstimator { - private volatile double uniqueElementCount; + private volatile long uniqueElementCount; private final FaultTolerantRedisClusterClient redisCluster; private final String hllName; private final Duration period; @@ -36,41 +38,29 @@ public class CardinalityEstimator { obj -> obj.uniqueElementCount); } - public void add(String element) { + public void add(final String element) { addAsync(element).toCompletableFuture().join(); } - public CompletionStage addAsync(String element) { + public CompletionStage addAsync(final String element) { return redisCluster.withCluster(connection -> connection.async() .pfadd(hllName, element) .thenCompose(modCount -> { if (modCount == 0) { - return CompletableFuture.completedFuture(false); + // The hll hasn't changed - return our current view of cardinality + return CompletableFuture.completedFuture(uniqueElementCount); } - // The hll changed - update our local view of the cardinality, and - // initialize the TTL if required - return connection.async() - .pfcount(hllName) - .thenCompose(count -> { - uniqueElementCount = count; - // check if this is a new hll with no TTL set - return connection.async().ttl(hllName).thenApply(ttl -> ttl == -1); - }); + return connection.async().pfcount(hllName); }) - .thenCompose(isNewHll -> { - if (!isNewHll) { - return CompletableFuture.completedFuture(null); - } - - // If this is a new hll, we need to set the TTL. This could be - // a single atomic op in redis 7.x with EXPIRE NX - return connection.async().expire(hllName, period).thenRun(Util.NOOP); + .thenCompose(newUniqueElementCount -> { + uniqueElementCount = newUniqueElementCount; + return connection.async().expire(hllName, period, nx()).thenRun(Util.NOOP); })); } @VisibleForTesting long estimate() { - return (long) this.uniqueElementCount; + return this.uniqueElementCount; } }