diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index 4bedeb482..d0dd80035 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -45,7 +45,6 @@ import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.UUIDUtil; import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; -import reactor.core.publisher.ParallelFlux; import reactor.core.scheduler.Scheduler; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -1058,7 +1057,7 @@ public class Accounts extends AbstractDynamoDbStore { .thenRun(() -> sample.stop(DELETE_TIMER)); } - ParallelFlux getAll(final int segments, final Scheduler scheduler) { + Flux getAll(final int segments, final Scheduler scheduler) { if (segments < 1) { throw new IllegalArgumentException("Total number of segments must be positive"); } @@ -1073,7 +1072,8 @@ public class Accounts extends AbstractDynamoDbStore { .totalSegments(segments) .build()) .items() - .map(Accounts::fromItem)); + .map(Accounts::fromItem)) + .sequential(); } @Nonnull diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 591791cbc..fbe3105a6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -69,7 +69,7 @@ import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; -import reactor.core.publisher.ParallelFlux; +import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; @@ -940,7 +940,7 @@ public class AccountsManager { return accounts.findRecentlyDeletedE164(uuid); } - public ParallelFlux streamAllFromDynamo(final int segments, final Scheduler scheduler) { + public Flux streamAllFromDynamo(final int segments, final Scheduler scheduler) { return accounts.getAll(segments, scheduler); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java index 4251a91a3..cf175fce5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java @@ -18,7 +18,7 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; -import reactor.core.publisher.ParallelFlux; +import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public abstract class AbstractSinglePassCrawlAccountsCommand extends EnvironmentCommand { @@ -103,5 +103,5 @@ public abstract class AbstractSinglePassCrawlAccountsCommand extends Environment logger.error("Unhandled error", throwable); } - protected abstract void crawlAccounts(final ParallelFlux accounts); + protected abstract void crawlAccounts(final Flux accounts); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java index c9c8702ab..1630946dd 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java @@ -22,7 +22,6 @@ import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.KeysManager; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.ParallelFlux; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; import reactor.util.retry.Retry; @@ -63,13 +62,12 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun } @Override - protected void crawlAccounts(final ParallelFlux accounts) { + protected void crawlAccounts(final Flux accounts) { final KeysManager keysManager = getCommandDependencies().keysManager(); final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT); final int bufferSize = getNamespace().getInt(BUFFER_ARGUMENT); accounts - .sequential() .flatMap(account -> Flux.fromIterable(account.getDevices()) .flatMap(device -> Flux.fromArray(IdentityType.values()) .filter(identityType -> device.getSignedPreKey(identityType) != null) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessPushNotificationFeedbackCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessPushNotificationFeedbackCommand.java index 0e870e414..04f7a42fd 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessPushNotificationFeedbackCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessPushNotificationFeedbackCommand.java @@ -20,8 +20,8 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.ParallelFlux; public class ProcessPushNotificationFeedbackCommand extends AbstractSinglePassCrawlAccountsCommand { @@ -62,12 +62,11 @@ public class ProcessPushNotificationFeedbackCommand extends AbstractSinglePassCr } @Override - protected void crawlAccounts(final ParallelFlux accounts) { + protected void crawlAccounts(final Flux accounts) { final boolean isDryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT); accounts .filter(account -> account.getDevices().stream().anyMatch(this::deviceNeedsUpdate)) - .sequential() .flatMap(account -> { account.getDevices().stream() .filter(this::deviceNeedsUpdate) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredAccountsCommand.java index de84ee261..41c46213e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredAccountsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredAccountsCommand.java @@ -19,7 +19,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import reactor.core.publisher.Mono; -import reactor.core.publisher.ParallelFlux; +import reactor.core.publisher.Flux; public class RemoveExpiredAccountsCommand extends AbstractSinglePassCrawlAccountsCommand { @@ -57,13 +57,12 @@ public class RemoveExpiredAccountsCommand extends AbstractSinglePassCrawlAccount } @Override - protected void crawlAccounts(final ParallelFlux accounts) { + protected void crawlAccounts(final Flux accounts) { final boolean isDryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT); final Counter deletedAccountCounter = Metrics.counter(DELETED_ACCOUNT_COUNTER_NAME, "dryRun", String.valueOf(isDryRun)); accounts.filter(this::isExpired) - .sequential() .flatMap(expiredAccount -> { final Mono deleteAccountMono = isDryRun ? Mono.empty() diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java index c35dcf717..428e3c8dc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java @@ -665,7 +665,7 @@ class AccountsTest { } final List retrievedAccounts = - accounts.getAll(2, Schedulers.parallel()).sequential().collectList().block(); + accounts.getAll(2, Schedulers.parallel()).collectList().block(); assertNotNull(retrievedAccounts); assertEquals(expectedAccounts.stream().map(Account::getUuid).collect(Collectors.toSet()), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/ProcessPushNotificationFeedbackCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/ProcessPushNotificationFeedbackCommandTest.java index e5fc3c4c9..ca498c49d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/ProcessPushNotificationFeedbackCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/ProcessPushNotificationFeedbackCommandTest.java @@ -124,8 +124,7 @@ class ProcessPushNotificationFeedbackCommandTest { } processPushNotificationFeedbackCommand.crawlAccounts( - Flux.just(accountWithActiveDevice, accountWithUninstalledDevice, accountWithAlreadyDisabledUninstalledDevice) - .parallel()); + Flux.just(accountWithActiveDevice, accountWithUninstalledDevice, accountWithAlreadyDisabledUninstalledDevice)); if (isDryRun) { verify(accountsManager, never()).updateAsync(any(), any()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredAccountsCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredAccountsCommandTest.java index 0eb356c80..8a4532e3c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredAccountsCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredAccountsCommandTest.java @@ -73,7 +73,7 @@ class RemoveExpiredAccountsCommandTest { when(expiredAccount.getLastSeen()) .thenReturn(clock.instant().minus(RemoveExpiredAccountsCommand.MAX_IDLE_DURATION).minusMillis(1).toEpochMilli()); - removeExpiredAccountsCommand.crawlAccounts(Flux.just(activeAccount, expiredAccount).parallel()); + removeExpiredAccountsCommand.crawlAccounts(Flux.just(activeAccount, expiredAccount)); if (isDryRun) { verify(accountsManager, never()).delete(any(), any());