Publish outstanding SQS operation count as a gauge.
This commit is contained in:
parent
13447df1e0
commit
ef9a7fda9a
|
@ -11,13 +11,12 @@ import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.Collections;
|
import io.dropwizard.lifecycle.Managed;
|
||||||
import java.util.IdentityHashMap;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import io.dropwizard.lifecycle.Managed;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.configuration.SqsConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.SqsConfiguration;
|
||||||
|
@ -44,7 +43,7 @@ public class DirectoryQueue implements Managed {
|
||||||
private final List<String> queueUrls;
|
private final List<String> queueUrls;
|
||||||
private final SqsAsyncClient sqs;
|
private final SqsAsyncClient sqs;
|
||||||
|
|
||||||
private final Set<SendMessageRequest> outstandingRequests = Collections.newSetFromMap(new IdentityHashMap<>());
|
private final AtomicInteger outstandingRequests = new AtomicInteger();
|
||||||
|
|
||||||
private enum UpdateAction {
|
private enum UpdateAction {
|
||||||
ADD("add"),
|
ADD("add"),
|
||||||
|
@ -71,6 +70,8 @@ public class DirectoryQueue implements Managed {
|
||||||
.region(Region.of(sqsConfig.getRegion()))
|
.region(Region.of(sqsConfig.getRegion()))
|
||||||
.credentialsProvider(credentialsProvider)
|
.credentialsProvider(credentialsProvider)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
Metrics.gauge(name(getClass(), "outstandingRequests"), outstandingRequests);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -86,7 +87,7 @@ public class DirectoryQueue implements Managed {
|
||||||
@Override
|
@Override
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
synchronized (outstandingRequests) {
|
synchronized (outstandingRequests) {
|
||||||
while (!outstandingRequests.isEmpty()) {
|
while (outstandingRequests.get() > 0) {
|
||||||
outstandingRequests.wait();
|
outstandingRequests.wait();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -123,7 +124,7 @@ public class DirectoryQueue implements Managed {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
synchronized (outstandingRequests) {
|
synchronized (outstandingRequests) {
|
||||||
outstandingRequests.add(request);
|
outstandingRequests.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
sqs.sendMessage(request).whenComplete((response, cause) -> {
|
sqs.sendMessage(request).whenComplete((response, cause) -> {
|
||||||
|
@ -139,7 +140,7 @@ public class DirectoryQueue implements Managed {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (outstandingRequests) {
|
synchronized (outstandingRequests) {
|
||||||
outstandingRequests.remove(request);
|
outstandingRequests.decrementAndGet();
|
||||||
outstandingRequests.notifyAll();
|
outstandingRequests.notifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue