Add a single-shot command for removing expired accounts
This commit is contained in:
parent
6fd1c84126
commit
c3c7329ebb
|
@ -215,6 +215,7 @@ import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.MigrateSignedECPreKeysCommand;
|
import org.whispersystems.textsecuregcm.workers.MigrateSignedECPreKeysCommand;
|
||||||
|
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand;
|
import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
|
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
|
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
|
||||||
|
@ -271,6 +272,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
bootstrap.addCommand(new ScheduledApnPushNotificationSenderServiceCommand());
|
bootstrap.addCommand(new ScheduledApnPushNotificationSenderServiceCommand());
|
||||||
bootstrap.addCommand(new MessagePersisterServiceCommand());
|
bootstrap.addCommand(new MessagePersisterServiceCommand());
|
||||||
bootstrap.addCommand(new MigrateSignedECPreKeysCommand());
|
bootstrap.addCommand(new MigrateSignedECPreKeysCommand());
|
||||||
|
bootstrap.addCommand(new RemoveExpiredAccountsCommand(Clock.systemUTC()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.workers;
|
||||||
|
|
||||||
|
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import io.micrometer.core.instrument.Counter;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import java.time.Clock;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import net.sourceforge.argparse4j.inf.Subparser;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.publisher.ParallelFlux;
|
||||||
|
|
||||||
|
public class RemoveExpiredAccountsCommand extends AbstractSinglePassCrawlAccountsCommand {
|
||||||
|
|
||||||
|
private final Clock clock;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final Duration MAX_IDLE_DURATION = Duration.ofDays(180);
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final String DRY_RUN_ARGUMENT = "dry-run";
|
||||||
|
|
||||||
|
private static final int MAX_CONCURRENCY = 16;
|
||||||
|
|
||||||
|
private static final String DELETED_ACCOUNT_COUNTER_NAME =
|
||||||
|
name(RemoveExpiredAccountsCommand.class, "deletedAccounts");
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(RemoveExpiredAccountsCommand.class);
|
||||||
|
|
||||||
|
public RemoveExpiredAccountsCommand(final Clock clock) {
|
||||||
|
super("remove-expired-accounts", "Removes all accounts that have been idle for more than a set period of time");
|
||||||
|
|
||||||
|
this.clock = clock;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(final Subparser subparser) {
|
||||||
|
super.configure(subparser);
|
||||||
|
|
||||||
|
subparser.addArgument("--dry-run")
|
||||||
|
.type(Boolean.class)
|
||||||
|
.dest(DRY_RUN_ARGUMENT)
|
||||||
|
.required(false)
|
||||||
|
.setDefault(true)
|
||||||
|
.help("If true, don't actually delete accounts");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void crawlAccounts(final ParallelFlux<Account> accounts) {
|
||||||
|
final boolean isDryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
|
||||||
|
final Counter deletedAccountCounter =
|
||||||
|
Metrics.counter(DELETED_ACCOUNT_COUNTER_NAME, "dryRun", String.valueOf(isDryRun));
|
||||||
|
|
||||||
|
accounts.filter(this::isExpired)
|
||||||
|
.sequential()
|
||||||
|
.flatMap(expiredAccount -> {
|
||||||
|
final Mono<Void> deleteAccountMono = isDryRun
|
||||||
|
? Mono.empty()
|
||||||
|
: Mono.fromFuture(() -> getCommandDependencies().accountsManager().delete(expiredAccount, AccountsManager.DeletionReason.EXPIRED));
|
||||||
|
|
||||||
|
return deleteAccountMono
|
||||||
|
.doOnSuccess(ignored -> deletedAccountCounter.increment())
|
||||||
|
.onErrorResume(throwable -> {
|
||||||
|
log.warn("Failed to delete account {}", expiredAccount.getUuid(), throwable);
|
||||||
|
return Mono.empty();
|
||||||
|
});
|
||||||
|
}, MAX_CONCURRENCY)
|
||||||
|
.then()
|
||||||
|
.block();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean isExpired(final Account account) {
|
||||||
|
return Instant.ofEpochMilli(account.getLastSeen()).plus(MAX_IDLE_DURATION).isBefore(clock.instant());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,107 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.workers;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.time.Clock;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import net.sourceforge.argparse4j.inf.Namespace;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
class RemoveExpiredAccountsCommandTest {
|
||||||
|
|
||||||
|
private static class TestRemoveExpiredAccountsCommand extends RemoveExpiredAccountsCommand {
|
||||||
|
|
||||||
|
private final CommandDependencies commandDependencies;
|
||||||
|
private final Namespace namespace;
|
||||||
|
|
||||||
|
public TestRemoveExpiredAccountsCommand(final Clock clock, final AccountsManager accountsManager, final boolean isDryRun) {
|
||||||
|
super(clock);
|
||||||
|
|
||||||
|
commandDependencies = mock(CommandDependencies.class);
|
||||||
|
when(commandDependencies.accountsManager()).thenReturn(accountsManager);
|
||||||
|
|
||||||
|
namespace = mock(Namespace.class);
|
||||||
|
when(namespace.getBoolean(RemoveExpiredAccountsCommand.DRY_RUN_ARGUMENT)).thenReturn(isDryRun);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected CommandDependencies getCommandDependencies() {
|
||||||
|
return commandDependencies;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Namespace getNamespace() {
|
||||||
|
return namespace;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(booleans = {true, false})
|
||||||
|
void crawlAccounts(final boolean isDryRun) {
|
||||||
|
final Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
|
||||||
|
|
||||||
|
final AccountsManager accountsManager = mock(AccountsManager.class);
|
||||||
|
when(accountsManager.delete(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
|
||||||
|
|
||||||
|
final RemoveExpiredAccountsCommand removeExpiredAccountsCommand =
|
||||||
|
new TestRemoveExpiredAccountsCommand(clock, accountsManager, isDryRun);
|
||||||
|
|
||||||
|
final Account activeAccount = mock(Account.class);
|
||||||
|
when(activeAccount.getLastSeen()).thenReturn(clock.instant().toEpochMilli());
|
||||||
|
|
||||||
|
final Account expiredAccount = mock(Account.class);
|
||||||
|
when(expiredAccount.getLastSeen())
|
||||||
|
.thenReturn(clock.instant().minus(RemoveExpiredAccountsCommand.MAX_IDLE_DURATION).minusMillis(1).toEpochMilli());
|
||||||
|
|
||||||
|
removeExpiredAccountsCommand.crawlAccounts(Flux.just(activeAccount, expiredAccount).parallel());
|
||||||
|
|
||||||
|
if (isDryRun) {
|
||||||
|
verify(accountsManager, never()).delete(any(), any());
|
||||||
|
} else {
|
||||||
|
verify(accountsManager).delete(expiredAccount, AccountsManager.DeletionReason.EXPIRED);
|
||||||
|
verify(accountsManager, never()).delete(eq(activeAccount), any());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource
|
||||||
|
void isExpired(final Instant currentTime, final Instant lastSeen, final boolean expectExpired) {
|
||||||
|
final Clock clock = Clock.fixed(currentTime, ZoneId.systemDefault());
|
||||||
|
|
||||||
|
final Account account = mock(Account.class);
|
||||||
|
when(account.getLastSeen()).thenReturn(lastSeen.toEpochMilli());
|
||||||
|
|
||||||
|
assertEquals(expectExpired, new RemoveExpiredAccountsCommand(clock).isExpired(account));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Stream<Arguments> isExpired() {
|
||||||
|
final Instant currentTime = Instant.now();
|
||||||
|
|
||||||
|
return Stream.of(
|
||||||
|
Arguments.of(currentTime, currentTime, false),
|
||||||
|
Arguments.of(currentTime, currentTime.minus(RemoveExpiredAccountsCommand.MAX_IDLE_DURATION).plusMillis(1), false),
|
||||||
|
Arguments.of(currentTime, currentTime.minus(RemoveExpiredAccountsCommand.MAX_IDLE_DURATION), true),
|
||||||
|
Arguments.of(currentTime, currentTime.minus(RemoveExpiredAccountsCommand.MAX_IDLE_DURATION).minusMillis(1), true)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue