Refactor to use shared recurringJobExecutor

This commit is contained in:
Chris Eager 2021-06-28 15:49:53 -05:00 committed by Chris Eager
parent 819d59cd79
commit 6fa9dcd954
3 changed files with 36 additions and 14 deletions

View File

@ -400,7 +400,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000); BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000);
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue); Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue);
ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(2).build(); ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(3).build();
ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build(); ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build();
ScheduledExecutorService retrySchedulingExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "retry-%d")).threads(2).build(); ScheduledExecutorService retrySchedulingExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "retry-%d")).threads(2).build();
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build(); ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build();
@ -494,7 +494,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster); AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccounts, deletedAccountsDirectoryReconcilers, cacheCluster); DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccounts, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
apnSender.setApnFallbackManager(apnFallbackManager); apnSender.setApnFallbackManager(apnFallbackManager);
environment.lifecycle().manage(apnFallbackManager); environment.lifecycle().manage(apnFallbackManager);

View File

@ -12,6 +12,7 @@ import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
@ -32,9 +33,10 @@ public class DeletedAccountsTableCrawler extends ManagedPeriodicWork {
public DeletedAccountsTableCrawler( public DeletedAccountsTableCrawler(
final DeletedAccounts deletedAccounts, final DeletedAccounts deletedAccounts,
final List<DeletedAccountsDirectoryReconciler> reconcilers, final List<DeletedAccountsDirectoryReconciler> reconcilers,
final FaultTolerantRedisCluster cluster) throws IOException { final FaultTolerantRedisCluster cluster,
final ScheduledExecutorService executorService) throws IOException {
super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL); super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService);
this.deletedAccounts = deletedAccounts; this.deletedAccounts = deletedAccounts;
this.reconcilers = reconcilers; this.reconcilers = reconcilers;

View File

@ -4,12 +4,16 @@
*/ */
package org.whispersystems.textsecuregcm.storage; package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed; import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration; import java.time.Duration;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -19,6 +23,8 @@ public abstract class ManagedPeriodicWork implements Managed {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private static final String FUTURE_DONE_GAUGE_NAME = "futureDone";
private final ManagedPeriodicWorkLock lock; private final ManagedPeriodicWorkLock lock;
private final Duration workerTtl; private final Duration workerTtl;
private final Duration runInterval; private final Duration runInterval;
@ -27,17 +33,24 @@ public abstract class ManagedPeriodicWork implements Managed {
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private ScheduledFuture<?> scheduledFuture;
public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval) { public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, final ScheduledExecutorService scheduledExecutorService) {
this.lock = lock; this.lock = lock;
this.workerTtl = workerTtl; this.workerTtl = workerTtl;
this.runInterval = runInterval; this.runInterval = runInterval;
this.workerId = UUID.randomUUID().toString(); this.workerId = UUID.randomUUID().toString();
this.executorService = Executors.newSingleThreadScheduledExecutor((runnable) -> new Thread(runnable, getClass().getName())); this.executorService = scheduledExecutorService;
Metrics.gauge(name(getClass(), FUTURE_DONE_GAUGE_NAME), this, ManagedPeriodicWork::isFutureDone);
} }
abstract protected void doPeriodicWork() throws Exception; abstract protected void doPeriodicWork() throws Exception;
int isFutureDone() {
return scheduledFuture.isDone() ? 1 : 0;
}
@Override @Override
public synchronized void start() throws Exception { public synchronized void start() throws Exception {
@ -45,7 +58,7 @@ public abstract class ManagedPeriodicWork implements Managed {
return; return;
} }
executorService.scheduleAtFixedRate(() -> { scheduledFuture = executorService.scheduleAtFixedRate(() -> {
try { try {
execute(); execute();
} catch (final Exception e) { } catch (final Exception e) {
@ -60,13 +73,20 @@ public abstract class ManagedPeriodicWork implements Managed {
@Override @Override
public synchronized void stop() throws Exception { public synchronized void stop() throws Exception {
executorService.shutdown(); if (scheduledFuture != null) {
scheduledFuture.cancel(false);
boolean terminated = false; boolean terminated = false;
while (!terminated) { while (!terminated) {
terminated = executorService.awaitTermination(5, TimeUnit.MINUTES); try {
if (!terminated) { scheduledFuture.get(5, TimeUnit.MINUTES);
logger.warn("worker not yet terminated"); terminated = true;
} catch (final TimeoutException e) {
logger.warn("worker not yet terminated");
} catch (final Exception e) {
logger.warn("worker terminated exceptionally", e);
terminated = true;
}
} }
} }
} }