diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index e3d5fb06c..642ef5a78 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -387,26 +387,33 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = List.of( new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), // PushFeedbackProcessor may update device properties - new PushFeedbackProcessor(accountsManager)); + new PushFeedbackProcessor(accountsManager, pushFeedbackUpdateExecutor)); AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java index 32a408a46..2fbff4c06 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java @@ -13,20 +13,35 @@ import com.codahale.metrics.SharedMetricRegistries; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Util; public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener { + private static final Logger log = LoggerFactory.getLogger(PushFeedbackProcessor.class); + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired")); - private final Meter recovered = metricRegistry.meter(name(getClass(), "unregistered", "recovered")); + private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired")); + private final Meter recovered = metricRegistry.meter(name(getClass(), "unregistered", "recovered")); + + private static final Counter UPDATED_ACCOUNT_COUNTER = Metrics.counter( + MetricsUtil.name(PushFeedbackProcessor.class, "updatedAccounts")); + private final AccountsManager accountsManager; + private final ExecutorService updateExecutor; - public PushFeedbackProcessor(AccountsManager accountsManager) { + public PushFeedbackProcessor(AccountsManager accountsManager, ExecutorService updateExecutor) { this.accountsManager = accountsManager; + this.updateExecutor = updateExecutor; } @Override @@ -38,51 +53,68 @@ public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener { @Override protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) { - for (Account account : chunkAccounts) { - boolean update = false; - for (Device device : account.getDevices()) { - if (deviceNeedsUpdate(device)) { - if (deviceExpired(device)) { - if (device.isEnabled()) { - expired.mark(); - update = true; - } - } else { - recovered.mark(); - update = true; - } - } - } + final List> updateFutures = chunkAccounts.stream() + .filter(account -> { + boolean update = false; - if (update) { - // fetch a new version, since the chunk is shared and implicitly read-only - accountsManager.getByAccountIdentifier(account.getUuid()).ifPresent(accountToUpdate -> { - accountsManager.update(accountToUpdate, a -> { - for (Device device : a.getDevices()) { - if (deviceNeedsUpdate(device)) { - if (deviceExpired(device)) { - if (!Util.isEmpty(device.getApnId())) { - if (device.getId() == 1) { - device.setUserAgent("OWI"); - } else { - device.setUserAgent("OWP"); - } - } else if (!Util.isEmpty(device.getGcmId())) { - device.setUserAgent("OWA"); - } - device.setGcmId(null); - device.setApnId(null); - device.setVoipApnId(null); - device.setFetchesMessages(false); - } else { - device.setUninstalledFeedbackTimestamp(0); + for (Device device : account.getDevices()) { + if (deviceNeedsUpdate(device)) { + if (deviceExpired(device)) { + if (device.isEnabled()) { + expired.mark(); + update = true; } + } else { + recovered.mark(); + update = true; } } - }); - }); - } + } + + return update; + }) + .map(account -> CompletableFuture.runAsync(() -> { + // fetch a new version, since the chunk is shared and implicitly read-only + accountsManager.getByAccountIdentifier(account.getUuid()).ifPresent(accountToUpdate -> { + accountsManager.update(accountToUpdate, a -> { + for (Device device : a.getDevices()) { + if (deviceNeedsUpdate(device)) { + if (deviceExpired(device)) { + if (!Util.isEmpty(device.getApnId())) { + if (device.getId() == 1) { + device.setUserAgent("OWI"); + } else { + device.setUserAgent("OWP"); + } + } else if (!Util.isEmpty(device.getGcmId())) { + device.setUserAgent("OWA"); + } + device.setGcmId(null); + device.setApnId(null); + device.setVoipApnId(null); + device.setFetchesMessages(false); + } else { + device.setUninstalledFeedbackTimestamp(0); + } + } + } + }); + }); + }, updateExecutor) + .whenComplete((ignored, throwable) -> { + if (throwable != null) { + log.warn("Failed to update account {}", account.getUuid(), throwable); + } else { + UPDATED_ACCOUNT_COUNTER.increment(); + } + })) + .toList(); + + try { + CompletableFuture.allOf(updateFutures.toArray(new CompletableFuture[0])).join(); + } catch (final Exception e) { + log.debug("Failed to update one or more accounts in chunk", e); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java index 29618a5fb..fae59d68a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java @@ -92,12 +92,6 @@ public class CrawlAccountsCommand extends EnvironmentCommand accountDatabaseCrawlerListeners = List.of( - new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), - // PushFeedbackProcessor may update device properties - new PushFeedbackProcessor(accountsManager)); - final DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager<>(configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), @@ -111,6 +105,15 @@ public class CrawlAccountsCommand extends EnvironmentCommand { + final ExecutorService pushFeedbackUpdateExecutor = environment.lifecycle() + .executorService(name(getClass(), "pushFeedback-%d")).maxThreads(workers).minThreads(workers).build(); + + // TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data + final List accountDatabaseCrawlerListeners = List.of( + new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), + // PushFeedbackProcessor may update device properties + new PushFeedbackProcessor(accountsManager, pushFeedbackUpdateExecutor)); + final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache( cacheCluster, AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PushFeedbackProcessorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PushFeedbackProcessorTest.java index 131fc30e8..958b868df 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PushFeedbackProcessorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PushFeedbackProcessorTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -101,7 +102,7 @@ class PushFeedbackProcessorTest { @Test void testEmpty() throws AccountDatabaseCrawlerRestartException { - PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager); + PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager, Executors.newSingleThreadExecutor()); processor.timeAndProcessCrawlChunk(Optional.of(UUID.randomUUID()), Collections.emptyList()); verifyNoInteractions(accountsManager); @@ -109,7 +110,7 @@ class PushFeedbackProcessorTest { @Test void testUpdate() throws AccountDatabaseCrawlerRestartException { - PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager); + PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager, Executors.newSingleThreadExecutor()); processor.timeAndProcessCrawlChunk(Optional.of(UUID.randomUUID()), List.of(uninstalledAccount, mixedAccount, stillActiveAccount, freshAccount, cleanAccount));