Use a segmented scan on a separate scheduler for registration recovery passwords

This commit is contained in:
Jon Chambers 2024-11-25 13:47:17 -05:00 committed by Jon Chambers
parent 27f5f94c60
commit 43ffc996db
4 changed files with 37 additions and 14 deletions

View File

@ -19,6 +19,7 @@ import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
@ -137,16 +138,26 @@ public class RegistrationRecoveryPasswords extends AbstractDynamoDbStore {
return clock.instant().plus(expiration).getEpochSecond();
}
public Flux<Tuple3<String, SaltedTokenHash, Long>> getE164AssociatedRegistrationRecoveryPasswords() {
return Flux.from(asyncClient.scanPaginator(ScanRequest.builder()
.tableName(tableName)
.consistentRead(true)
.filterExpression("begins_with(#key, :e164Prefix)")
.expressionAttributeNames(Map.of("#key", KEY_E164))
.expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS("+")))
.build())
.items())
.map(item -> Tuples.of(item.get(KEY_E164).s(), saltedTokenHashFromItem(item), Long.parseLong(item.get(ATTR_EXP).n())));
/**
 * Scans the registration recovery password table for items whose {@code KEY_E164} attribute begins with "+",
 * splitting the scan into the given number of segments.
 * <p>
 * One consistent-read scan is issued per segment index in {@code [0, segments)}. The per-segment work is
 * dispatched through a parallel flux running on the supplied scheduler, and the results of all segments are
 * merged back into a single sequential stream. Items from different segments may therefore be interleaved.
 *
 * @param segments the total number of scan segments; must be at least 1
 * @param scheduler the scheduler on which the per-segment scans are run
 *
 * @return a flux of tuples holding, in order, the item's {@code KEY_E164} string, the salted token hash read
 * from the item, and the item's {@code ATTR_EXP} attribute parsed as a {@code long}
 *
 * @throws IllegalArgumentException if {@code segments} is less than 1
 */
public Flux<Tuple3<String, SaltedTokenHash, Long>> getE164AssociatedRegistrationRecoveryPasswords(final int segments, final Scheduler scheduler) {
  if (segments < 1) {
    throw new IllegalArgumentException("Total number of segments must be positive");
  }

  return Flux.range(0, segments)
      .parallel()
      .runOn(scheduler)
      .flatMap(segment -> {
        // Each segment gets its own request; only the segment index differs between them.
        final ScanRequest segmentScanRequest = ScanRequest.builder()
            .tableName(tableName)
            .consistentRead(true)
            .segment(segment)
            .totalSegments(segments)
            .filterExpression("begins_with(#key, :e164Prefix)")
            .expressionAttributeNames(Map.of("#key", KEY_E164))
            .expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS("+")))
            .build();

        return asyncClient.scanPaginator(segmentScanRequest)
            .items()
            .map(item -> Tuples.of(
                item.get(KEY_E164).s(),
                saltedTokenHashFromItem(item),
                Long.parseLong(item.get(ATTR_EXP).n())));
      })
      .sequential();
}
public CompletableFuture<Boolean> insertPniRecord(final String phoneNumber,

View File

@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple3;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
@ -71,8 +72,8 @@ public class RegistrationRecoveryPasswordsManager {
}));
}
public Flux<Tuple3<String, SaltedTokenHash, Long>> getE164AssociatedRegistrationRecoveryPasswords() {
return registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords();
/**
 * Returns the E164-associated registration recovery password records by delegating to the underlying
 * {@code registrationRecoveryPasswords} store.
 *
 * @param segments the number of segments, passed through unchanged to the underlying store
 * @param scheduler the scheduler, passed through unchanged to the underlying store
 *
 * @return the flux produced by the underlying store; each element is a tuple of a string, a
 * {@link SaltedTokenHash}, and a {@code Long}
 */
public Flux<Tuple3<String, SaltedTokenHash, Long>> getE164AssociatedRegistrationRecoveryPasswords(final int segments, final Scheduler scheduler) {
return registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords(segments, scheduler);
}
public CompletableFuture<Boolean> migrateE164Record(final String number, final SaltedTokenHash saltedTokenHash, final long expirationSeconds) {

View File

@ -18,6 +18,7 @@ import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.time.Duration;
@ -27,6 +28,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand
private static final String DRY_RUN_ARGUMENT = "dry-run";
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
private static final String SEGMENTS_ARGUMENT = "segments";
private static final String RECORDS_INSPECTED_COUNTER_NAME =
MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsInspected");
@ -63,6 +65,13 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(DEFAULT_MAX_CONCURRENCY)
.help("Max concurrency for DynamoDB operations");
subparser.addArgument("--segments")
.type(Integer.class)
.dest(SEGMENTS_ARGUMENT)
.required(false)
.setDefault(1)
.help("The total number of segments for a DynamoDB scan");
}
@Override
@ -71,6 +80,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
final int segments = namespace.getInt(SEGMENTS_ARGUMENT);
final Counter recordsInspectedCounter =
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
@ -81,7 +91,7 @@ public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommand
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager =
commandDependencies.registrationRecoveryPasswordsManager();
registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords()
registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords(segments, Schedulers.parallel())
.doOnNext(tuple -> recordsInspectedCounter.increment())
.flatMap(tuple -> {
final String e164 = tuple.getT1();

View File

@ -39,6 +39,7 @@ import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.MockUtils;
import org.whispersystems.textsecuregcm.util.MutableClock;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
@ -194,7 +195,7 @@ public class RegistrationRecoveryTest {
registrationRecoveryPasswords.addOrReplace(NUMBER, PNI, ORIGINAL_HASH).join();
assertEquals(List.of(Tuples.of(NUMBER, ORIGINAL_HASH, registrationRecoveryPasswords.expirationSeconds())),
registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords().collectList().block());
registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords(2, Schedulers.parallel()).collectList().block());
}
@Test