diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 9fe5407cb..d4a1f17dc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -264,6 +264,7 @@ import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerF import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand; import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand; import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand; +import org.whispersystems.textsecuregcm.workers.RemoveE164RecentlyDeletedAccountsCommand; import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand; import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand; import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand; @@ -329,6 +330,8 @@ public class WhisperServerService extends Application getE164sForRecentlyDeletedAccounts(final int segments, final Scheduler scheduler) { + CompletableFuture removeRecentlyDeletedAccountRecord(final String e164) { + return asyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(deletedAccountsTableName) + .key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(e164))) + .build()) + .thenRun(Util.NOOP); + } + + Flux getE164sForRecentlyDeletedAccounts(final int segments, final Scheduler scheduler) { if (segments < 1) { throw new IllegalArgumentException("Total number of segments must be positive"); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 2fb5a2f5f..338df8147 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -90,7 +90,6 @@ import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; -import reactor.util.function.Tuple3; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; @@ -1213,6 +1212,14 @@ public class AccountsManager extends RedisPubSubAdapter implemen return accounts.findRecentlyDeletedPhoneNumberIdentifier(accountIdentifier); } + public Flux getE164sForRecentlyDeletedAccounts(final int segments, final Scheduler scheduler) { + return accounts.getE164sForRecentlyDeletedAccounts(segments, scheduler); + } + + public CompletableFuture removeRecentlyDeletedAccountRecord(final String e164) { + return accounts.removeRecentlyDeletedAccountRecord(e164); + } + public Flux streamAllFromDynamo(final int segments, final Scheduler scheduler) { return accounts.getAll(segments, scheduler); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveE164RecentlyDeletedAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveE164RecentlyDeletedAccountsCommand.java new file mode 100644 index 000000000..86e5cc6b1 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveE164RecentlyDeletedAccountsCommand.java @@ -0,0 +1,127 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.workers; + +import io.dropwizard.core.Application; +import io.dropwizard.core.setup.Environment; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +public class RemoveE164RecentlyDeletedAccountsCommand extends AbstractCommandWithDependencies { + + private static final String DRY_RUN_ARGUMENT = "dry-run"; + private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; + private static final String SEGMENTS_ARGUMENT = "segments"; + private static final String BUFFER_ARGUMENT = "buffer"; + + private static final String RECORDS_INSPECTED_COUNTER_NAME = + MetricsUtil.name(RemoveE164RecentlyDeletedAccountsCommand.class, "recordsInspected"); + + private static final String RECORDS_DELETED_COUNTER_NAME = + MetricsUtil.name(RemoveE164RecentlyDeletedAccountsCommand.class, "recordsDeleted"); + + private static final String DRY_RUN_TAG = "dryRun"; + + private static final Logger logger = LoggerFactory.getLogger(RemoveE164RecentlyDeletedAccountsCommand.class); + + public RemoveE164RecentlyDeletedAccountsCommand() { + + super(new Application<>() { + @Override + public void run(final WhisperServerConfiguration configuration, final Environment environment) { + } + }, "remove-e164-recently-deleted-accounts", "Delete e164-associated recently-deleted account records"); + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("--dry-run") + .type(Boolean.class) + .dest(DRY_RUN_ARGUMENT) + .required(false) + .setDefault(true) + .help("If true, don’t actually delete any registration recovery passwords"); + + subparser.addArgument("--max-concurrency") + .type(Integer.class) + .dest(MAX_CONCURRENCY_ARGUMENT) + .setDefault(16) + .help("Max concurrency for DynamoDB operations"); + + subparser.addArgument("--segments") + .type(Integer.class) + .dest(SEGMENTS_ARGUMENT) + .required(false) + .setDefault(1) + .help("The total number of segments for a DynamoDB scan"); + + subparser.addArgument("--buffer") + .type(Integer.class) + .dest(BUFFER_ARGUMENT) + .setDefault(16_384) + .help("Records to buffer"); + } + + @Override + protected void run(final Environment environment, final Namespace namespace, + final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception { + + final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); + final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT); + final int segments = namespace.getInt(SEGMENTS_ARGUMENT); + final int bufferSize = namespace.getInt(BUFFER_ARGUMENT); + + final Counter recordsInspectedCounter = + Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); + + final Counter recordsDeletedCounter = + Metrics.counter(RECORDS_DELETED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun)); + + final AccountsManager accountsManager = commandDependencies.accountsManager(); + + accountsManager.getE164sForRecentlyDeletedAccounts(segments, Schedulers.parallel()) + .buffer(bufferSize) + .map(source -> { + final List shuffled = new ArrayList<>(source); + Collections.shuffle(shuffled); + return shuffled; + }) + .limitRate(2) + .flatMapIterable(Function.identity()) + .doOnNext(e164 -> recordsInspectedCounter.increment()) + .flatMap(e164 -> { + final Mono deleteMono = dryRun + ? Mono.empty() + : Mono.fromFuture(() -> accountsManager.removeRecentlyDeletedAccountRecord(e164)) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) + .onErrorResume(throwable -> { + logger.warn("Failed to remove recently-deleted account record for {}", e164, throwable); + return Mono.empty(); + }); + + return deleteMono.doOnSuccess(ignored -> recordsDeletedCounter.increment()); + }, maxConcurrency) + .then() + .block(); + } +}