Add a crawler for backup usage metrics

This commit is contained in:
Ravi Khadiwala 2024-05-07 13:50:28 -05:00 committed by ravi-signal
parent 101ecf342f
commit 7d95926f02
6 changed files with 252 additions and 6 deletions

View File

@ -234,6 +234,7 @@ import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener;
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.BackupMetricsCommand;
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
@ -298,6 +299,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new RemoveExpiredAccountsCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredUsernameHoldsCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredBackupsCommand(Clock.systemUTC()));
bootstrap.addCommand(new BackupMetricsCommand(Clock.systemUTC()));
bootstrap.addCommand(new ProcessPushNotificationFeedbackCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredLinkedDevicesCommand());
}

View File

@ -452,6 +452,17 @@ public class BackupManager {
});
}
/**
 * Stream the stored attributes of every backup record in the backups table.
 *
 * @param segments  how many segments the underlying backup database should be read with in parallel
 * @param scheduler the scheduler used to run downstream operations
 * @return a {@link Flux} emitting one {@link StoredBackupAttributes} per backup record in the backups table
 */
public Flux<StoredBackupAttributes> listBackupAttributes(final int segments, final Scheduler scheduler) {
  // Pure delegation: the backups database performs the actual listing.
  final Flux<StoredBackupAttributes> storedAttributes = backupsDb.listBackupAttributes(segments, scheduler);
  return storedAttributes;
}
/**
* List all backups whose media or messages refresh timestamp are older than the provided purgeTime
*

View File

@ -441,6 +441,37 @@ public class BackupsDb {
}
}
/**
 * Scan the backups table and emit the stored attributes of each backup record.
 * <p>
 * The table is scanned as {@code segments} separate scan segments. Only the attributes needed to build a
 * {@link StoredBackupAttributes} are projected, and any of the numeric attributes that is absent from an item is
 * read as {@code 0}.
 *
 * @param segments  total number of scan segments; must be at least 1
 * @param scheduler scheduler the parallel scan rails run on
 * @return a {@link Flux} emitting one {@link StoredBackupAttributes} per item that has a backup-id hash key
 * @throws IllegalArgumentException if {@code segments} is less than 1
 */
Flux<StoredBackupAttributes> listBackupAttributes(final int segments, final Scheduler scheduler) {
  if (segments < 1) {
    throw new IllegalArgumentException("Total number of segments must be positive");
  }

  // Placeholder -> attribute name mapping shared by every segment's scan request
  final Map<String, String> attributeNames = Map.of(
      "#backupIdHash", KEY_BACKUP_ID_HASH,
      "#refresh", ATTR_LAST_REFRESH,
      "#mediaRefresh", ATTR_LAST_MEDIA_REFRESH,
      "#bytesUsed", ATTR_MEDIA_BYTES_USED,
      "#numObjects", ATTR_MEDIA_COUNT);
  final String projection = "#backupIdHash, #refresh, #mediaRefresh, #bytesUsed, #numObjects";

  return Flux.range(0, segments)
      .parallel()
      .runOn(scheduler)
      .flatMap(segment -> {
        // One paginated scan per segment
        final ScanRequest segmentScan = ScanRequest.builder()
            .tableName(backupTableName)
            .consistentRead(true)
            .segment(segment)
            .totalSegments(segments)
            .expressionAttributeNames(attributeNames)
            .projectionExpression(projection)
            .build();
        return dynamoClient.scanPaginator(segmentScan).items();
      })
      .sequential()
      // Skip any item that does not carry the backup-id hash key
      .filter(item -> item.containsKey(KEY_BACKUP_ID_HASH))
      .map(item -> {
        final Instant lastRefresh = Instant.ofEpochSecond(AttributeValues.getLong(item, ATTR_LAST_REFRESH, 0L));
        final Instant lastMediaRefresh =
            Instant.ofEpochSecond(AttributeValues.getLong(item, ATTR_LAST_MEDIA_REFRESH, 0L));
        final long bytesUsed = AttributeValues.getLong(item, ATTR_MEDIA_BYTES_USED, 0L);
        final long numObjects = AttributeValues.getLong(item, ATTR_MEDIA_COUNT, 0L);
        return new StoredBackupAttributes(lastRefresh, lastMediaRefresh, bytesUsed, numObjects);
      });
}
Flux<ExpiredBackup> getExpiredBackups(final int segments, final Scheduler scheduler, final Instant purgeTime) {
if (segments < 1) {
throw new IllegalArgumentException("Total number of segments must be positive");

View File

@ -0,0 +1,19 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.backup;
import java.time.Instant;
/**
 * The attributes the backups table holds for one backup id.
 *
 * @param lastRefresh      the last time the record was updated with a messages or media tier credential
 * @param lastMediaRefresh the last time the record was updated with a media tier credential
 * @param bytesUsed        the number of media bytes used by the backup
 * @param numObjects       the number of media objects used by the backup
 */
public record StoredBackupAttributes(
    Instant lastRefresh,
    Instant lastMediaRefresh,
    long bytesUsed,
    long numObjects) {

}

View File

@ -0,0 +1,142 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.core.Application;
import io.dropwizard.core.cli.Cli;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.signal.libsignal.zkgroup.backups.BackupLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import reactor.core.scheduler.Schedulers;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
/**
 * Command that crawls every record in the backups table and records usage metrics about them: media object counts
 * and bytes used (tagged by tier), the time since each backup was last refreshed, and a counter of backups tagged by
 * whether they appear to be subscribed to the media tier.
 */
public class BackupMetricsCommand extends EnvironmentCommand<WhisperServerConfiguration> {

  private final Logger logger = LoggerFactory.getLogger(getClass());

  private static final String SEGMENT_COUNT_ARGUMENT = "segments";
  private static final int DEFAULT_SEGMENT_COUNT = 1;

  /** Source of "now" when computing how long ago a backup was refreshed. */
  private final Clock clock;

  /**
   * @param clock the clock used to measure elapsed time since a backup's refresh timestamps
   */
  public BackupMetricsCommand(final Clock clock) {
    super(new Application<>() {
      @Override
      public void run(final WhisperServerConfiguration configuration, final Environment environment) {
      }
    }, "backup-metrics", "Reports metrics about backups");
    this.clock = clock;
  }

  /**
   * Registers the optional {@code --segments} argument (default {@value #DEFAULT_SEGMENT_COUNT}) in addition to the
   * arguments configured by the superclass.
   */
  @Override
  public void configure(final Subparser subparser) {
    super.configure(subparser);

    subparser.addArgument("--segments")
        .type(Integer.class)
        .dest(SEGMENT_COUNT_ARGUMENT)
        .required(false)
        .setDefault(DEFAULT_SEGMENT_COUNT)
        .help("The total number of segments for a DynamoDB scan");
  }

  /**
   * Starts the environment's managed objects, crawls all backups while recording metrics for each one, logs the
   * number of backups crawled, and finally stops the managed objects again (even if the crawl fails).
   */
  @Override
  protected void run(final Environment environment, final Namespace namespace,
      final WhisperServerConfiguration configuration) throws Exception {

    UncaughtExceptionHandler.register();

    final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration);
    MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager());

    final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT));

    logger.info("Crawling backups for metrics with {} segments and {} processors",
        segments,
        Runtime.getRuntime().availableProcessors());

    try {
      environment.lifecycle().getManagedObjects().forEach(managedObject -> {
        try {
          managedObject.start();
        } catch (final Exception e) {
          logger.error("Failed to start managed object", e);
          throw new RuntimeException(e);
        }
      });

      final DistributionSummary numObjectsMediaTier = Metrics.summary(name(getClass(), "numObjects"),
          "tier", BackupLevel.MEDIA.name());
      final DistributionSummary bytesUsedMediaTier = Metrics.summary(name(getClass(), "bytesUsed"),
          "tier", BackupLevel.MEDIA.name());
      final DistributionSummary numObjectsMessagesTier = Metrics.summary(name(getClass(), "numObjects"),
          "tier", BackupLevel.MESSAGES.name());
      final DistributionSummary bytesUsedMessagesTier = Metrics.summary(name(getClass(), "bytesUsed"),
          "tier", BackupLevel.MESSAGES.name());

      final DistributionSummary timeSinceLastRefresh = Metrics.summary(name(getClass(),
          "timeSinceLastRefresh"));
      final DistributionSummary timeSinceLastMediaRefresh = Metrics.summary(name(getClass(),
          "timeSinceLastMediaRefresh"));
      final String backupsCounterName = name(getClass(), "backups");

      final BackupManager backupManager = commandDependencies.backupManager();
      final Long backupsCrawled = backupManager
          .listBackupAttributes(segments, Schedulers.parallel())
          .doOnNext(backupMetadata -> {
            // A backup whose media refresh coincides with its overall refresh is treated as a media-tier subscriber
            final boolean subscribed = backupMetadata.lastMediaRefresh().equals(backupMetadata.lastRefresh());
            if (subscribed) {
              numObjectsMediaTier.record(backupMetadata.numObjects());
              bytesUsedMediaTier.record(backupMetadata.bytesUsed());
            } else {
              numObjectsMessagesTier.record(backupMetadata.numObjects());
              bytesUsedMessagesTier.record(backupMetadata.bytesUsed());
            }
            timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds());
            // NOTE(review): a record with no media refresh timestamp appears to surface as Instant.EPOCH, which
            // yields a very large elapsed time here; confirm that is the desired value for this summary.
            timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());
            Metrics.counter(backupsCounterName, "subscribed", String.valueOf(subscribed)).increment();
          })
          .count()
          .block();

      logger.info("Crawled {} backups", backupsCrawled);
    } finally {
      // Best effort: a failure to stop one managed object must not prevent stopping the others
      environment.lifecycle().getManagedObjects().forEach(managedObject -> {
        try {
          managedObject.stop();
        } catch (final Exception e) {
          logger.error("Failed to stop managed object", e);
        }
      });
    }
  }

  /**
   * Computes how much time has elapsed between {@code t} and the current instant of this command's clock.
   *
   * @param t the earlier point in time
   * @return the elapsed duration from {@code t} until now, or {@link Duration#ZERO} if {@code t} is in the future
   */
  private Duration timeSince(final Instant t) {
    // Duration.between(start, end) is end - start, so the past instant must come first
    final Duration elapsed = Duration.between(t, clock.instant());
    if (elapsed.isNegative()) {
      return Duration.ZERO;
    }
    return elapsed;
  }

  @Override
  public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
    logger.error("Unhandled error", throwable);
  }
}

View File

@ -6,8 +6,16 @@
package org.whispersystems.textsecuregcm.backup;
import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -23,12 +31,6 @@ import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil;
import org.whispersystems.textsecuregcm.util.TestClock;
import org.whispersystems.textsecuregcm.util.TestRandomUtil;
import reactor.core.scheduler.Schedulers;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
public class BackupsDbTest {
@ -207,6 +209,45 @@ public class BackupsDbTest {
}
}
@Test
public void list() {
  final AuthenticatedBackupUser messagesUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MESSAGES);
  final AuthenticatedBackupUser firstMediaUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA);
  final AuthenticatedBackupUser secondMediaUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA);

  // Every user stores a message backup at t=10
  testClock.pin(Instant.ofEpochSecond(10));
  for (final AuthenticatedBackupUser user : List.of(messagesUser, firstMediaUser, secondMediaUser)) {
    backupsDb.addMessageBackup(user).join();
  }

  // The media users then track media at later, distinct times
  testClock.pin(Instant.ofEpochSecond(20));
  backupsDb.trackMedia(firstMediaUser, 10, 100).join();

  testClock.pin(Instant.ofEpochSecond(30));
  backupsDb.trackMedia(secondMediaUser, 1, 1000).join();

  // Order by last refresh so the listed records can be matched to the users above
  final List<StoredBackupAttributes> listed = backupsDb.listBackupAttributes(1, Schedulers.immediate())
      .sort(Comparator.comparing(StoredBackupAttributes::lastRefresh))
      .collectList()
      .block();

  final StoredBackupAttributes messagesOnly = listed.get(0);
  assertThat(messagesOnly.bytesUsed()).isEqualTo(0);
  assertThat(messagesOnly.numObjects()).isEqualTo(0);
  assertThat(messagesOnly.lastRefresh()).isEqualTo(Instant.ofEpochSecond(10));
  assertThat(messagesOnly.lastMediaRefresh()).isEqualTo(Instant.EPOCH);

  final StoredBackupAttributes firstMedia = listed.get(1);
  assertThat(firstMedia.bytesUsed()).isEqualTo(100);
  assertThat(firstMedia.numObjects()).isEqualTo(10);
  assertThat(firstMedia.lastRefresh()).isEqualTo(firstMedia.lastMediaRefresh());
  assertThat(firstMedia.lastRefresh()).isEqualTo(Instant.ofEpochSecond(20));

  final StoredBackupAttributes secondMedia = listed.get(2);
  assertThat(secondMedia.bytesUsed()).isEqualTo(1000);
  assertThat(secondMedia.numObjects()).isEqualTo(1);
  assertThat(secondMedia.lastRefresh()).isEqualTo(secondMedia.lastMediaRefresh());
  assertThat(secondMedia.lastRefresh()).isEqualTo(Instant.ofEpochSecond(30));
}
/**
 * Creates an {@link AuthenticatedBackupUser} for the given backup id and level, using fixed placeholder values
 * for the remaining two constructor arguments.
 */
private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupLevel backupLevel) {
  final AuthenticatedBackupUser user =
      new AuthenticatedBackupUser(backupId, backupLevel, "myBackupDir", "myMediaDir");
  return user;
}