From 43ffc996db63558c030b4cc8adac0c3cdfd4e424 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Mon, 25 Nov 2024 13:47:17 -0500 Subject: [PATCH] Use a segmented scan on a separate scheduler for registration recovery passwords --- .../RegistrationRecoveryPasswords.java | 31 +++++++++++++------ .../RegistrationRecoveryPasswordsManager.java | 5 +-- ...eRegistrationRecoveryPasswordsCommand.java | 12 ++++++- .../storage/RegistrationRecoveryTest.java | 3 +- 4 files changed, 37 insertions(+), 14 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswords.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswords.java index 3d2dcb3ab..f98220c07 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswords.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswords.java @@ -19,6 +19,7 @@ import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -137,16 +138,26 @@ public class RegistrationRecoveryPasswords extends AbstractDynamoDbStore { return clock.instant().plus(expiration).getEpochSecond(); } - public Flux> getE164AssociatedRegistrationRecoveryPasswords() { - return Flux.from(asyncClient.scanPaginator(ScanRequest.builder() - .tableName(tableName) - .consistentRead(true) - .filterExpression("begins_with(#key, :e164Prefix)") - .expressionAttributeNames(Map.of("#key", KEY_E164)) - .expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS("+"))) - .build()) - .items()) - .map(item -> Tuples.of(item.get(KEY_E164).s(), saltedTokenHashFromItem(item), Long.parseLong(item.get(ATTR_EXP).n()))); + public Flux> getE164AssociatedRegistrationRecoveryPasswords(final int segments, final Scheduler scheduler) { + if (segments < 1) { + throw new IllegalArgumentException("Total number of segments must be positive"); + } + + return Flux.range(0, segments) + .parallel() + .runOn(scheduler) + .flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder() + .tableName(tableName) + .consistentRead(true) + .segment(segment) + .totalSegments(segments) + .filterExpression("begins_with(#key, :e164Prefix)") + .expressionAttributeNames(Map.of("#key", KEY_E164)) + .expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS("+"))) + .build()) + .items() + .map(item -> Tuples.of(item.get(KEY_E164).s(), saltedTokenHashFromItem(item), Long.parseLong(item.get(ATTR_EXP).n())))) + .sequential(); } public CompletableFuture insertPniRecord(final String phoneNumber, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswordsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswordsManager.java index 81eb33bfa..89d860c4e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswordsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswordsManager.java @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.SaltedTokenHash; import org.whispersystems.textsecuregcm.util.ExceptionUtils; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; import reactor.util.function.Tuple3; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; @@ -71,8 +72,8 @@ public class RegistrationRecoveryPasswordsManager { })); } - public Flux> getE164AssociatedRegistrationRecoveryPasswords() { - return registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords(); + public Flux> getE164AssociatedRegistrationRecoveryPasswords(final int segments, final Scheduler scheduler) { + return registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords(segments, scheduler); } public CompletableFuture migrateE164Record(final String number, final SaltedTokenHash saltedTokenHash, final long expirationSeconds) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateRegistrationRecoveryPasswordsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateRegistrationRecoveryPasswordsCommand.java index 9c91182d7..6e530dd9e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateRegistrationRecoveryPasswordsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateRegistrationRecoveryPasswordsCommand.java @@ -18,6 +18,7 @@ import org.whispersystems.textsecuregcm.auth.SaltedTokenHash; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; import java.time.Duration; @@ -27,6 +28,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand private static final String DRY_RUN_ARGUMENT = "dry-run"; private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; + private static final String SEGMENTS_ARGUMENT = "segments"; private static final String RECORDS_INSPECTED_COUNTER_NAME = MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsInspected"); @@ -63,6 +65,13 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand .dest(MAX_CONCURRENCY_ARGUMENT) .setDefault(DEFAULT_MAX_CONCURRENCY) .help("Max concurrency for DynamoDB operations"); + + subparser.addArgument("--segments") + .type(Integer.class) + .dest(SEGMENTS_ARGUMENT) + .required(false) + .setDefault(1) + .help("The total number of segments for a DynamoDB scan"); } @Override @@ -71,6 +80,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT); + final int segments = namespace.getInt(SEGMENTS_ARGUMENT); final Counter recordsInspectedCounter = Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); @@ -81,7 +91,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager = commandDependencies.registrationRecoveryPasswordsManager(); - registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords() + registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords(segments, Schedulers.parallel()) .doOnNext(tuple -> recordsInspectedCounter.increment()) .flatMap(tuple -> { final String e164 = tuple.getT1(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryTest.java index 497bcc7f1..d723671cc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryTest.java @@ -39,6 +39,7 @@ import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.MockUtils; import org.whispersystems.textsecuregcm.util.MutableClock; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuples; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; @@ -194,7 +195,7 @@ public class RegistrationRecoveryTest { registrationRecoveryPasswords.addOrReplace(NUMBER, PNI, ORIGINAL_HASH).join(); assertEquals(List.of(Tuples.of(NUMBER, ORIGINAL_HASH, registrationRecoveryPasswords.expirationSeconds())), - registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords().collectList().block()); + registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords(2, Schedulers.parallel()).collectList().block()); } @Test