Instrument BlockingThreadPoolExecutor.
This commit is contained in:
parent
6f767a72a7
commit
c17cc07b73
|
@ -56,7 +56,7 @@ public class PushSender implements Managed {
|
||||||
this.apnSender = apnSender;
|
this.apnSender = apnSender;
|
||||||
this.webSocketSender = websocketSender;
|
this.webSocketSender = websocketSender;
|
||||||
this.queueSize = queueSize;
|
this.queueSize = queueSize;
|
||||||
this.executor = new BlockingThreadPoolExecutor(50, queueSize);
|
this.executor = new BlockingThreadPoolExecutor("pushSender", 50, queueSize);
|
||||||
|
|
||||||
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
|
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
|
||||||
.register(name(PushSender.class, "send_queue_depth"),
|
.register(name(PushSender.class, "send_queue_depth"),
|
||||||
|
|
|
@ -1,22 +1,36 @@
|
||||||
package org.whispersystems.textsecuregcm.util;
|
package org.whispersystems.textsecuregcm.util;
|
||||||
|
|
||||||
|
import com.codahale.metrics.MetricRegistry;
|
||||||
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
|
import com.codahale.metrics.Timer;
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
|
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
|
|
||||||
private final Semaphore semaphore;
|
private final Semaphore semaphore;
|
||||||
|
private final Timer acquirePermitTimer;
|
||||||
|
|
||||||
public BlockingThreadPoolExecutor(int threads, int bound) {
|
public BlockingThreadPoolExecutor(String name, int threads, int bound) {
|
||||||
super(threads, threads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
|
super(threads, threads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
|
||||||
this.semaphore = new Semaphore(bound);
|
this.semaphore = new Semaphore(bound);
|
||||||
|
|
||||||
|
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
|
|
||||||
|
this.acquirePermitTimer = metricRegistry.timer(name(getClass(), name, "acquirePermit"));
|
||||||
|
metricRegistry.gauge(name(getClass(), name, "permitsAvailable"), () -> semaphore::availablePermits);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(Runnable task) {
|
public void execute(Runnable task) {
|
||||||
semaphore.acquireUninterruptibly();
|
try (final Timer.Context ignored = acquirePermitTimer.time()) {
|
||||||
|
semaphore.acquireUninterruptibly();
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
super.execute(task);
|
super.execute(task);
|
||||||
|
|
|
@ -10,7 +10,7 @@ public class BlockingThreadPoolExecutorTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlocking() {
|
public void testBlocking() {
|
||||||
BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(1, 3);
|
BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor("test", 1, 3);
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
|
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
|
|
Loading…
Reference in New Issue