Add a buffer/shuffle pair to better distribute load across shards

This commit is contained in:
Jon Chambers 2024-11-25 20:33:08 -05:00 committed by Jon Chambers
parent ff4e2bdfb7
commit ab94d3045d
1 changed files with 23 additions and 4 deletions

View File

@ -9,6 +9,11 @@ import io.dropwizard.core.Application;
import io.dropwizard.core.setup.Environment; import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser; import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -19,16 +24,15 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager; import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple3;
import reactor.util.retry.Retry; import reactor.util.retry.Retry;
import java.time.Duration;
public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommandWithDependencies { public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommandWithDependencies {
private static final int DEFAULT_MAX_CONCURRENCY = 16;
private static final String DRY_RUN_ARGUMENT = "dry-run"; private static final String DRY_RUN_ARGUMENT = "dry-run";
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
private static final String SEGMENTS_ARGUMENT = "segments"; private static final String SEGMENTS_ARGUMENT = "segments";
private static final String BUFFER_ARGUMENT = "buffer";
private static final String RECORDS_INSPECTED_COUNTER_NAME = private static final String RECORDS_INSPECTED_COUNTER_NAME =
MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsInspected"); MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsInspected");
@ -63,7 +67,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand
subparser.addArgument("--max-concurrency") subparser.addArgument("--max-concurrency")
.type(Integer.class) .type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT) .dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(DEFAULT_MAX_CONCURRENCY) .setDefault(16)
.help("Max concurrency for DynamoDB operations"); .help("Max concurrency for DynamoDB operations");
subparser.addArgument("--segments") subparser.addArgument("--segments")
@ -72,6 +76,12 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand
.required(false) .required(false)
.setDefault(1) .setDefault(1)
.help("The total number of segments for a DynamoDB scan"); .help("The total number of segments for a DynamoDB scan");
subparser.addArgument("--buffer")
.type(Integer.class)
.dest(BUFFER_ARGUMENT)
.setDefault(16_384)
.help("Records to buffer");
} }
@Override @Override
@ -81,6 +91,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT); final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
final int segments = namespace.getInt(SEGMENTS_ARGUMENT); final int segments = namespace.getInt(SEGMENTS_ARGUMENT);
final int bufferSize = namespace.getInt(BUFFER_ARGUMENT);
final Counter recordsInspectedCounter = final Counter recordsInspectedCounter =
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
@ -92,6 +103,14 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand
commandDependencies.registrationRecoveryPasswordsManager(); commandDependencies.registrationRecoveryPasswordsManager();
registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords(segments, Schedulers.parallel()) registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords(segments, Schedulers.parallel())
.buffer(bufferSize)
.map(source -> {
final List<Tuple3<String, SaltedTokenHash, Long>> shuffled = new ArrayList<>(source);
Collections.shuffle(shuffled);
return shuffled;
})
.limitRate(2)
.flatMapIterable(Function.identity())
.doOnNext(tuple -> recordsInspectedCounter.increment()) .doOnNext(tuple -> recordsInspectedCounter.increment())
.flatMap(tuple -> { .flatMap(tuple -> {
final String e164 = tuple.getT1(); final String e164 = tuple.getT1();