diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 9fe6daa9a..c2e30cbce 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -259,6 +259,7 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.BackupMetricsCommand; import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; +import org.whispersystems.textsecuregcm.workers.DeleteE164RegistrationRecoveryPasswordsCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory; import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand; @@ -332,6 +333,7 @@ public class WhisperServerService extends Application removeEntry(final String number) { + return asyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of(KEY_PNI, AttributeValues.fromString(number))) + .build()) + .thenRun(Util.NOOP); + } + public CompletableFuture removeEntry(final UUID phoneNumberIdentifier) { return asyncClient.deleteItem(DeleteItemRequest.builder() .tableName(tableName) @@ -89,6 +104,36 @@ public class RegistrationRecoveryPasswords { return clock.instant().plus(expiration).getEpochSecond(); } + Flux getE164sWithRegistrationRecoveryPasswords(final int segments, final int bufferSize, final Scheduler scheduler) { + if (segments < 1) { + throw new IllegalArgumentException("Total number of segments must be positive"); + } + + return Flux.range(0, segments) + .parallel() + .runOn(scheduler) + .flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder() + .tableName(tableName) + .consistentRead(true) + .segment(segment) + .totalSegments(segments) + .filterExpression("begins_with(#key, :e164Prefix)") + .expressionAttributeNames(Map.of("#key", KEY_PNI)) + .expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS("+"))) + .build()) + .items() + .map(item -> item.get(KEY_PNI).s())) + .sequential() + .buffer(bufferSize) + .map(source -> { + final List shuffled = new ArrayList<>(source); + Collections.shuffle(shuffled); + return shuffled; + }) + .limitRate(2) + .flatMapIterable(Function.identity()); + } + private static SaltedTokenHash saltedTokenHashFromItem(final Map item) { return new SaltedTokenHash(item.get(ATTR_HASH).s(), item.get(ATTR_SALT).s()); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswordsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswordsManager.java index 09e73a917..d7a42d094 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswordsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryPasswordsManager.java @@ -15,6 +15,8 @@ import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.SaltedTokenHash; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; public class RegistrationRecoveryPasswordsManager { @@ -50,6 +52,18 @@ public class RegistrationRecoveryPasswordsManager { }); } + public CompletableFuture removeForE164(final String number) { + return registrationRecoveryPasswords.removeEntry(number) + .whenComplete((ignored, error) -> { + if (error instanceof ResourceNotFoundException) { + // These will naturally happen if a recovery password is already deleted. Since we can remove + // the recovery password through many flows, we avoid creating log messages for these exceptions + } else if (error != null) { + logger.warn("Failed to remove Registration Recovery Password", error); + } + }); + } + public CompletableFuture remove(final UUID phoneNumberIdentifier) { return registrationRecoveryPasswords.removeEntry(phoneNumberIdentifier) .whenComplete((ignored, error) -> { @@ -62,6 +76,10 @@ public class RegistrationRecoveryPasswordsManager { }); } + public Flux getE164sWithRegistrationRecoveryPasswords(final int segments, final int bufferSize, final Scheduler scheduler) { + return registrationRecoveryPasswords.getE164sWithRegistrationRecoveryPasswords(segments, bufferSize, scheduler); + } + private static String bytesToString(final byte[] bytes) { return HexFormat.of().formatHex(bytes); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteE164RegistrationRecoveryPasswordsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteE164RegistrationRecoveryPasswordsCommand.java new file mode 100644 index 000000000..f232bbd8b --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteE164RegistrationRecoveryPasswordsCommand.java @@ -0,0 +1,116 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.workers; + +import io.dropwizard.core.Application; +import io.dropwizard.core.setup.Environment; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import java.time.Duration; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; + +public class DeleteE164RegistrationRecoveryPasswordsCommand extends AbstractCommandWithDependencies { + + private static final String DRY_RUN_ARGUMENT = "dry-run"; + private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; + private static final String SEGMENTS_ARGUMENT = "segments"; + private static final String BUFFER_ARGUMENT = "buffer"; + + private static final String RECORDS_INSPECTED_COUNTER_NAME = + MetricsUtil.name(DeleteE164RegistrationRecoveryPasswordsCommand.class, "recordsInspected"); + + private static final String RECORDS_DELETED_COUNTER_NAME = + MetricsUtil.name(DeleteE164RegistrationRecoveryPasswordsCommand.class, "recordsDeleted"); + + private static final String DRY_RUN_TAG = "dryRun"; + + private static final Logger logger = LoggerFactory.getLogger(DeleteE164RegistrationRecoveryPasswordsCommand.class); + + public DeleteE164RegistrationRecoveryPasswordsCommand() { + + super(new Application<>() { + @Override + public void run(final WhisperServerConfiguration configuration, final Environment environment) { + } + }, "delete-e164-registration-recovery-passwords", "Delete e164-associated registration recovery passwords"); + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("--dry-run") + .type(Boolean.class) + .dest(DRY_RUN_ARGUMENT) + .required(false) + .setDefault(true) + .help("If true, don’t actually delete any registration recovery passwords"); + + subparser.addArgument("--max-concurrency") + .type(Integer.class) + .dest(MAX_CONCURRENCY_ARGUMENT) + .setDefault(16) + .help("Max concurrency for DynamoDB operations"); + + subparser.addArgument("--segments") + .type(Integer.class) + .dest(SEGMENTS_ARGUMENT) + .required(false) + .setDefault(1) + .help("The total number of segments for a DynamoDB scan"); + + subparser.addArgument("--buffer") + .type(Integer.class) + .dest(BUFFER_ARGUMENT) + .setDefault(16_384) + .help("Records to buffer"); + } + + @Override + protected void run(final Environment environment, final Namespace namespace, + final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception { + + final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); + final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT); + final int segments = namespace.getInt(SEGMENTS_ARGUMENT); + final int bufferSize = namespace.getInt(BUFFER_ARGUMENT); + + final Counter recordsInspectedCounter = + Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); + + final Counter recordsDeletedCounter = + Metrics.counter(RECORDS_DELETED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); + + final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager = + commandDependencies.registrationRecoveryPasswordsManager(); + + registrationRecoveryPasswordsManager.getE164sWithRegistrationRecoveryPasswords(segments, bufferSize, Schedulers.parallel()) + .doOnNext(e164 -> recordsInspectedCounter.increment()) + .flatMap(e164 -> { + final Mono deleteMono = dryRun + ? Mono.empty() + : Mono.fromFuture(() -> registrationRecoveryPasswordsManager.removeForE164(e164)) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) + .onErrorResume(throwable -> { + logger.warn("Failed to migrate record for {}", e164, throwable); + return Mono.empty(); + }); + + return deleteMono.doOnSuccess(ignored -> recordsDeletedCounter.increment()); + }, maxConcurrency) + .then() + .block(); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryTest.java index c8c5433eb..4d60522ce 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RegistrationRecoveryTest.java @@ -13,11 +13,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.charset.StandardCharsets; import java.time.Clock; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.google.i18n.phonenumbers.PhoneNumberUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -26,8 +28,10 @@ import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.MockUtils; import org.whispersystems.textsecuregcm.util.MutableClock; +import reactor.core.scheduler.Schedulers; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; public class RegistrationRecoveryTest { @@ -138,6 +142,35 @@ public class RegistrationRecoveryTest { assertFalse(manager.verify(PNI, wrongPassword).get()); } + @Test + void getE164AssociatedRegistrationRecoveryPasswords() { + final String phoneNumber = PhoneNumberUtil.getInstance().format( + PhoneNumberUtil.getInstance().getExampleNumber("US"), + PhoneNumberUtil.PhoneNumberFormat.E164); + + DYNAMO_DB_EXTENSION.getDynamoDbClient().putItem(PutItemRequest.builder() + .tableName(Tables.REGISTRATION_RECOVERY_PASSWORDS.tableName()) + .item(Map.of( + RegistrationRecoveryPasswords.KEY_PNI, AttributeValues.fromString(PNI.toString()), + RegistrationRecoveryPasswords.ATTR_EXP, AttributeValues.fromLong(registrationRecoveryPasswords.expirationSeconds()), + RegistrationRecoveryPasswords.ATTR_SALT, AttributeValues.fromString(ORIGINAL_HASH.salt()), + RegistrationRecoveryPasswords.ATTR_HASH, AttributeValues.fromString(ORIGINAL_HASH.hash()))) + .build()); + + DYNAMO_DB_EXTENSION.getDynamoDbClient().putItem(PutItemRequest.builder() + .tableName(Tables.REGISTRATION_RECOVERY_PASSWORDS.tableName()) + .item(Map.of( + RegistrationRecoveryPasswords.KEY_PNI, AttributeValues.fromString(phoneNumber), + RegistrationRecoveryPasswords.ATTR_EXP, AttributeValues.fromLong(registrationRecoveryPasswords.expirationSeconds()), + RegistrationRecoveryPasswords.ATTR_SALT, AttributeValues.fromString(ORIGINAL_HASH.salt()), + RegistrationRecoveryPasswords.ATTR_HASH, AttributeValues.fromString(ORIGINAL_HASH.hash()))) + .build()); + + assertEquals(List.of(phoneNumber), + registrationRecoveryPasswords.getE164sWithRegistrationRecoveryPasswords(2, 2, Schedulers.parallel()) + .collectList().block()); + } + private static long fetchTimestamp(final UUID phoneNumberIdentifier) throws ExecutionException, InterruptedException { return DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient().getItem(GetItemRequest.builder() .tableName(Tables.REGISTRATION_RECOVERY_PASSWORDS.tableName())