From 8edb450d73d668259525e76065f8f74a8e59bccf Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 5 Jul 2023 14:14:00 -0400 Subject: [PATCH] Parallelize single-shot account crawlers --- .../textsecuregcm/storage/Accounts.java | 25 +++++++++++-------- .../storage/AccountsManager.java | 6 +++-- ...bstractSinglePassCrawlAccountsCommand.java | 6 +++-- .../MigrateSignedECPreKeysCommand.java | 6 +++-- .../textsecuregcm/storage/AccountsTest.java | 4 ++- 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index 7151507a1..a607392be 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -37,6 +37,8 @@ import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.UUIDUtil; import reactor.core.publisher.Flux; +import reactor.core.publisher.ParallelFlux; +import reactor.core.scheduler.Scheduler; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -678,21 +680,22 @@ public class Accounts extends AbstractDynamoDbStore { })); } - Flux getAll(final int segments) { + ParallelFlux getAll(final int segments, final Scheduler scheduler) { if (segments < 1) { throw new IllegalArgumentException("Total number of segments must be positive"); } - return Flux.merge( - Flux.range(0, segments) - .map(segment -> asyncClient.scanPaginator(ScanRequest.builder() - .tableName(accountsTableName) - .consistentRead(true) - .segment(segment) - .totalSegments(segments) - .build()) - .items() - .map(Accounts::fromItem))); + return Flux.range(0, segments) + .parallel() + .runOn(scheduler) + .flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder() + .tableName(accountsTableName) + .consistentRead(true) + .segment(segment) + .totalSegments(segments) + .build()) + .items() + .map(Accounts::fromItem)); } @Nonnull diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index f7f1db4a5..586309ee5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -59,6 +59,8 @@ import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; +import reactor.core.publisher.ParallelFlux; +import reactor.core.scheduler.Scheduler; public class AccountsManager { @@ -721,8 +723,8 @@ public class AccountsManager { return accounts.getAllFrom(uuid, length); } - public Flux streamAllFromDynamo(final int segments) { - return accounts.getAll(segments); + public ParallelFlux streamAllFromDynamo(final int segments, final Scheduler scheduler) { + return accounts.getAll(segments, scheduler); } public void delete(final Account account, final DeletionReason deletionReason) throws InterruptedException { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java index 2f0fa285b..17daf1d92 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java @@ -15,6 +15,8 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; import reactor.core.publisher.Flux; +import reactor.core.publisher.ParallelFlux; +import reactor.core.scheduler.Schedulers; import java.util.Objects; public abstract class AbstractSinglePassCrawlAccountsCommand extends EnvironmentCommand { @@ -57,8 +59,8 @@ public abstract class AbstractSinglePassCrawlAccountsCommand extends Environment commandDependencies = CommandDependencies.build(getName(), environment, configuration); final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT)); - crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments)); + crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments, Schedulers.parallel())); } - protected abstract void crawlAccounts(final Flux accounts); + protected abstract void crawlAccounts(final ParallelFlux accounts); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java index b1e64fc3f..536f2693b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java @@ -15,6 +15,7 @@ import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.KeysManager; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.ParallelFlux; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; @@ -28,7 +29,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun } @Override - protected void crawlAccounts(final Flux accounts) { + protected void crawlAccounts(final ParallelFlux accounts) { final KeysManager keysManager = getCommandDependencies().keysManager(); accounts.flatMap(account -> Flux.fromIterable(account.getDevices()) @@ -48,6 +49,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun .flatMap(keyTuple -> Mono.fromFuture( keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3()))) .doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment()) - .blockLast(); + .then() + .block(); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java index 2e50c6815..c7297e0d3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java @@ -57,6 +57,7 @@ import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.TestClock; +import reactor.core.scheduler.Schedulers; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -504,7 +505,8 @@ class AccountsTest { accounts.create(account); } - final List retrievedAccounts = accounts.getAll(2).collectList().block(); + final List retrievedAccounts = + accounts.getAll(2, Schedulers.parallel()).sequential().collectList().block(); assertNotNull(retrievedAccounts); assertEquals(expectedAccounts.stream().map(Account::getUuid).collect(Collectors.toSet()),