Retire `DeleteE164RegistrationRecoveryPasswordsCommand`
This commit is contained in:
parent
c7e6ee7297
commit
916314233f
|
@ -260,7 +260,6 @@ import org.whispersystems.textsecuregcm.workers.BackfillBeninPhoneNumberFormsCom
|
|||
import org.whispersystems.textsecuregcm.workers.BackupMetricsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.DeleteE164RegistrationRecoveryPasswordsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
|
||||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||
|
@ -332,7 +331,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
"Processes scheduled jobs to send notifications to idle devices",
|
||||
new IdleDeviceNotificationSchedulerFactory()));
|
||||
|
||||
bootstrap.addCommand(new DeleteE164RegistrationRecoveryPasswordsCommand());
|
||||
bootstrap.addCommand(new BackfillBeninPhoneNumberFormsCommand());
|
||||
}
|
||||
|
||||
|
|
|
@ -10,25 +10,18 @@ import static java.util.Objects.requireNonNull;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||
|
||||
public class RegistrationRecoveryPasswords {
|
||||
|
||||
|
@ -83,14 +76,6 @@ public class RegistrationRecoveryPasswords {
|
|||
.thenRun(Util.NOOP);
|
||||
}
|
||||
|
||||
CompletableFuture<Void> removeEntry(final String number) {
|
||||
return asyncClient.deleteItem(DeleteItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.key(Map.of(KEY_PNI, AttributeValues.fromString(number)))
|
||||
.build())
|
||||
.thenRun(Util.NOOP);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> removeEntry(final UUID phoneNumberIdentifier) {
|
||||
return asyncClient.deleteItem(DeleteItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
|
@ -104,36 +89,6 @@ public class RegistrationRecoveryPasswords {
|
|||
return clock.instant().plus(expiration).getEpochSecond();
|
||||
}
|
||||
|
||||
Flux<String> getE164sWithRegistrationRecoveryPasswords(final int segments, final int bufferSize, final Scheduler scheduler) {
|
||||
if (segments < 1) {
|
||||
throw new IllegalArgumentException("Total number of segments must be positive");
|
||||
}
|
||||
|
||||
return Flux.range(0, segments)
|
||||
.parallel()
|
||||
.runOn(scheduler)
|
||||
.flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder()
|
||||
.tableName(tableName)
|
||||
.consistentRead(true)
|
||||
.segment(segment)
|
||||
.totalSegments(segments)
|
||||
.filterExpression("begins_with(#key, :e164Prefix)")
|
||||
.expressionAttributeNames(Map.of("#key", KEY_PNI))
|
||||
.expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS("+")))
|
||||
.build())
|
||||
.items()
|
||||
.map(item -> item.get(KEY_PNI).s()))
|
||||
.sequential()
|
||||
.buffer(bufferSize)
|
||||
.map(source -> {
|
||||
final List<String> shuffled = new ArrayList<>(source);
|
||||
Collections.shuffle(shuffled);
|
||||
return shuffled;
|
||||
})
|
||||
.limitRate(2)
|
||||
.flatMapIterable(Function.identity());
|
||||
}
|
||||
|
||||
private static SaltedTokenHash saltedTokenHashFromItem(final Map<String, AttributeValue> item) {
|
||||
return new SaltedTokenHash(item.get(ATTR_HASH).s(), item.get(ATTR_SALT).s());
|
||||
}
|
||||
|
|
|
@ -15,8 +15,6 @@ import java.util.concurrent.CompletableFuture;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||
|
||||
public class RegistrationRecoveryPasswordsManager {
|
||||
|
@ -52,18 +50,6 @@ public class RegistrationRecoveryPasswordsManager {
|
|||
});
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> removeForE164(final String number) {
|
||||
return registrationRecoveryPasswords.removeEntry(number)
|
||||
.whenComplete((ignored, error) -> {
|
||||
if (error instanceof ResourceNotFoundException) {
|
||||
// These will naturally happen if a recovery password is already deleted. Since we can remove
|
||||
// the recovery password through many flows, we avoid creating log messages for these exceptions
|
||||
} else if (error != null) {
|
||||
logger.warn("Failed to remove Registration Recovery Password", error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> remove(final UUID phoneNumberIdentifier) {
|
||||
return registrationRecoveryPasswords.removeEntry(phoneNumberIdentifier)
|
||||
.whenComplete((ignored, error) -> {
|
||||
|
@ -76,10 +62,6 @@ public class RegistrationRecoveryPasswordsManager {
|
|||
});
|
||||
}
|
||||
|
||||
public Flux<String> getE164sWithRegistrationRecoveryPasswords(final int segments, final int bufferSize, final Scheduler scheduler) {
|
||||
return registrationRecoveryPasswords.getE164sWithRegistrationRecoveryPasswords(segments, bufferSize, scheduler);
|
||||
}
|
||||
|
||||
private static String bytesToString(final byte[] bytes) {
|
||||
return HexFormat.of().formatHex(bytes);
|
||||
}
|
||||
|
|
|
@ -1,116 +0,0 @@
|
|||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import io.dropwizard.core.Application;
|
||||
import io.dropwizard.core.setup.Environment;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.time.Duration;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
public class DeleteE164RegistrationRecoveryPasswordsCommand extends AbstractCommandWithDependencies {
|
||||
|
||||
private static final String DRY_RUN_ARGUMENT = "dry-run";
|
||||
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
|
||||
private static final String SEGMENTS_ARGUMENT = "segments";
|
||||
private static final String BUFFER_ARGUMENT = "buffer";
|
||||
|
||||
private static final String RECORDS_INSPECTED_COUNTER_NAME =
|
||||
MetricsUtil.name(DeleteE164RegistrationRecoveryPasswordsCommand.class, "recordsInspected");
|
||||
|
||||
private static final String RECORDS_DELETED_COUNTER_NAME =
|
||||
MetricsUtil.name(DeleteE164RegistrationRecoveryPasswordsCommand.class, "recordsDeleted");
|
||||
|
||||
private static final String DRY_RUN_TAG = "dryRun";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DeleteE164RegistrationRecoveryPasswordsCommand.class);
|
||||
|
||||
public DeleteE164RegistrationRecoveryPasswordsCommand() {
|
||||
|
||||
super(new Application<>() {
|
||||
@Override
|
||||
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
|
||||
}
|
||||
}, "delete-e164-registration-recovery-passwords", "Delete e164-associated registration recovery passwords");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Subparser subparser) {
|
||||
super.configure(subparser);
|
||||
|
||||
subparser.addArgument("--dry-run")
|
||||
.type(Boolean.class)
|
||||
.dest(DRY_RUN_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(true)
|
||||
.help("If true, don’t actually delete any registration recovery passwords");
|
||||
|
||||
subparser.addArgument("--max-concurrency")
|
||||
.type(Integer.class)
|
||||
.dest(MAX_CONCURRENCY_ARGUMENT)
|
||||
.setDefault(16)
|
||||
.help("Max concurrency for DynamoDB operations");
|
||||
|
||||
subparser.addArgument("--segments")
|
||||
.type(Integer.class)
|
||||
.dest(SEGMENTS_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(1)
|
||||
.help("The total number of segments for a DynamoDB scan");
|
||||
|
||||
subparser.addArgument("--buffer")
|
||||
.type(Integer.class)
|
||||
.dest(BUFFER_ARGUMENT)
|
||||
.setDefault(16_384)
|
||||
.help("Records to buffer");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(final Environment environment, final Namespace namespace,
|
||||
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
|
||||
|
||||
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
|
||||
final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
|
||||
final int segments = namespace.getInt(SEGMENTS_ARGUMENT);
|
||||
final int bufferSize = namespace.getInt(BUFFER_ARGUMENT);
|
||||
|
||||
final Counter recordsInspectedCounter =
|
||||
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final Counter recordsDeletedCounter =
|
||||
Metrics.counter(RECORDS_DELETED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager =
|
||||
commandDependencies.registrationRecoveryPasswordsManager();
|
||||
|
||||
registrationRecoveryPasswordsManager.getE164sWithRegistrationRecoveryPasswords(segments, bufferSize, Schedulers.parallel())
|
||||
.doOnNext(e164 -> recordsInspectedCounter.increment())
|
||||
.flatMap(e164 -> {
|
||||
final Mono<Void> deleteMono = dryRun
|
||||
? Mono.empty()
|
||||
: Mono.fromFuture(() -> registrationRecoveryPasswordsManager.removeForE164(e164))
|
||||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
|
||||
.onErrorResume(throwable -> {
|
||||
logger.warn("Failed to migrate record for {}", e164, throwable);
|
||||
return Mono.empty();
|
||||
});
|
||||
|
||||
return deleteMono.doOnSuccess(ignored -> recordsDeletedCounter.increment());
|
||||
}, maxConcurrency)
|
||||
.then()
|
||||
.block();
|
||||
}
|
||||
}
|
|
@ -13,13 +13,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import com.google.i18n.phonenumbers.PhoneNumberUtil;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
@ -28,10 +26,8 @@ import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
|
|||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.MockUtils;
|
||||
import org.whispersystems.textsecuregcm.util.MutableClock;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
|
||||
public class RegistrationRecoveryTest {
|
||||
|
||||
|
@ -142,35 +138,6 @@ public class RegistrationRecoveryTest {
|
|||
assertFalse(manager.verify(PNI, wrongPassword).get());
|
||||
}
|
||||
|
||||
@Test
|
||||
void getE164AssociatedRegistrationRecoveryPasswords() {
|
||||
final String phoneNumber = PhoneNumberUtil.getInstance().format(
|
||||
PhoneNumberUtil.getInstance().getExampleNumber("US"),
|
||||
PhoneNumberUtil.PhoneNumberFormat.E164);
|
||||
|
||||
DYNAMO_DB_EXTENSION.getDynamoDbClient().putItem(PutItemRequest.builder()
|
||||
.tableName(Tables.REGISTRATION_RECOVERY_PASSWORDS.tableName())
|
||||
.item(Map.of(
|
||||
RegistrationRecoveryPasswords.KEY_PNI, AttributeValues.fromString(PNI.toString()),
|
||||
RegistrationRecoveryPasswords.ATTR_EXP, AttributeValues.fromLong(registrationRecoveryPasswords.expirationSeconds()),
|
||||
RegistrationRecoveryPasswords.ATTR_SALT, AttributeValues.fromString(ORIGINAL_HASH.salt()),
|
||||
RegistrationRecoveryPasswords.ATTR_HASH, AttributeValues.fromString(ORIGINAL_HASH.hash())))
|
||||
.build());
|
||||
|
||||
DYNAMO_DB_EXTENSION.getDynamoDbClient().putItem(PutItemRequest.builder()
|
||||
.tableName(Tables.REGISTRATION_RECOVERY_PASSWORDS.tableName())
|
||||
.item(Map.of(
|
||||
RegistrationRecoveryPasswords.KEY_PNI, AttributeValues.fromString(phoneNumber),
|
||||
RegistrationRecoveryPasswords.ATTR_EXP, AttributeValues.fromLong(registrationRecoveryPasswords.expirationSeconds()),
|
||||
RegistrationRecoveryPasswords.ATTR_SALT, AttributeValues.fromString(ORIGINAL_HASH.salt()),
|
||||
RegistrationRecoveryPasswords.ATTR_HASH, AttributeValues.fromString(ORIGINAL_HASH.hash())))
|
||||
.build());
|
||||
|
||||
assertEquals(List.of(phoneNumber),
|
||||
registrationRecoveryPasswords.getE164sWithRegistrationRecoveryPasswords(2, 2, Schedulers.parallel())
|
||||
.collectList().block());
|
||||
}
|
||||
|
||||
private static long fetchTimestamp(final UUID phoneNumberIdentifier) throws ExecutionException, InterruptedException {
|
||||
return DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient().getItem(GetItemRequest.builder()
|
||||
.tableName(Tables.REGISTRATION_RECOVERY_PASSWORDS.tableName())
|
||||
|
|
Loading…
Reference in New Issue