From 2f88f0eedbace152ace560330e95d86e8d071cbf Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Thu, 24 Jun 2021 18:10:37 -0500 Subject: [PATCH] Refactor to use single threaded scheduled executor --- .../storage/ManagedPeriodicWork.java | 76 ++++++++----------- 1 file changed, 31 insertions(+), 45 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java index 913e74f83..c6dfdc371 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java @@ -7,12 +7,15 @@ package org.whispersystems.textsecuregcm.storage; import io.dropwizard.lifecycle.Managed; import java.time.Duration; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.util.Util; -public abstract class ManagedPeriodicWork implements Managed, Runnable { +public abstract class ManagedPeriodicWork implements Managed { private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -20,88 +23,71 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable { private final Duration workerTtl; private final Duration runInterval; private final String workerId; + private final ScheduledExecutorService executorService; + + private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean running = new AtomicBoolean(false); - private boolean finished; public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval) { this.lock = lock; this.workerTtl = workerTtl; this.runInterval = runInterval; this.workerId = UUID.randomUUID().toString(); + this.executorService = Executors.newSingleThreadScheduledExecutor((runnable) -> new Thread(runnable, getClass().getName())); } abstract protected void doPeriodicWork() throws Exception; @Override public synchronized void start() throws Exception { - running.set(true); - new Thread(this).start(); + + if (started.getAndSet(true)) { + return; + } + + executorService.scheduleAtFixedRate(() -> { + try { + execute(); + } catch (final Exception e) { + logger.warn("Error in execution", e); + + // wait a bit, in case the error is caused by external instability + Util.sleep(10_000); + } + }, 0, runInterval.getSeconds(), TimeUnit.SECONDS); } @Override public synchronized void stop() throws Exception { - running.set(false); - notifyAll(); + executorService.shutdown(); - while (!finished) { - Util.wait(this); - } - } - - @Override - public void run() { - - while(running.get()) { - try { - execute(); - sleepWhileRunning(runInterval); - } catch (final Exception e) { - logger.warn("Error in execution", e); - - // wait a bit, in case the error is caused by external instability - Util.sleep(10_000); + boolean terminated = false; + while (!terminated) { + terminated = executorService.awaitTermination(5, TimeUnit.MINUTES); + if (!terminated) { + logger.warn("worker not yet terminated"); } } - - synchronized (this) { - finished = true; - notifyAll(); - } } private void execute() { if (lock.claimActiveWork(workerId, workerTtl)) { - try { - final long startTimeMs = System.currentTimeMillis(); - logger.info("Starting execution"); doPeriodicWork(); logger.info("Execution complete"); - final long endTimeMs = System.currentTimeMillis(); - final Duration sleepInterval = runInterval.minusMillis(endTimeMs - startTimeMs); - if (sleepInterval.getSeconds() > 0) { - logger.info("Sleeping for {}", sleepInterval); - sleepWhileRunning(sleepInterval); - } - } catch (final Exception e) { logger.warn("Periodic work failed", e); - // wait a full interval for recovery - sleepWhileRunning(runInterval); + // wait a bit, in case the error is caused by external instability + Util.sleep(10_000); } finally { lock.releaseActiveWork(workerId); } } } - - private synchronized void sleepWhileRunning(Duration delay) { - if (running.get()) Util.wait(this, delay.toMillis()); - } }