From c17cc07b7316c62e119c8762e86d0b7ed9f377c3 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Fri, 12 Jun 2020 13:37:42 -0400 Subject: [PATCH] Instrument BlockingThreadPoolExecutor. --- .../textsecuregcm/push/PushSender.java | 2 +- .../util/BlockingThreadPoolExecutor.java | 18 ++++++++++++++++-- .../util/BlockingThreadPoolExecutorTest.java | 2 +- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index 2114b81d7..cabbf7813 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -56,7 +56,7 @@ public class PushSender implements Managed { this.apnSender = apnSender; this.webSocketSender = websocketSender; this.queueSize = queueSize; - this.executor = new BlockingThreadPoolExecutor(50, queueSize); + this.executor = new BlockingThreadPoolExecutor("pushSender", 50, queueSize); SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME) .register(name(PushSender.class, "send_queue_depth"), diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/BlockingThreadPoolExecutor.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/BlockingThreadPoolExecutor.java index a0eadd1d8..2a2cb46c7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/BlockingThreadPoolExecutor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/BlockingThreadPoolExecutor.java @@ -1,22 +1,36 @@ package org.whispersystems.textsecuregcm.util; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import com.codahale.metrics.Timer; + import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static com.codahale.metrics.MetricRegistry.name; + public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final Semaphore semaphore; + private final Timer acquirePermitTimer; - public BlockingThreadPoolExecutor(int threads, int bound) { + public BlockingThreadPoolExecutor(String name, int threads, int bound) { super(threads, threads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue()); this.semaphore = new Semaphore(bound); + + final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + + this.acquirePermitTimer = metricRegistry.timer(name(getClass(), name, "acquirePermit")); + metricRegistry.gauge(name(getClass(), name, "permitsAvailable"), () -> semaphore::availablePermits); } @Override public void execute(Runnable task) { - semaphore.acquireUninterruptibly(); + try (final Timer.Context ignored = acquirePermitTimer.time()) { + semaphore.acquireUninterruptibly(); + } try { super.execute(task); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/BlockingThreadPoolExecutorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/BlockingThreadPoolExecutorTest.java index cc9b326f1..9d2db5e07 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/BlockingThreadPoolExecutorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/BlockingThreadPoolExecutorTest.java @@ -10,7 +10,7 @@ public class BlockingThreadPoolExecutorTest { @Test public void testBlocking() { - BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(1, 3); + BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor("test", 1, 3); long start = System.currentTimeMillis(); executor.execute(new Runnable() {