diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 59ab2f365..b940ab680 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -234,6 +234,7 @@ import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener; import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; +import org.whispersystems.textsecuregcm.workers.BackupMetricsCommand; import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; @@ -298,6 +299,7 @@ public class WhisperServerService extends Application listBackupAttributes(final int segments, final Scheduler scheduler) { + return this.backupsDb.listBackupAttributes(segments, scheduler); + } + /** * List all backups whose media or messages refresh timestamp are older than the provided purgeTime * diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java index 175c43adc..3aa4487f7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java @@ -441,6 +441,37 @@ public class BackupsDb { } } + Flux listBackupAttributes(final int segments, final Scheduler scheduler) { + if (segments < 1) { + throw new IllegalArgumentException("Total number of segments must be positive"); + } + + return Flux.range(0, segments) + .parallel() + .runOn(scheduler) + .flatMap(segment -> dynamoClient.scanPaginator(ScanRequest.builder() + .tableName(backupTableName) + .consistentRead(true) + .segment(segment) + .totalSegments(segments) + .expressionAttributeNames(Map.of( + "#backupIdHash", KEY_BACKUP_ID_HASH, + "#refresh", ATTR_LAST_REFRESH, + "#mediaRefresh", ATTR_LAST_MEDIA_REFRESH, + "#bytesUsed", ATTR_MEDIA_BYTES_USED, + "#numObjects", ATTR_MEDIA_COUNT)) + .projectionExpression("#backupIdHash, #refresh, #mediaRefresh, #bytesUsed, #numObjects") + .build()) + .items()) + .sequential() + .filter(item -> item.containsKey(KEY_BACKUP_ID_HASH)) + .map(item -> new StoredBackupAttributes( + Instant.ofEpochSecond(AttributeValues.getLong(item, ATTR_LAST_REFRESH, 0L)), + Instant.ofEpochSecond(AttributeValues.getLong(item, ATTR_LAST_MEDIA_REFRESH, 0L)), + AttributeValues.getLong(item, ATTR_MEDIA_BYTES_USED, 0L), + AttributeValues.getLong(item, ATTR_MEDIA_COUNT, 0L))); + } + Flux getExpiredBackups(final int segments, final Scheduler scheduler, final Instant purgeTime) { if (segments < 1) { throw new IllegalArgumentException("Total number of segments must be positive"); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/StoredBackupAttributes.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/StoredBackupAttributes.java new file mode 100644 index 000000000..9a36db8ac --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/StoredBackupAttributes.java @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.backup; + +import java.time.Instant; + +/** + * Attributes stored in the backups table for a single backup id + * + * @param lastRefresh The last time the record was updated with a messages or media tier credential + * @param lastMediaRefresh The last time the record was updated with a media tier credential + * @param bytesUsed The number of media bytes used by the backup + * @param numObjects The number of media objects used byt the backup + */ +public record StoredBackupAttributes( + Instant lastRefresh, Instant lastMediaRefresh, + long bytesUsed, long numObjects) {} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupMetricsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupMetricsCommand.java new file mode 100644 index 000000000..2e3d9d6c1 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/BackupMetricsCommand.java @@ -0,0 +1,142 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.workers; + +import io.dropwizard.core.Application; +import io.dropwizard.core.cli.Cli; +import io.dropwizard.core.cli.EnvironmentCommand; +import io.dropwizard.core.setup.Environment; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Metrics; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.signal.libsignal.zkgroup.backups.BackupLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.backup.BackupManager; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; +import reactor.core.scheduler.Schedulers; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; + +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +public class BackupMetricsCommand extends EnvironmentCommand { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private static final String SEGMENT_COUNT_ARGUMENT = "segments"; + private static final int DEFAULT_SEGMENT_COUNT = 1; + + private final Clock clock; + + public BackupMetricsCommand(final Clock clock) { + super(new Application<>() { + @Override + public void run(final WhisperServerConfiguration configuration, final Environment environment) { + } + }, "backup-metrics", "Reports metrics about backups"); + this.clock = clock; + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("--segments") + .type(Integer.class) + .dest(SEGMENT_COUNT_ARGUMENT) + .required(false) + .setDefault(DEFAULT_SEGMENT_COUNT) + .help("The total number of segments for a DynamoDB scan"); + } + + @Override + protected void run(final Environment environment, final Namespace namespace, + final WhisperServerConfiguration configuration) throws Exception { + + UncaughtExceptionHandler.register(); + final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration); + MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager()); + + final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT)); + logger.info("Crawling backups for metrics with {} segments and {} processors", + segments, + Runtime.getRuntime().availableProcessors()); + + try { + environment.lifecycle().getManagedObjects().forEach(managedObject -> { + try { + managedObject.start(); + } catch (final Exception e) { + logger.error("Failed to start managed object", e); + throw new RuntimeException(e); + } + }); + + final DistributionSummary numObjectsMediaTier = Metrics.summary(name(getClass(), "numObjects"), + "tier", BackupLevel.MEDIA.name()); + final DistributionSummary bytesUsedMediaTier = Metrics.summary(name(getClass(), "bytesUsed"), + "tier", BackupLevel.MEDIA.name()); + final DistributionSummary numObjectsMessagesTier = Metrics.summary(name(getClass(), "numObjects"), + "tier", BackupLevel.MESSAGES.name()); + final DistributionSummary bytesUsedMessagesTier = Metrics.summary(name(getClass(), "bytesUsed"), + "tier", BackupLevel.MESSAGES.name()); + + final DistributionSummary timeSinceLastRefresh = Metrics.summary(name(getClass(), + "timeSinceLastRefresh")); + final DistributionSummary timeSinceLastMediaRefresh = Metrics.summary(name(getClass(), + "timeSinceLastMediaRefresh")); + final String backupsCounterName = name(getClass(), "backups"); + + final BackupManager backupManager = commandDependencies.backupManager(); + final Long backupsExpired = backupManager + .listBackupAttributes(segments, Schedulers.parallel()) + .doOnNext(backupMetadata -> { + final boolean subscribed = backupMetadata.lastMediaRefresh().equals(backupMetadata.lastRefresh()); + if (subscribed) { + numObjectsMediaTier.record(backupMetadata.numObjects()); + bytesUsedMediaTier.record(backupMetadata.bytesUsed()); + } else { + numObjectsMessagesTier.record(backupMetadata.numObjects()); + bytesUsedMessagesTier.record(backupMetadata.bytesUsed()); + } + timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds()); + timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds()); + Metrics.counter(backupsCounterName, "subscribed", String.valueOf(subscribed)).increment(); + }) + .count() + .block(); + logger.info("Crawled {} backups", backupsExpired); + } finally { + environment.lifecycle().getManagedObjects().forEach(managedObject -> { + try { + managedObject.stop(); + } catch (final Exception e) { + logger.error("Failed to stop managed object", e); + } + }); + } + } + + private Duration timeSince(Instant t) { + Duration between = Duration.between(clock.instant(), t); + if (between.isNegative()) { + return Duration.ZERO; + } + return between; + } + + @Override + public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) { + logger.error("Unhandled error", throwable); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java index f42194f44..b93c19557 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java @@ -6,8 +6,16 @@ package org.whispersystems.textsecuregcm.backup; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.time.Instant; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -23,12 +31,6 @@ import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil; import org.whispersystems.textsecuregcm.util.TestClock; import org.whispersystems.textsecuregcm.util.TestRandomUtil; import reactor.core.scheduler.Schedulers; -import java.time.Instant; -import java.util.List; -import java.util.Optional; -import java.util.function.Function; - -import static org.assertj.core.api.Assertions.assertThat; public class BackupsDbTest { @@ -207,6 +209,45 @@ public class BackupsDbTest { } } + @Test + public void list() { + final AuthenticatedBackupUser u1 = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MESSAGES); + final AuthenticatedBackupUser u2 = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); + final AuthenticatedBackupUser u3 = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); + + // add at least one message backup, so we can describe it + testClock.pin(Instant.ofEpochSecond(10)); + Stream.of(u1, u2, u3).forEach(u -> backupsDb.addMessageBackup(u).join()); + + testClock.pin(Instant.ofEpochSecond(20)); + backupsDb.trackMedia(u2, 10, 100).join(); + + testClock.pin(Instant.ofEpochSecond(30)); + backupsDb.trackMedia(u3, 1, 1000).join(); + + final List sbms = backupsDb.listBackupAttributes(1, Schedulers.immediate()) + .sort(Comparator.comparing(StoredBackupAttributes::lastRefresh)) + .collectList() + .block(); + + final StoredBackupAttributes sbm1 = sbms.get(0); + assertThat(sbm1.bytesUsed()).isEqualTo(0); + assertThat(sbm1.numObjects()).isEqualTo(0); + assertThat(sbm1.lastRefresh()).isEqualTo(Instant.ofEpochSecond(10)); + assertThat(sbm1.lastMediaRefresh()).isEqualTo(Instant.EPOCH); + + + final StoredBackupAttributes sbm2 = sbms.get(1); + assertThat(sbm2.bytesUsed()).isEqualTo(100); + assertThat(sbm2.numObjects()).isEqualTo(10); + assertThat(sbm2.lastRefresh()).isEqualTo(sbm2.lastMediaRefresh()).isEqualTo(Instant.ofEpochSecond(20)); + + final StoredBackupAttributes sbm3 = sbms.get(2); + assertThat(sbm3.bytesUsed()).isEqualTo(1000); + assertThat(sbm3.numObjects()).isEqualTo(1); + assertThat(sbm3.lastRefresh()).isEqualTo(sbm3.lastMediaRefresh()).isEqualTo(Instant.ofEpochSecond(30)); + } + private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupLevel backupLevel) { return new AuthenticatedBackupUser(backupId, backupLevel, "myBackupDir", "myMediaDir"); }