diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java b/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java index 36684f893..c385720af 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java @@ -11,13 +11,12 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; -import java.util.Collections; -import java.util.IdentityHashMap; +import io.dropwizard.lifecycle.Managed; +import io.micrometer.core.instrument.Metrics; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; -import io.dropwizard.lifecycle.Managed; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.SqsConfiguration; @@ -44,7 +43,7 @@ public class DirectoryQueue implements Managed { private final List queueUrls; private final SqsAsyncClient sqs; - private final Set outstandingRequests = Collections.newSetFromMap(new IdentityHashMap<>()); + private final AtomicInteger outstandingRequests = new AtomicInteger(); private enum UpdateAction { ADD("add"), @@ -71,6 +70,8 @@ public class DirectoryQueue implements Managed { .region(Region.of(sqsConfig.getRegion())) .credentialsProvider(credentialsProvider) .build(); + + Metrics.gauge(name(getClass(), "outstandingRequests"), outstandingRequests); } @VisibleForTesting @@ -86,7 +87,7 @@ public class DirectoryQueue implements Managed { @Override public void stop() throws Exception { synchronized (outstandingRequests) { - while (!outstandingRequests.isEmpty()) { + while (outstandingRequests.get() > 0) { outstandingRequests.wait(); } } @@ -123,7 +124,7 @@ public class DirectoryQueue implements Managed { .build(); synchronized (outstandingRequests) { - outstandingRequests.add(request); + outstandingRequests.incrementAndGet(); } sqs.sendMessage(request).whenComplete((response, cause) -> { @@ -139,7 +140,7 @@ public class DirectoryQueue implements Managed { } } finally { synchronized (outstandingRequests) { - outstandingRequests.remove(request); + outstandingRequests.decrementAndGet(); outstandingRequests.notifyAll(); }