From 6fa9dcd9547b73a2b6ef8b4af0f57daecd1c3dcf Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Mon, 28 Jun 2021 15:49:53 -0500 Subject: [PATCH] Refactor to use shared recurringJobExecutor --- .../textsecuregcm/WhisperServerService.java | 4 +- .../storage/DeletedAccountsTableCrawler.java | 6 ++- .../storage/ManagedPeriodicWork.java | 40 ++++++++++++++----- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index b3f5e3791..3168612ab 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -400,7 +400,7 @@ public class WhisperServerService extends Application keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000); Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue); - ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(2).build(); + ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(3).build(); ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build(); ScheduledExecutorService retrySchedulingExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "retry-%d")).threads(2).build(); ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build(); @@ -494,7 +494,7 @@ public class WhisperServerService extends Application reconcilers, - final FaultTolerantRedisCluster cluster) throws IOException { + final FaultTolerantRedisCluster cluster, + final ScheduledExecutorService executorService) throws IOException { - super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL); + super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService); this.deletedAccounts = deletedAccounts; this.reconcilers = reconcilers; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java index c6dfdc371..eb937434f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java @@ -4,12 +4,16 @@ */ package org.whispersystems.textsecuregcm.storage; +import static com.codahale.metrics.MetricRegistry.name; + import io.dropwizard.lifecycle.Managed; +import io.micrometer.core.instrument.Metrics; import java.time.Duration; import java.util.UUID; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,6 +23,8 @@ public abstract class ManagedPeriodicWork implements Managed { private final Logger logger = LoggerFactory.getLogger(getClass()); + private static final String FUTURE_DONE_GAUGE_NAME = "futureDone"; + private final ManagedPeriodicWorkLock lock; private final Duration workerTtl; private final Duration runInterval; @@ -27,17 +33,24 @@ public abstract class ManagedPeriodicWork implements Managed { private final AtomicBoolean started = new AtomicBoolean(false); + private ScheduledFuture scheduledFuture; - public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval) { + public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, final ScheduledExecutorService scheduledExecutorService) { this.lock = lock; this.workerTtl = workerTtl; this.runInterval = runInterval; this.workerId = UUID.randomUUID().toString(); - this.executorService = Executors.newSingleThreadScheduledExecutor((runnable) -> new Thread(runnable, getClass().getName())); + this.executorService = scheduledExecutorService; + + Metrics.gauge(name(getClass(), FUTURE_DONE_GAUGE_NAME), this, ManagedPeriodicWork::isFutureDone); } abstract protected void doPeriodicWork() throws Exception; + int isFutureDone() { + return scheduledFuture.isDone() ? 1 : 0; + } + @Override public synchronized void start() throws Exception { @@ -45,7 +58,7 @@ public abstract class ManagedPeriodicWork implements Managed { return; } - executorService.scheduleAtFixedRate(() -> { + scheduledFuture = executorService.scheduleAtFixedRate(() -> { try { execute(); } catch (final Exception e) { @@ -60,13 +73,20 @@ public abstract class ManagedPeriodicWork implements Managed { @Override public synchronized void stop() throws Exception { - executorService.shutdown(); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); - boolean terminated = false; - while (!terminated) { - terminated = executorService.awaitTermination(5, TimeUnit.MINUTES); - if (!terminated) { - logger.warn("worker not yet terminated"); + boolean terminated = false; + while (!terminated) { + try { + scheduledFuture.get(5, TimeUnit.MINUTES); + terminated = true; + } catch (final TimeoutException e) { + logger.warn("worker not yet terminated"); + } catch (final Exception e) { + logger.warn("worker terminated exceptionally", e); + terminated = true; + } } } }