Parallelize single-shot account crawlers

This commit is contained in:
Jon Chambers 2023-07-05 14:14:00 -04:00 committed by Jon Chambers
parent fedeef4da5
commit 8edb450d73
5 changed files with 29 additions and 18 deletions

View File

@ -37,6 +37,8 @@ import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UUIDUtil; import org.whispersystems.textsecuregcm.util.UUIDUtil;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@ -678,21 +680,22 @@ public class Accounts extends AbstractDynamoDbStore {
})); }));
} }
Flux<Account> getAll(final int segments) { ParallelFlux<Account> getAll(final int segments, final Scheduler scheduler) {
if (segments < 1) { if (segments < 1) {
throw new IllegalArgumentException("Total number of segments must be positive"); throw new IllegalArgumentException("Total number of segments must be positive");
} }
return Flux.merge( return Flux.range(0, segments)
Flux.range(0, segments) .parallel()
.map(segment -> asyncClient.scanPaginator(ScanRequest.builder() .runOn(scheduler)
.tableName(accountsTableName) .flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder()
.consistentRead(true) .tableName(accountsTableName)
.segment(segment) .consistentRead(true)
.totalSegments(segments) .segment(segment)
.build()) .totalSegments(segments)
.items() .build())
.map(Accounts::fromItem))); .items()
.map(Accounts::fromItem));
} }
@Nonnull @Nonnull

View File

@ -59,6 +59,8 @@ import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
public class AccountsManager { public class AccountsManager {
@ -721,8 +723,8 @@ public class AccountsManager {
return accounts.getAllFrom(uuid, length); return accounts.getAllFrom(uuid, length);
} }
public Flux<Account> streamAllFromDynamo(final int segments) { public ParallelFlux<Account> streamAllFromDynamo(final int segments, final Scheduler scheduler) {
return accounts.getAll(segments); return accounts.getAll(segments, scheduler);
} }
public void delete(final Account account, final DeletionReason deletionReason) throws InterruptedException { public void delete(final Account account, final DeletionReason deletionReason) throws InterruptedException {

View File

@ -15,6 +15,8 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Schedulers;
import java.util.Objects; import java.util.Objects;
public abstract class AbstractSinglePassCrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfiguration> { public abstract class AbstractSinglePassCrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfiguration> {
@ -57,8 +59,8 @@ public abstract class AbstractSinglePassCrawlAccountsCommand extends Environment
commandDependencies = CommandDependencies.build(getName(), environment, configuration); commandDependencies = CommandDependencies.build(getName(), environment, configuration);
final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT)); final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT));
crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments)); crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments, Schedulers.parallel()));
} }
protected abstract void crawlAccounts(final Flux<Account> accounts); protected abstract void crawlAccounts(final ParallelFlux<Account> accounts);
} }

View File

@ -15,6 +15,7 @@ import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.KeysManager; import org.whispersystems.textsecuregcm.storage.KeysManager;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.util.function.Tuple3; import reactor.util.function.Tuple3;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
@ -28,7 +29,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun
} }
@Override @Override
protected void crawlAccounts(final Flux<Account> accounts) { protected void crawlAccounts(final ParallelFlux<Account> accounts) {
final KeysManager keysManager = getCommandDependencies().keysManager(); final KeysManager keysManager = getCommandDependencies().keysManager();
accounts.flatMap(account -> Flux.fromIterable(account.getDevices()) accounts.flatMap(account -> Flux.fromIterable(account.getDevices())
@ -48,6 +49,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun
.flatMap(keyTuple -> Mono.fromFuture( .flatMap(keyTuple -> Mono.fromFuture(
keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3()))) keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())))
.doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment()) .doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment())
.blockLast(); .then()
.block();
} }
} }

View File

@ -57,6 +57,7 @@ import org.whispersystems.textsecuregcm.tests.util.DevicesHelper;
import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.TestClock; import org.whispersystems.textsecuregcm.util.TestClock;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@ -504,7 +505,8 @@ class AccountsTest {
accounts.create(account); accounts.create(account);
} }
final List<Account> retrievedAccounts = accounts.getAll(2).collectList().block(); final List<Account> retrievedAccounts =
accounts.getAll(2, Schedulers.parallel()).sequential().collectList().block();
assertNotNull(retrievedAccounts); assertNotNull(retrievedAccounts);
assertEquals(expectedAccounts.stream().map(Account::getUuid).collect(Collectors.toSet()), assertEquals(expectedAccounts.stream().map(Account::getUuid).collect(Collectors.toSet()),