Retire `RemoveE164RecentlyDeletedAccountsCommand`
This commit is contained in:
parent
142e2cbe9d
commit
fb6c4eca34
|
@ -264,7 +264,6 @@ import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerF
|
|||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveE164RecentlyDeletedAccountsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
|
||||
|
@ -330,8 +329,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs",
|
||||
"Processes scheduled jobs to send notifications to idle devices",
|
||||
new IdleDeviceNotificationSchedulerFactory()));
|
||||
|
||||
bootstrap.addCommand(new RemoveE164RecentlyDeletedAccountsCommand());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -54,7 +54,6 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
|||
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Delete;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Put;
|
||||
|
@ -1247,36 +1246,6 @@ public class Accounts {
|
|||
.sequential();
|
||||
}
|
||||
|
||||
CompletableFuture<Void> removeRecentlyDeletedAccountRecord(final String e164) {
|
||||
return asyncClient.deleteItem(DeleteItemRequest.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(e164)))
|
||||
.build())
|
||||
.thenRun(Util.NOOP);
|
||||
}
|
||||
|
||||
Flux<String> getE164sForRecentlyDeletedAccounts(final int segments, final Scheduler scheduler) {
|
||||
if (segments < 1) {
|
||||
throw new IllegalArgumentException("Total number of segments must be positive");
|
||||
}
|
||||
|
||||
return Flux.range(0, segments)
|
||||
.parallel()
|
||||
.runOn(scheduler)
|
||||
.flatMap(segment -> dynamoDbAsyncClient.scanPaginator(ScanRequest.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.consistentRead(true)
|
||||
.segment(segment)
|
||||
.totalSegments(segments)
|
||||
.filterExpression("begins_with(#key, :e164Prefix)")
|
||||
.expressionAttributeNames(Map.of("#key", DELETED_ACCOUNTS_KEY_ACCOUNT_PNI))
|
||||
.expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS("+")))
|
||||
.build())
|
||||
.items())
|
||||
.map(item -> item.get(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI).s())
|
||||
.sequential();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private Optional<Account> getByIndirectLookup(
|
||||
final Timer timer,
|
||||
|
|
|
@ -1212,14 +1212,6 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
|||
return accounts.findRecentlyDeletedPhoneNumberIdentifier(accountIdentifier);
|
||||
}
|
||||
|
||||
public Flux<String> getE164sForRecentlyDeletedAccounts(final int segments, final Scheduler scheduler) {
|
||||
return accounts.getE164sForRecentlyDeletedAccounts(segments, scheduler);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> removeRecentlyDeletedAccountRecord(final String e164) {
|
||||
return accounts.removeRecentlyDeletedAccountRecord(e164);
|
||||
}
|
||||
|
||||
public Flux<Account> streamAllFromDynamo(final int segments, final Scheduler scheduler) {
|
||||
return accounts.getAll(segments, scheduler);
|
||||
}
|
||||
|
|
|
@ -1,127 +0,0 @@
|
|||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import io.dropwizard.core.Application;
|
||||
import io.dropwizard.core.setup.Environment;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.retry.Retry;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class RemoveE164RecentlyDeletedAccountsCommand extends AbstractCommandWithDependencies {
|
||||
|
||||
private static final String DRY_RUN_ARGUMENT = "dry-run";
|
||||
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
|
||||
private static final String SEGMENTS_ARGUMENT = "segments";
|
||||
private static final String BUFFER_ARGUMENT = "buffer";
|
||||
|
||||
private static final String RECORDS_INSPECTED_COUNTER_NAME =
|
||||
MetricsUtil.name(RemoveE164RecentlyDeletedAccountsCommand.class, "recordsInspected");
|
||||
|
||||
private static final String RECORDS_DELETED_COUNTER_NAME =
|
||||
MetricsUtil.name(RemoveE164RecentlyDeletedAccountsCommand.class, "recordsDeleted");
|
||||
|
||||
private static final String DRY_RUN_TAG = "dryRun";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RemoveE164RecentlyDeletedAccountsCommand.class);
|
||||
|
||||
public RemoveE164RecentlyDeletedAccountsCommand() {
|
||||
|
||||
super(new Application<>() {
|
||||
@Override
|
||||
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
|
||||
}
|
||||
}, "remove-e164-recently-deleted-accounts", "Delete e164-associated recently-deleted account records");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Subparser subparser) {
|
||||
super.configure(subparser);
|
||||
|
||||
subparser.addArgument("--dry-run")
|
||||
.type(Boolean.class)
|
||||
.dest(DRY_RUN_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(true)
|
||||
.help("If true, don’t actually delete any registration recovery passwords");
|
||||
|
||||
subparser.addArgument("--max-concurrency")
|
||||
.type(Integer.class)
|
||||
.dest(MAX_CONCURRENCY_ARGUMENT)
|
||||
.setDefault(16)
|
||||
.help("Max concurrency for DynamoDB operations");
|
||||
|
||||
subparser.addArgument("--segments")
|
||||
.type(Integer.class)
|
||||
.dest(SEGMENTS_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(1)
|
||||
.help("The total number of segments for a DynamoDB scan");
|
||||
|
||||
subparser.addArgument("--buffer")
|
||||
.type(Integer.class)
|
||||
.dest(BUFFER_ARGUMENT)
|
||||
.setDefault(16_384)
|
||||
.help("Records to buffer");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(final Environment environment, final Namespace namespace,
|
||||
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
|
||||
|
||||
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
|
||||
final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
|
||||
final int segments = namespace.getInt(SEGMENTS_ARGUMENT);
|
||||
final int bufferSize = namespace.getInt(BUFFER_ARGUMENT);
|
||||
|
||||
final Counter recordsInspectedCounter =
|
||||
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final Counter recordsDeletedCounter =
|
||||
Metrics.counter(RECORDS_DELETED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final AccountsManager accountsManager = commandDependencies.accountsManager();
|
||||
|
||||
accountsManager.getE164sForRecentlyDeletedAccounts(segments, Schedulers.parallel())
|
||||
.buffer(bufferSize)
|
||||
.map(source -> {
|
||||
final List<String> shuffled = new ArrayList<>(source);
|
||||
Collections.shuffle(shuffled);
|
||||
return shuffled;
|
||||
})
|
||||
.limitRate(2)
|
||||
.flatMapIterable(Function.identity())
|
||||
.doOnNext(e164 -> recordsInspectedCounter.increment())
|
||||
.flatMap(e164 -> {
|
||||
final Mono<Void> deleteMono = dryRun
|
||||
? Mono.empty()
|
||||
: Mono.fromFuture(() -> accountsManager.removeRecentlyDeletedAccountRecord(e164))
|
||||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
|
||||
.onErrorResume(throwable -> {
|
||||
logger.warn("Failed to remove recently-deleted account record for {}", e164, throwable);
|
||||
return Mono.empty();
|
||||
});
|
||||
|
||||
return deleteMono.doOnSuccess(ignored -> recordsDeletedCounter.increment());
|
||||
}, maxConcurrency)
|
||||
.then()
|
||||
.block();
|
||||
}
|
||||
}
|
|
@ -22,7 +22,6 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.i18n.phonenumbers.PhoneNumberUtil;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
@ -1708,32 +1707,6 @@ class AccountsTest {
|
|||
assertInstanceOf(DeviceIdDeserializer.DeviceIdDeserializationException.class, cause);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getE164sForRecentlyDeletedAccounts() {
|
||||
final UUID accountIdentifier = UUID.randomUUID();
|
||||
final UUID phoneNumberIdentifier = UUID.randomUUID();
|
||||
final String phoneNumber = PhoneNumberUtil.getInstance().format(
|
||||
PhoneNumberUtil.getInstance().getExampleNumber("US"),
|
||||
PhoneNumberUtil.PhoneNumberFormat.E164);
|
||||
|
||||
final Account deletedAccount = generateAccount(phoneNumber, accountIdentifier, phoneNumberIdentifier);
|
||||
createAccount(deletedAccount);
|
||||
accounts.delete(deletedAccount.getUuid(), List.of()).join();
|
||||
|
||||
// Artificially insert this row to simulate legacy data
|
||||
DYNAMO_DB_EXTENSION.getDynamoDbClient().putItem(PutItemRequest.builder()
|
||||
.tableName(Tables.DELETED_ACCOUNTS.tableName())
|
||||
.item(Map.of(
|
||||
Accounts.DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(phoneNumber),
|
||||
Accounts.DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier),
|
||||
Accounts.DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(Accounts.DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
|
||||
.build());
|
||||
|
||||
assertEquals(
|
||||
List.of(phoneNumber),
|
||||
accounts.getE164sForRecentlyDeletedAccounts(1, Schedulers.immediate()).collectList().block());
|
||||
}
|
||||
|
||||
private static Device generateDevice(byte id) {
|
||||
return DevicesHelper.createDevice(id);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue