Refactor to use single threaded scheduled executor

This commit is contained in:
Chris Eager 2021-06-24 18:10:37 -05:00 committed by Chris Eager
parent 74ff491671
commit 2f88f0eedb
1 changed files with 31 additions and 45 deletions

View File

@ -7,12 +7,15 @@ package org.whispersystems.textsecuregcm.storage;
import io.dropwizard.lifecycle.Managed;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.Util;
public abstract class ManagedPeriodicWork implements Managed, Runnable {
public abstract class ManagedPeriodicWork implements Managed {
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -20,88 +23,71 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable {
private final Duration workerTtl;
private final Duration runInterval;
private final String workerId;
private final ScheduledExecutorService executorService;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval) {
this.lock = lock;
this.workerTtl = workerTtl;
this.runInterval = runInterval;
this.workerId = UUID.randomUUID().toString();
this.executorService = Executors.newSingleThreadScheduledExecutor((runnable) -> new Thread(runnable, getClass().getName()));
}
abstract protected void doPeriodicWork() throws Exception;
@Override
public synchronized void start() throws Exception {
running.set(true);
new Thread(this).start();
if (started.getAndSet(true)) {
return;
}
executorService.scheduleAtFixedRate(() -> {
try {
execute();
} catch (final Exception e) {
logger.warn("Error in execution", e);
// wait a bit, in case the error is caused by external instability
Util.sleep(10_000);
}
}, 0, runInterval.getSeconds(), TimeUnit.SECONDS);
}
@Override
public synchronized void stop() throws Exception {
running.set(false);
notifyAll();
executorService.shutdown();
while (!finished) {
Util.wait(this);
}
}
@Override
public void run() {
while(running.get()) {
try {
execute();
sleepWhileRunning(runInterval);
} catch (final Exception e) {
logger.warn("Error in execution", e);
// wait a bit, in case the error is caused by external instability
Util.sleep(10_000);
boolean terminated = false;
while (!terminated) {
terminated = executorService.awaitTermination(5, TimeUnit.MINUTES);
if (!terminated) {
logger.warn("worker not yet terminated");
}
}
synchronized (this) {
finished = true;
notifyAll();
}
}
private void execute() {
if (lock.claimActiveWork(workerId, workerTtl)) {
try {
final long startTimeMs = System.currentTimeMillis();
logger.info("Starting execution");
doPeriodicWork();
logger.info("Execution complete");
final long endTimeMs = System.currentTimeMillis();
final Duration sleepInterval = runInterval.minusMillis(endTimeMs - startTimeMs);
if (sleepInterval.getSeconds() > 0) {
logger.info("Sleeping for {}", sleepInterval);
sleepWhileRunning(sleepInterval);
}
} catch (final Exception e) {
logger.warn("Periodic work failed", e);
// wait a full interval for recovery
sleepWhileRunning(runInterval);
// wait a bit, in case the error is caused by external instability
Util.sleep(10_000);
} finally {
lock.releaseActiveWork(workerId);
}
}
}
private synchronized void sleepWhileRunning(Duration delay) {
if (running.get()) Util.wait(this, delay.toMillis());
}
}