Move command boilerplate into a base class

This commit is contained in:
Ravi Khadiwala 2024-05-14 12:39:44 -05:00 committed by ravi-signal
parent 7d95926f02
commit 1182d159aa
7 changed files with 148 additions and 173 deletions

View File

@ -0,0 +1,75 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.core.Application;
import io.dropwizard.core.cli.Cli;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment;
import net.sourceforge.argparse4j.inf.Namespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
/**
* Base class for one-shot commands that use {@link CommandDependencies}.
* <p>
* Override {@link #run(Environment, Namespace, WhisperServerConfiguration, CommandDependencies)} in a child class to
* let the parent class handle common initialization of dependencies, metrics, and logging.
*/
public abstract class AbstractCommandWithDependencies extends EnvironmentCommand<WhisperServerConfiguration> {
private final Logger logger = LoggerFactory.getLogger(getClass());
protected AbstractCommandWithDependencies(final Application<WhisperServerConfiguration> application,
final String name, final String description) {
super(application, name, description);
}
/**
* Run the command with the given initialized {@link CommandDependencies}
*/
protected abstract void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception;
@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {
UncaughtExceptionHandler.register();
final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration);
MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager());
try {
logger.info("Starting command dependencies");
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.start();
} catch (final Exception e) {
logger.error("Failed to start managed object", e);
throw new RuntimeException(e);
}
});
run(environment, namespace, configuration, commandDependencies);
} finally {
logger.info("Stopping command dependencies");
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
}
@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
}
}

View File

@ -20,7 +20,7 @@ import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public abstract class AbstractSinglePassCrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfiguration> { public abstract class AbstractSinglePassCrawlAccountsCommand extends AbstractCommandWithDependencies {
private CommandDependencies commandDependencies; private CommandDependencies commandDependencies;
private Namespace namespace; private Namespace namespace;
@ -59,12 +59,9 @@ public abstract class AbstractSinglePassCrawlAccountsCommand extends Environment
@Override @Override
protected void run(final Environment environment, final Namespace namespace, protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception { final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
UncaughtExceptionHandler.register();
this.namespace = namespace; this.namespace = namespace;
this.commandDependencies = CommandDependencies.build(getName(), environment, configuration); this.commandDependencies = commandDependencies;
final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT)); final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT));
@ -72,31 +69,7 @@ public abstract class AbstractSinglePassCrawlAccountsCommand extends Environment
segments, segments,
Runtime.getRuntime().availableProcessors()); Runtime.getRuntime().availableProcessors());
try { crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments, Schedulers.parallel()));
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.start();
} catch (final Exception e) {
logger.error("Failed to start managed object", e);
throw new RuntimeException(e);
}
});
crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments, Schedulers.parallel()));
} finally {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
}
@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
} }
protected abstract void crawlAccounts(final Flux<Account> accounts); protected abstract void crawlAccounts(final Flux<Account> accounts);

View File

@ -29,7 +29,7 @@ import java.util.Objects;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
public class BackupMetricsCommand extends EnvironmentCommand<WhisperServerConfiguration> { public class BackupMetricsCommand extends AbstractCommandWithDependencies {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@ -61,70 +61,47 @@ public class BackupMetricsCommand extends EnvironmentCommand<WhisperServerConfig
@Override @Override
protected void run(final Environment environment, final Namespace namespace, protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception { final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
UncaughtExceptionHandler.register();
final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration);
MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager());
final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT)); final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT));
logger.info("Crawling backups for metrics with {} segments and {} processors", logger.info("Crawling backups for metrics with {} segments and {} processors",
segments, segments,
Runtime.getRuntime().availableProcessors()); Runtime.getRuntime().availableProcessors());
try { final DistributionSummary numObjectsMediaTier = Metrics.summary(name(getClass(), "numObjects"),
environment.lifecycle().getManagedObjects().forEach(managedObject -> { "tier", BackupLevel.MEDIA.name());
try { final DistributionSummary bytesUsedMediaTier = Metrics.summary(name(getClass(), "bytesUsed"),
managedObject.start(); "tier", BackupLevel.MEDIA.name());
} catch (final Exception e) { final DistributionSummary numObjectsMessagesTier = Metrics.summary(name(getClass(), "numObjects"),
logger.error("Failed to start managed object", e); "tier", BackupLevel.MESSAGES.name());
throw new RuntimeException(e); final DistributionSummary bytesUsedMessagesTier = Metrics.summary(name(getClass(), "bytesUsed"),
} "tier", BackupLevel.MESSAGES.name());
});
final DistributionSummary numObjectsMediaTier = Metrics.summary(name(getClass(), "numObjects"), final DistributionSummary timeSinceLastRefresh = Metrics.summary(name(getClass(),
"tier", BackupLevel.MEDIA.name()); "timeSinceLastRefresh"));
final DistributionSummary bytesUsedMediaTier = Metrics.summary(name(getClass(), "bytesUsed"), final DistributionSummary timeSinceLastMediaRefresh = Metrics.summary(name(getClass(),
"tier", BackupLevel.MEDIA.name()); "timeSinceLastMediaRefresh"));
final DistributionSummary numObjectsMessagesTier = Metrics.summary(name(getClass(), "numObjects"), final String backupsCounterName = name(getClass(), "backups");
"tier", BackupLevel.MESSAGES.name());
final DistributionSummary bytesUsedMessagesTier = Metrics.summary(name(getClass(), "bytesUsed"),
"tier", BackupLevel.MESSAGES.name());
final DistributionSummary timeSinceLastRefresh = Metrics.summary(name(getClass(), final BackupManager backupManager = commandDependencies.backupManager();
"timeSinceLastRefresh")); final Long backupsExpired = backupManager
final DistributionSummary timeSinceLastMediaRefresh = Metrics.summary(name(getClass(), .listBackupAttributes(segments, Schedulers.parallel())
"timeSinceLastMediaRefresh")); .doOnNext(backupMetadata -> {
final String backupsCounterName = name(getClass(), "backups"); final boolean subscribed = backupMetadata.lastMediaRefresh().equals(backupMetadata.lastRefresh());
if (subscribed) {
final BackupManager backupManager = commandDependencies.backupManager(); numObjectsMediaTier.record(backupMetadata.numObjects());
final Long backupsExpired = backupManager bytesUsedMediaTier.record(backupMetadata.bytesUsed());
.listBackupAttributes(segments, Schedulers.parallel()) } else {
.doOnNext(backupMetadata -> { numObjectsMessagesTier.record(backupMetadata.numObjects());
final boolean subscribed = backupMetadata.lastMediaRefresh().equals(backupMetadata.lastRefresh()); bytesUsedMessagesTier.record(backupMetadata.bytesUsed());
if (subscribed) { }
numObjectsMediaTier.record(backupMetadata.numObjects()); timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds());
bytesUsedMediaTier.record(backupMetadata.bytesUsed()); timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());
} else { Metrics.counter(backupsCounterName, "subscribed", String.valueOf(subscribed)).increment();
numObjectsMessagesTier.record(backupMetadata.numObjects()); })
bytesUsedMessagesTier.record(backupMetadata.bytesUsed()); .count()
} .block();
timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds()); logger.info("Crawled {} backups", backupsExpired);
timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());
Metrics.counter(backupsCounterName, "subscribed", String.valueOf(subscribed)).increment();
})
.count()
.block();
logger.info("Crawled {} backups", backupsExpired);
} finally {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
} }
private Duration timeSince(Instant t) { private Duration timeSince(Instant t) {
@ -134,9 +111,4 @@ public class BackupMetricsCommand extends EnvironmentCommand<WhisperServerConfig
} }
return between; return between;
} }
@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
}
} }

View File

@ -5,9 +5,7 @@
package org.whispersystems.textsecuregcm.workers; package org.whispersystems.textsecuregcm.workers;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.core.Application; import io.dropwizard.core.Application;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment; import io.dropwizard.core.setup.Environment;
import java.util.Optional; import java.util.Optional;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
@ -19,7 +17,7 @@ import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason; import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfiguration> { public class DeleteUserCommand extends AbstractCommandWithDependencies {
private final Logger logger = LoggerFactory.getLogger(DeleteUserCommand.class); private final Logger logger = LoggerFactory.getLogger(DeleteUserCommand.class);
@ -36,22 +34,17 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
public void configure(Subparser subparser) { public void configure(Subparser subparser) {
super.configure(subparser); super.configure(subparser);
subparser.addArgument("-u", "--user") subparser.addArgument("-u", "--user")
.dest("user") .dest("user")
.type(String.class) .type(String.class)
.required(true) .required(true)
.help("The user to remove"); .help("The user to remove");
} }
@Override @Override
protected void run(Environment environment, Namespace namespace, WhisperServerConfiguration configuration) protected void run(Environment environment, Namespace namespace, WhisperServerConfiguration configuration,
throws Exception CommandDependencies deps) throws Exception {
{
try { try {
String[] users = namespace.getString("user").split(","); String[] users = namespace.getString("user").split(",");
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
final CommandDependencies deps = CommandDependencies.build("rmuser", environment, configuration);
AccountsManager accountsManager = deps.accountsManager(); AccountsManager accountsManager = deps.accountsManager();
for (String user : users) { for (String user : users) {

View File

@ -6,8 +6,6 @@
package org.whispersystems.textsecuregcm.workers; package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.core.Application; import io.dropwizard.core.Application;
import io.dropwizard.core.cli.Cli;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment; import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import java.time.Clock; import java.time.Clock;
@ -22,11 +20,10 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.backup.BackupManager; import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.ExpiredBackup; import org.whispersystems.textsecuregcm.backup.ExpiredBackup;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public class RemoveExpiredBackupsCommand extends EnvironmentCommand<WhisperServerConfiguration> { public class RemoveExpiredBackupsCommand extends AbstractCommandWithDependencies {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@ -89,12 +86,7 @@ public class RemoveExpiredBackupsCommand extends EnvironmentCommand<WhisperServe
@Override @Override
protected void run(final Environment environment, final Namespace namespace, protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception { final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
UncaughtExceptionHandler.register();
final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration);
MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager());
final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT)); final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT));
final int concurrency = Objects.requireNonNull(namespace.getInt(MAX_CONCURRENCY_ARGUMENT)); final int concurrency = Objects.requireNonNull(namespace.getInt(MAX_CONCURRENCY_ARGUMENT));
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
@ -105,32 +97,14 @@ public class RemoveExpiredBackupsCommand extends EnvironmentCommand<WhisperServe
Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
gracePeriod); gracePeriod);
try { final BackupManager backupManager = commandDependencies.backupManager();
environment.lifecycle().getManagedObjects().forEach(managedObject -> { final long backupsExpired = backupManager
try { .getExpiredBackups(segments, Schedulers.parallel(), clock.instant().minus(gracePeriod))
managedObject.start(); .flatMap(expiredBackup -> removeExpiredBackup(backupManager, expiredBackup, dryRun), concurrency)
} catch (final Exception e) { .filter(Boolean.TRUE::equals)
logger.error("Failed to start managed object", e); .count()
throw new RuntimeException(e); .block();
} logger.info("Expired {} backups", backupsExpired);
});
final BackupManager backupManager = commandDependencies.backupManager();
final long backupsExpired = backupManager
.getExpiredBackups(segments, Schedulers.parallel(), clock.instant().minus(gracePeriod))
.flatMap(expiredBackup -> removeExpiredBackup(backupManager, expiredBackup, dryRun), concurrency)
.filter(Boolean.TRUE::equals)
.count()
.block();
logger.info("Expired {} backups", backupsExpired);
} finally {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
} }
private Mono<Boolean> removeExpiredBackup( private Mono<Boolean> removeExpiredBackup(
@ -162,9 +136,4 @@ public class RemoveExpiredBackupsCommand extends EnvironmentCommand<WhisperServe
return Mono.just(false); return Mono.just(false);
}); });
} }
@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
}
} }

View File

@ -5,9 +5,7 @@
package org.whispersystems.textsecuregcm.workers; package org.whispersystems.textsecuregcm.workers;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.core.Application; import io.dropwizard.core.Application;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment; import io.dropwizard.core.setup.Environment;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -17,7 +15,7 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.AccountsManager;
public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperServerConfiguration> { public class SetUserDiscoverabilityCommand extends AbstractCommandWithDependencies {
public SetUserDiscoverabilityCommand() { public SetUserDiscoverabilityCommand() {
@ -48,12 +46,10 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
@Override @Override
protected void run(final Environment environment, protected void run(final Environment environment,
final Namespace namespace, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception { final WhisperServerConfiguration configuration,
final CommandDependencies deps) throws Exception {
try { try {
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
final CommandDependencies deps = CommandDependencies.build("set-discoverability", environment, configuration);
final AccountsManager accountsManager = deps.accountsManager(); final AccountsManager accountsManager = deps.accountsManager();
Optional<Account> maybeAccount; Optional<Account> maybeAccount;

View File

@ -19,7 +19,7 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
public class UnlinkDeviceCommand extends EnvironmentCommand<WhisperServerConfiguration> { public class UnlinkDeviceCommand extends AbstractCommandWithDependencies {
public UnlinkDeviceCommand() { public UnlinkDeviceCommand() {
super(new Application<>() { super(new Application<>() {
@ -49,25 +49,22 @@ public class UnlinkDeviceCommand extends EnvironmentCommand<WhisperServerConfigu
@Override @Override
protected void run(final Environment environment, final Namespace namespace, protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception { final WhisperServerConfiguration configuration,
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); final CommandDependencies deps) throws Exception {
final UUID aci = UUID.fromString(namespace.getString("uuid").trim());
final List<Byte> deviceIds = namespace.getList("deviceIds");
final UUID aci = UUID.fromString(namespace.getString("uuid").trim()); Account account = deps.accountsManager().getByAccountIdentifier(aci)
final List<Byte> deviceIds = namespace.getList("deviceIds"); .orElseThrow(() -> new IllegalArgumentException("account id " + aci + " does not exist"));
final CommandDependencies deps = CommandDependencies.build("unlink-device", environment, configuration); if (deviceIds.contains(Device.PRIMARY_ID)) {
throw new IllegalArgumentException("cannot delete primary device");
}
Account account = deps.accountsManager().getByAccountIdentifier(aci) for (byte deviceId : deviceIds) {
.orElseThrow(() -> new IllegalArgumentException("account id " + aci + " does not exist")); /** see {@link org.whispersystems.textsecuregcm.controllers.DeviceController#removeDevice} */
System.out.format("Removing device %s::%d\n", aci, deviceId);
if (deviceIds.contains(Device.PRIMARY_ID)) { deps.accountsManager().removeDevice(account, deviceId).join();
throw new IllegalArgumentException("cannot delete primary device"); }
}
for (byte deviceId : deviceIds) {
/** see {@link org.whispersystems.textsecuregcm.controllers.DeviceController#removeDevice} */
System.out.format("Removing device %s::%d\n", aci, deviceId);
deps.accountsManager().removeDevice(account, deviceId).join();
}
} }
} }