Add worker thread pool to PushFeedbackProcessor

This commit is contained in:
Chris Eager 2023-06-16 10:46:02 -05:00 committed by Chris Eager
parent f1962a03ef
commit 7dce183170
4 changed files with 107 additions and 64 deletions

View File

@ -387,16 +387,23 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ScheduledExecutorService recurringJobExecutor = environment.lifecycle() ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build(); .scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build();
ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build(); ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle()
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build(); .scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build();
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build(); ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle()
.executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16)
.workQueue(keyspaceNotificationDispatchQueue).build();
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))
.maxThreads(1).minThreads(1).build();
ExecutorService fcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d")) ExecutorService fcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d"))
.maxThreads(32).minThreads(32).workQueue(fcmSenderQueue).build(); .maxThreads(32).minThreads(32).workQueue(fcmSenderQueue).build();
ExecutorService secureValueRecoveryServiceExecutor = environment.lifecycle() ExecutorService secureValueRecoveryServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build(); .executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle() ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build(); .executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService accountDeletionExecutor = environment.lifecycle().executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build(); ExecutorService accountDeletionExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build();
ExecutorService pushFeedbackUpdateExecutor = environment.lifecycle()
.executorService(name(getClass(), "pushFeedback-%d")).maxThreads(4).minThreads(4).build();
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService( Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
ExecutorServiceMetrics.monitor(Metrics.globalRegistry, ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
@ -582,7 +589,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of( final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(
new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster),
// PushFeedbackProcessor may update device properties // PushFeedbackProcessor may update device properties
new PushFeedbackProcessor(accountsManager)); new PushFeedbackProcessor(accountsManager, pushFeedbackUpdateExecutor));
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster,
AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX);

View File

@ -13,20 +13,35 @@ import com.codahale.metrics.SharedMetricRegistries;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener { public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener {
private static final Logger log = LoggerFactory.getLogger(PushFeedbackProcessor.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired")); private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired"));
private final Meter recovered = metricRegistry.meter(name(getClass(), "unregistered", "recovered")); private final Meter recovered = metricRegistry.meter(name(getClass(), "unregistered", "recovered"));
private final AccountsManager accountsManager; private static final Counter UPDATED_ACCOUNT_COUNTER = Metrics.counter(
MetricsUtil.name(PushFeedbackProcessor.class, "updatedAccounts"));
public PushFeedbackProcessor(AccountsManager accountsManager) {
private final AccountsManager accountsManager;
private final ExecutorService updateExecutor;
public PushFeedbackProcessor(AccountsManager accountsManager, ExecutorService updateExecutor) {
this.accountsManager = accountsManager; this.accountsManager = accountsManager;
this.updateExecutor = updateExecutor;
} }
@Override @Override
@ -38,7 +53,9 @@ public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener {
@Override @Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) { protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
for (Account account : chunkAccounts) {
final List<CompletableFuture<Void>> updateFutures = chunkAccounts.stream()
.filter(account -> {
boolean update = false; boolean update = false;
for (Device device : account.getDevices()) { for (Device device : account.getDevices()) {
@ -55,7 +72,9 @@ public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener {
} }
} }
if (update) { return update;
})
.map(account -> CompletableFuture.runAsync(() -> {
// fetch a new version, since the chunk is shared and implicitly read-only // fetch a new version, since the chunk is shared and implicitly read-only
accountsManager.getByAccountIdentifier(account.getUuid()).ifPresent(accountToUpdate -> { accountsManager.getByAccountIdentifier(account.getUuid()).ifPresent(accountToUpdate -> {
accountsManager.update(accountToUpdate, a -> { accountsManager.update(accountToUpdate, a -> {
@ -82,7 +101,20 @@ public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener {
} }
}); });
}); });
}, updateExecutor)
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.warn("Failed to update account {}", account.getUuid(), throwable);
} else {
UPDATED_ACCOUNT_COUNTER.increment();
} }
}))
.toList();
try {
CompletableFuture.allOf(updateFutures.toArray(new CompletableFuture[0])).join();
} catch (final Exception e) {
log.debug("Failed to update one or more accounts in chunk", e);
} }
} }

View File

@ -92,12 +92,6 @@ public class CrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfig
final FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", final FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster",
configuration.getMetricsClusterConfiguration(), deps.redisClusterClientResources()); configuration.getMetricsClusterConfiguration(), deps.redisClusterClientResources());
// TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(
new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster),
// PushFeedbackProcessor may update device properties
new PushFeedbackProcessor(accountsManager));
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
new DynamicConfigurationManager<>(configuration.getAppConfig().getApplication(), new DynamicConfigurationManager<>(configuration.getAppConfig().getApplication(),
configuration.getAppConfig().getEnvironment(), configuration.getAppConfig().getEnvironment(),
@ -111,6 +105,15 @@ public class CrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfig
final AccountDatabaseCrawler crawler = switch ((CrawlType) namespace.get(CRAWL_TYPE)) { final AccountDatabaseCrawler crawler = switch ((CrawlType) namespace.get(CRAWL_TYPE)) {
case GENERAL_PURPOSE -> { case GENERAL_PURPOSE -> {
final ExecutorService pushFeedbackUpdateExecutor = environment.lifecycle()
.executorService(name(getClass(), "pushFeedback-%d")).maxThreads(workers).minThreads(workers).build();
// TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(
new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster),
// PushFeedbackProcessor may update device properties
new PushFeedbackProcessor(accountsManager, pushFeedbackUpdateExecutor));
final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache( final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(
cacheCluster, cacheCluster,
AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX);

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -101,7 +102,7 @@ class PushFeedbackProcessorTest {
@Test @Test
void testEmpty() throws AccountDatabaseCrawlerRestartException { void testEmpty() throws AccountDatabaseCrawlerRestartException {
PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager); PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager, Executors.newSingleThreadExecutor());
processor.timeAndProcessCrawlChunk(Optional.of(UUID.randomUUID()), Collections.emptyList()); processor.timeAndProcessCrawlChunk(Optional.of(UUID.randomUUID()), Collections.emptyList());
verifyNoInteractions(accountsManager); verifyNoInteractions(accountsManager);
@ -109,7 +110,7 @@ class PushFeedbackProcessorTest {
@Test @Test
void testUpdate() throws AccountDatabaseCrawlerRestartException { void testUpdate() throws AccountDatabaseCrawlerRestartException {
PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager); PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager, Executors.newSingleThreadExecutor());
processor.timeAndProcessCrawlChunk(Optional.of(UUID.randomUUID()), processor.timeAndProcessCrawlChunk(Optional.of(UUID.randomUUID()),
List.of(uninstalledAccount, mixedAccount, stillActiveAccount, freshAccount, cleanAccount)); List.of(uninstalledAccount, mixedAccount, stillActiveAccount, freshAccount, cleanAccount));