Add a command to migrate registration recovery passwords to PNI-associated records
This commit is contained in:
parent
af1d21c225
commit
717fb57a14
|
@ -262,6 +262,7 @@ import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand
|
|||
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
|
||||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.MigrateRegistrationRecoveryPasswordsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
|
||||
|
@ -329,6 +330,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs",
|
||||
"Processes scheduled jobs to send notifications to idle devices",
|
||||
new IdleDeviceNotificationSchedulerFactory()));
|
||||
|
||||
bootstrap.addCommand(new MigrateRegistrationRecoveryPasswordsCommand());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,6 +16,8 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.util.function.Tuple3;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||
|
||||
public class RegistrationRecoveryPasswordsManager {
|
||||
|
@ -69,6 +71,10 @@ public class RegistrationRecoveryPasswordsManager {
|
|||
}));
|
||||
}
|
||||
|
||||
public Flux<Tuple3<String, SaltedTokenHash, Long>> getE164AssociatedRegistrationRecoveryPasswords() {
|
||||
return registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords();
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> migrateE164Record(final String number, final SaltedTokenHash saltedTokenHash, final long expirationSeconds) {
|
||||
return phoneNumberIdentifiers.getPhoneNumberIdentifier(number)
|
||||
.thenCompose(phoneNumberIdentifier -> migrateE164Record(number, phoneNumberIdentifier, saltedTokenHash, expirationSeconds, 10));
|
||||
|
|
|
@ -78,6 +78,7 @@ record CommandDependencies(
|
|||
MessagesCache messagesCache,
|
||||
MessagesManager messagesManager,
|
||||
KeysManager keysManager,
|
||||
RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager,
|
||||
APNSender apnSender,
|
||||
FcmSender fcmSender,
|
||||
PushNotificationManager pushNotificationManager,
|
||||
|
@ -277,6 +278,7 @@ record CommandDependencies(
|
|||
messagesCache,
|
||||
messagesManager,
|
||||
keys,
|
||||
registrationRecoveryPasswordsManager,
|
||||
apnSender,
|
||||
fcmSender,
|
||||
pushNotificationManager,
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import io.dropwizard.core.Application;
|
||||
import io.dropwizard.core.setup.Environment;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommandWithDependencies {
|
||||
|
||||
private static final int DEFAULT_MAX_CONCURRENCY = 16;
|
||||
|
||||
private static final String DRY_RUN_ARGUMENT = "dry-run";
|
||||
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
|
||||
|
||||
private static final String RECORDS_INSPECTED_COUNTER_NAME =
|
||||
MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsInspected");
|
||||
|
||||
private static final String RECORDS_MIGRATED_COUNTER_NAME =
|
||||
MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsMigrated");
|
||||
|
||||
private static final String DRY_RUN_TAG = "dryRun";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MigrateRegistrationRecoveryPasswordsCommand.class);
|
||||
|
||||
public MigrateRegistrationRecoveryPasswordsCommand() {
|
||||
|
||||
super(new Application<>() {
|
||||
@Override
|
||||
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
|
||||
}
|
||||
}, "migrate-registration-recovery-passwords", "Migrate e164-based registration recovery passwords to PNI-based records");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Subparser subparser) {
|
||||
super.configure(subparser);
|
||||
|
||||
subparser.addArgument("--dry-run")
|
||||
.type(Boolean.class)
|
||||
.dest(DRY_RUN_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(true)
|
||||
.help("If true, don’t actually modify accounts with expired linked devices");
|
||||
|
||||
subparser.addArgument("--max-concurrency")
|
||||
.type(Integer.class)
|
||||
.dest(MAX_CONCURRENCY_ARGUMENT)
|
||||
.setDefault(DEFAULT_MAX_CONCURRENCY)
|
||||
.help("Max concurrency for DynamoDB operations");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(final Environment environment, final Namespace namespace,
|
||||
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
|
||||
|
||||
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
|
||||
final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
|
||||
|
||||
final Counter recordsInspectedCounter =
|
||||
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final Counter recordsMigratedCounter =
|
||||
Metrics.counter(RECORDS_MIGRATED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager =
|
||||
commandDependencies.registrationRecoveryPasswordsManager();
|
||||
|
||||
registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords()
|
||||
.doOnNext(tuple -> recordsInspectedCounter.increment())
|
||||
.flatMap(tuple -> {
|
||||
final String e164 = tuple.getT1();
|
||||
final SaltedTokenHash saltedTokenHash = tuple.getT2();
|
||||
final long expiration = tuple.getT3();
|
||||
|
||||
return dryRun
|
||||
? Mono.fromFuture(() -> registrationRecoveryPasswordsManager.migrateE164Record(e164, saltedTokenHash, expiration))
|
||||
.onErrorResume(throwable -> {
|
||||
logger.warn("Failed to migrate record for {}", e164, throwable);
|
||||
return Mono.empty();
|
||||
})
|
||||
: Mono.just(false);
|
||||
}, maxConcurrency)
|
||||
.filter(migrated -> migrated)
|
||||
.doOnNext(ignored -> recordsMigratedCounter.increment())
|
||||
.then()
|
||||
.block();
|
||||
}
|
||||
}
|
|
@ -73,6 +73,7 @@ class FinishPushNotificationExperimentCommandTest {
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
pushNotificationExperimentSamples,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -66,6 +66,7 @@ class NotifyIdleDevicesCommandTest {
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
|
||||
this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler;
|
||||
|
|
|
@ -62,6 +62,7 @@ class StartPushNotificationExperimentCommandTest {
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
pushNotificationExperimentSamples,
|
||||
null,
|
||||
null,
|
||||
|
|
Loading…
Reference in New Issue