diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateRegistrationRecoveryPasswordsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateRegistrationRecoveryPasswordsCommand.java index 6e530dd9e..1eda894b2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateRegistrationRecoveryPasswordsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateRegistrationRecoveryPasswordsCommand.java @@ -9,6 +9,11 @@ import io.dropwizard.core.Application; import io.dropwizard.core.setup.Environment; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; import org.slf4j.Logger; @@ -19,16 +24,15 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple3; import reactor.util.retry.Retry; -import java.time.Duration; public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommandWithDependencies { - private static final int DEFAULT_MAX_CONCURRENCY = 16; - private static final String DRY_RUN_ARGUMENT = "dry-run"; private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; private static final String SEGMENTS_ARGUMENT = "segments"; + private static final String BUFFER_ARGUMENT = "buffer"; private static final String RECORDS_INSPECTED_COUNTER_NAME = MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsInspected"); @@ -63,7 +67,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand subparser.addArgument("--max-concurrency") .type(Integer.class) .dest(MAX_CONCURRENCY_ARGUMENT) - .setDefault(DEFAULT_MAX_CONCURRENCY) + .setDefault(16) .help("Max concurrency for DynamoDB operations"); subparser.addArgument("--segments") @@ -72,6 +76,12 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand .required(false) .setDefault(1) .help("The total number of segments for a DynamoDB scan"); + + subparser.addArgument("--buffer") + .type(Integer.class) + .dest(BUFFER_ARGUMENT) + .setDefault(16_384) + .help("Records to buffer"); } @Override @@ -81,6 +91,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT); final int segments = namespace.getInt(SEGMENTS_ARGUMENT); + final int bufferSize = namespace.getInt(BUFFER_ARGUMENT); final Counter recordsInspectedCounter = Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); @@ -92,6 +103,14 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand commandDependencies.registrationRecoveryPasswordsManager(); registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords(segments, Schedulers.parallel()) + .buffer(bufferSize) + .map(source -> { + final List> shuffled = new ArrayList<>(source); + Collections.shuffle(shuffled); + return shuffled; + }) + .limitRate(2) + .flatMapIterable(Function.identity()) .doOnNext(tuple -> recordsInspectedCounter.increment()) .flatMap(tuple -> { final String e164 = tuple.getT1();