Add some additional backup metrics
This commit is contained in:
parent
4a42ff562d
commit
2b07a21477
|
@ -178,6 +178,7 @@ import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper
|
||||||
import org.whispersystems.textsecuregcm.mappers.RegistrationServiceSenderExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.RegistrationServiceSenderExceptionMapper;
|
||||||
import org.whispersystems.textsecuregcm.mappers.ServerRejectedExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.ServerRejectedExceptionMapper;
|
||||||
import org.whispersystems.textsecuregcm.mappers.SubscriptionExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.SubscriptionExceptionMapper;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
|
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener;
|
import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MetricsHttpChannelListener;
|
import org.whispersystems.textsecuregcm.metrics.MetricsHttpChannelListener;
|
||||||
|
@ -955,6 +956,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
Set.of(websocketServletPath, provisioningWebsocketServletPath, "/health-check"));
|
Set.of(websocketServletPath, provisioningWebsocketServletPath, "/health-check"));
|
||||||
metricsHttpChannelListener.configure(environment);
|
metricsHttpChannelListener.configure(environment);
|
||||||
final MessageMetrics messageMetrics = new MessageMetrics();
|
final MessageMetrics messageMetrics = new MessageMetrics();
|
||||||
|
final BackupMetrics backupMetrics = new BackupMetrics();
|
||||||
|
|
||||||
// BufferingInterceptor is needed on the base environment but not the WebSocketEnvironment,
|
// BufferingInterceptor is needed on the base environment but not the WebSocketEnvironment,
|
||||||
// because we handle serialization of http responses on the websocket on our own and can
|
// because we handle serialization of http responses on the websocket on our own and can
|
||||||
|
@ -1071,7 +1073,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
registrationLockVerificationManager, rateLimiters),
|
registrationLockVerificationManager, rateLimiters),
|
||||||
new AttachmentControllerV4(rateLimiters, gcsAttachmentGenerator, tusAttachmentGenerator,
|
new AttachmentControllerV4(rateLimiters, gcsAttachmentGenerator, tusAttachmentGenerator,
|
||||||
experimentEnrollmentManager),
|
experimentEnrollmentManager),
|
||||||
new ArchiveController(backupAuthManager, backupManager),
|
new ArchiveController(backupAuthManager, backupManager, backupMetrics),
|
||||||
new CallRoutingControllerV2(rateLimiters, cloudflareTurnCredentialsManager),
|
new CallRoutingControllerV2(rateLimiters, cloudflareTurnCredentialsManager),
|
||||||
new CallLinkController(rateLimiters, callingGenericZkSecretParams),
|
new CallLinkController(rateLimiters, callingGenericZkSecretParams),
|
||||||
new CertificateController(new CertificateGenerator(config.getDeliveryCertificate().certificate().value(),
|
new CertificateController(new CertificateGenerator(config.getDeliveryCertificate().certificate().value(),
|
||||||
|
|
|
@ -46,7 +46,7 @@ public class BackupManager {
|
||||||
|
|
||||||
static final String MESSAGE_BACKUP_NAME = "messageBackup";
|
static final String MESSAGE_BACKUP_NAME = "messageBackup";
|
||||||
public static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = DataSize.gibibytes(100).toBytes();
|
public static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = DataSize.gibibytes(100).toBytes();
|
||||||
static final long MAX_MEDIA_OBJECT_SIZE = DataSize.mebibytes(101).toBytes();
|
public static final long MAX_MEDIA_OBJECT_SIZE = DataSize.mebibytes(101).toBytes();
|
||||||
|
|
||||||
// If the last media usage recalculation is over MAX_QUOTA_STALENESS, force a recalculation before quota enforcement.
|
// If the last media usage recalculation is over MAX_QUOTA_STALENESS, force a recalculation before quota enforcement.
|
||||||
static final Duration MAX_QUOTA_STALENESS = Duration.ofDays(1);
|
static final Duration MAX_QUOTA_STALENESS = Duration.ofDays(1);
|
||||||
|
|
|
@ -5,6 +5,8 @@
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.controllers;
|
package org.whispersystems.textsecuregcm.controllers;
|
||||||
|
|
||||||
|
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonValue;
|
import com.fasterxml.jackson.annotation.JsonValue;
|
||||||
import com.fasterxml.jackson.core.JsonParser;
|
import com.fasterxml.jackson.core.JsonParser;
|
||||||
|
@ -13,13 +15,16 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.net.HttpHeaders;
|
||||||
import io.dropwizard.auth.Auth;
|
import io.dropwizard.auth.Auth;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import io.micrometer.core.instrument.Tag;
|
||||||
|
import io.micrometer.core.instrument.Tags;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
import io.swagger.v3.oas.annotations.Parameter;
|
import io.swagger.v3.oas.annotations.Parameter;
|
||||||
import io.swagger.v3.oas.annotations.media.Content;
|
import io.swagger.v3.oas.annotations.media.Content;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
|
||||||
import jakarta.validation.Valid;
|
import jakarta.validation.Valid;
|
||||||
import jakarta.validation.constraints.Max;
|
import jakarta.validation.constraints.Max;
|
||||||
import jakarta.validation.constraints.Min;
|
import jakarta.validation.constraints.Min;
|
||||||
|
@ -69,6 +74,8 @@ import org.whispersystems.textsecuregcm.backup.CopyParameters;
|
||||||
import org.whispersystems.textsecuregcm.backup.CopyResult;
|
import org.whispersystems.textsecuregcm.backup.CopyResult;
|
||||||
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
|
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
|
||||||
import org.whispersystems.textsecuregcm.entities.RemoteAttachment;
|
import org.whispersystems.textsecuregcm.entities.RemoteAttachment;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||||
import org.whispersystems.textsecuregcm.util.BackupAuthCredentialAdapter;
|
import org.whispersystems.textsecuregcm.util.BackupAuthCredentialAdapter;
|
||||||
import org.whispersystems.textsecuregcm.util.ByteArrayAdapter;
|
import org.whispersystems.textsecuregcm.util.ByteArrayAdapter;
|
||||||
import org.whispersystems.textsecuregcm.util.ByteArrayBase64UrlAdapter;
|
import org.whispersystems.textsecuregcm.util.ByteArrayBase64UrlAdapter;
|
||||||
|
@ -80,7 +87,7 @@ import org.whispersystems.websocket.auth.ReadOnly;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
@Path("/v1/archives")
|
@Path("/v1/archives")
|
||||||
@Tag(name = "Archive")
|
@io.swagger.v3.oas.annotations.tags.Tag(name = "Archive")
|
||||||
public class ArchiveController {
|
public class ArchiveController {
|
||||||
|
|
||||||
public final static String X_SIGNAL_ZK_AUTH = "X-Signal-ZK-Auth";
|
public final static String X_SIGNAL_ZK_AUTH = "X-Signal-ZK-Auth";
|
||||||
|
@ -88,12 +95,15 @@ public class ArchiveController {
|
||||||
|
|
||||||
private final BackupAuthManager backupAuthManager;
|
private final BackupAuthManager backupAuthManager;
|
||||||
private final BackupManager backupManager;
|
private final BackupManager backupManager;
|
||||||
|
private final BackupMetrics backupMetrics;
|
||||||
|
|
||||||
public ArchiveController(
|
public ArchiveController(
|
||||||
final BackupAuthManager backupAuthManager,
|
final BackupAuthManager backupAuthManager,
|
||||||
final BackupManager backupManager) {
|
final BackupManager backupManager,
|
||||||
|
final BackupMetrics backupMetrics) {
|
||||||
this.backupAuthManager = backupAuthManager;
|
this.backupAuthManager = backupAuthManager;
|
||||||
this.backupManager = backupManager;
|
this.backupManager = backupManager;
|
||||||
|
this.backupMetrics = backupMetrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
public record SetBackupIdRequest(
|
public record SetBackupIdRequest(
|
||||||
|
@ -247,6 +257,7 @@ public class ArchiveController {
|
||||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||||
public CompletionStage<BackupAuthCredentialsResponse> getBackupZKCredentials(
|
public CompletionStage<BackupAuthCredentialsResponse> getBackupZKCredentials(
|
||||||
@Mutable @Auth AuthenticatedDevice auth,
|
@Mutable @Auth AuthenticatedDevice auth,
|
||||||
|
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||||
@NotNull @QueryParam("redemptionStartSeconds") Long startSeconds,
|
@NotNull @QueryParam("redemptionStartSeconds") Long startSeconds,
|
||||||
@NotNull @QueryParam("redemptionEndSeconds") Long endSeconds) {
|
@NotNull @QueryParam("redemptionEndSeconds") Long endSeconds) {
|
||||||
|
|
||||||
|
@ -258,11 +269,17 @@ public class ArchiveController {
|
||||||
auth.getAccount(),
|
auth.getAccount(),
|
||||||
credentialType,
|
credentialType,
|
||||||
Instant.ofEpochSecond(startSeconds), Instant.ofEpochSecond(endSeconds))
|
Instant.ofEpochSecond(startSeconds), Instant.ofEpochSecond(endSeconds))
|
||||||
.thenAccept(credentials -> credentialsByType.put(credentialType, credentials.stream()
|
.thenAccept(credentials -> {
|
||||||
|
backupMetrics.updateGetCredentialCounter(
|
||||||
|
UserAgentTagUtil.getPlatformTag(userAgent),
|
||||||
|
credentialType,
|
||||||
|
credentials.size());
|
||||||
|
credentialsByType.put(credentialType, credentials.stream()
|
||||||
.map(credential -> new BackupAuthCredentialsResponse.BackupAuthCredential(
|
.map(credential -> new BackupAuthCredentialsResponse.BackupAuthCredential(
|
||||||
credential.credential().serialize(),
|
credential.credential().serialize(),
|
||||||
credential.redemptionTime().getEpochSecond()))
|
credential.redemptionTime().getEpochSecond()))
|
||||||
.toList())))
|
.toList());
|
||||||
|
}))
|
||||||
.toArray(CompletableFuture[]::new))
|
.toArray(CompletableFuture[]::new))
|
||||||
.thenApply(ignored -> new BackupAuthCredentialsResponse(credentialsByType.entrySet().stream()
|
.thenApply(ignored -> new BackupAuthCredentialsResponse(credentialsByType.entrySet().stream()
|
||||||
.collect(Collectors.toMap(
|
.collect(Collectors.toMap(
|
||||||
|
@ -586,6 +603,7 @@ public class ArchiveController {
|
||||||
@ApiResponseZkAuth
|
@ApiResponseZkAuth
|
||||||
public CompletionStage<CopyMediaResponse> copyMedia(
|
public CompletionStage<CopyMediaResponse> copyMedia(
|
||||||
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
|
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
|
||||||
|
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||||
|
|
||||||
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
|
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
|
||||||
@NotNull
|
@NotNull
|
||||||
|
@ -605,6 +623,7 @@ public class ArchiveController {
|
||||||
.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature))
|
.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature))
|
||||||
.flatMap(backupUser -> backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters()))
|
.flatMap(backupUser -> backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters()))
|
||||||
.next()
|
.next()
|
||||||
|
.doOnNext(result -> backupMetrics.updateCopyCounter(result, UserAgentTagUtil.getPlatformTag(userAgent)))
|
||||||
.map(copyResult -> switch (copyResult.outcome()) {
|
.map(copyResult -> switch (copyResult.outcome()) {
|
||||||
case SUCCESS -> new CopyMediaResponse(copyResult.cdn());
|
case SUCCESS -> new CopyMediaResponse(copyResult.cdn());
|
||||||
case SOURCE_WRONG_LENGTH -> throw new BadRequestException("Invalid length");
|
case SOURCE_WRONG_LENGTH -> throw new BadRequestException("Invalid length");
|
||||||
|
@ -683,6 +702,7 @@ public class ArchiveController {
|
||||||
@ApiResponseZkAuth
|
@ApiResponseZkAuth
|
||||||
public CompletionStage<Response> copyMedia(
|
public CompletionStage<Response> copyMedia(
|
||||||
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
|
@ReadOnly @Auth final Optional<AuthenticatedDevice> account,
|
||||||
|
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||||
|
|
||||||
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
|
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
|
||||||
@NotNull
|
@NotNull
|
||||||
|
@ -701,6 +721,7 @@ public class ArchiveController {
|
||||||
final Stream<CopyParameters> copyParams = copyMediaRequest.items().stream().map(CopyMediaRequest::toCopyParameters);
|
final Stream<CopyParameters> copyParams = copyMediaRequest.items().stream().map(CopyMediaRequest::toCopyParameters);
|
||||||
return Mono.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature))
|
return Mono.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature))
|
||||||
.flatMapMany(backupUser -> backupManager.copyToBackup(backupUser, copyParams.toList()))
|
.flatMapMany(backupUser -> backupManager.copyToBackup(backupUser, copyParams.toList()))
|
||||||
|
.doOnNext(result -> backupMetrics.updateCopyCounter(result, UserAgentTagUtil.getPlatformTag(userAgent)))
|
||||||
.map(CopyMediaBatchResponse.Entry::fromCopyResult)
|
.map(CopyMediaBatchResponse.Entry::fromCopyResult)
|
||||||
.collectList()
|
.collectList()
|
||||||
.map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build())
|
.map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build())
|
||||||
|
|
|
@ -8,6 +8,9 @@ import com.google.protobuf.ByteString;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import io.micrometer.core.instrument.Tag;
|
||||||
|
import io.micrometer.core.instrument.Tags;
|
||||||
import org.signal.chat.backup.CopyMediaRequest;
|
import org.signal.chat.backup.CopyMediaRequest;
|
||||||
import org.signal.chat.backup.CopyMediaResponse;
|
import org.signal.chat.backup.CopyMediaResponse;
|
||||||
import org.signal.chat.backup.DeleteAllRequest;
|
import org.signal.chat.backup.DeleteAllRequest;
|
||||||
|
@ -36,15 +39,22 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
|
||||||
import org.whispersystems.textsecuregcm.backup.BackupManager;
|
import org.whispersystems.textsecuregcm.backup.BackupManager;
|
||||||
import org.whispersystems.textsecuregcm.backup.CopyParameters;
|
import org.whispersystems.textsecuregcm.backup.CopyParameters;
|
||||||
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
|
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
|
||||||
|
import org.whispersystems.textsecuregcm.controllers.ArchiveController;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||||
|
|
||||||
public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.BackupsAnonymousImplBase {
|
public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.BackupsAnonymousImplBase {
|
||||||
|
|
||||||
private final BackupManager backupManager;
|
private final BackupManager backupManager;
|
||||||
|
private final BackupMetrics backupMetrics;
|
||||||
|
|
||||||
public BackupsAnonymousGrpcService(final BackupManager backupManager) {
|
public BackupsAnonymousGrpcService(final BackupManager backupManager, final BackupMetrics backupMetrics) {
|
||||||
this.backupManager = backupManager;
|
this.backupManager = backupManager;
|
||||||
|
this.backupMetrics = backupMetrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -115,6 +125,9 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac
|
||||||
fromUnsignedExact(item.getObjectLength()),
|
fromUnsignedExact(item.getObjectLength()),
|
||||||
new MediaEncryptionParameters(item.getEncryptionKey().toByteArray(), item.getHmacKey().toByteArray()),
|
new MediaEncryptionParameters(item.getEncryptionKey().toByteArray(), item.getHmacKey().toByteArray()),
|
||||||
item.getMediaId().toByteArray())).toList()))
|
item.getMediaId().toByteArray())).toList()))
|
||||||
|
.doOnNext(result -> backupMetrics.updateCopyCounter(
|
||||||
|
result,
|
||||||
|
UserAgentTagUtil.getPlatformTag(RequestAttributesUtil.getUserAgent().orElse(null))))
|
||||||
.map(copyResult -> {
|
.map(copyResult -> {
|
||||||
CopyMediaResponse.Builder builder = CopyMediaResponse
|
CopyMediaResponse.Builder builder = CopyMediaResponse
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
|
|
|
@ -9,6 +9,9 @@ import io.grpc.Status;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import io.micrometer.core.instrument.Tag;
|
||||||
|
import io.micrometer.core.instrument.Tags;
|
||||||
import org.signal.chat.backup.GetBackupAuthCredentialsRequest;
|
import org.signal.chat.backup.GetBackupAuthCredentialsRequest;
|
||||||
import org.signal.chat.backup.GetBackupAuthCredentialsResponse;
|
import org.signal.chat.backup.GetBackupAuthCredentialsResponse;
|
||||||
import org.signal.chat.backup.ReactorBackupsGrpc;
|
import org.signal.chat.backup.ReactorBackupsGrpc;
|
||||||
|
@ -24,18 +27,25 @@ import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation;
|
||||||
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
|
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
|
||||||
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil;
|
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil;
|
||||||
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
|
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
|
||||||
|
import org.whispersystems.textsecuregcm.controllers.ArchiveController;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||||
|
|
||||||
public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {
|
public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {
|
||||||
|
|
||||||
private final AccountsManager accountManager;
|
private final AccountsManager accountManager;
|
||||||
private final BackupAuthManager backupAuthManager;
|
private final BackupAuthManager backupAuthManager;
|
||||||
|
private final BackupMetrics backupMetrics;
|
||||||
|
|
||||||
public BackupsGrpcService(final AccountsManager accountManager, final BackupAuthManager backupAuthManager) {
|
public BackupsGrpcService(final AccountsManager accountManager, final BackupAuthManager backupAuthManager, final BackupMetrics backupMetrics) {
|
||||||
this.accountManager = accountManager;
|
this.accountManager = accountManager;
|
||||||
this.backupAuthManager = backupAuthManager;
|
this.backupAuthManager = backupAuthManager;
|
||||||
|
this.backupMetrics = backupMetrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -67,19 +77,26 @@ public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<GetBackupAuthCredentialsResponse> getBackupAuthCredentials(GetBackupAuthCredentialsRequest request) {
|
public Mono<GetBackupAuthCredentialsResponse> getBackupAuthCredentials(GetBackupAuthCredentialsRequest request) {
|
||||||
|
final Tag platformTag = UserAgentTagUtil.getPlatformTag(RequestAttributesUtil.getUserAgent().orElse(null));
|
||||||
return authenticatedAccount().flatMap(account -> {
|
return authenticatedAccount().flatMap(account -> {
|
||||||
|
|
||||||
final Mono<List<BackupAuthManager.Credential>> messageCredentials = Mono.fromCompletionStage(() ->
|
final Mono<List<BackupAuthManager.Credential>> messageCredentials = Mono.fromCompletionStage(() ->
|
||||||
backupAuthManager.getBackupAuthCredentials(
|
backupAuthManager.getBackupAuthCredentials(
|
||||||
account,
|
account,
|
||||||
BackupCredentialType.MESSAGES,
|
BackupCredentialType.MESSAGES,
|
||||||
Instant.ofEpochSecond(request.getRedemptionStart()),
|
Instant.ofEpochSecond(request.getRedemptionStart()),
|
||||||
Instant.ofEpochSecond(request.getRedemptionStop())));
|
Instant.ofEpochSecond(request.getRedemptionStop())))
|
||||||
|
.doOnSuccess(credentials ->
|
||||||
|
backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MESSAGES, credentials.size()));
|
||||||
|
|
||||||
final Mono<List<BackupAuthManager.Credential>> mediaCredentials = Mono.fromCompletionStage(() ->
|
final Mono<List<BackupAuthManager.Credential>> mediaCredentials = Mono.fromCompletionStage(() ->
|
||||||
backupAuthManager.getBackupAuthCredentials(
|
backupAuthManager.getBackupAuthCredentials(
|
||||||
account,
|
account,
|
||||||
BackupCredentialType.MEDIA,
|
BackupCredentialType.MEDIA,
|
||||||
Instant.ofEpochSecond(request.getRedemptionStart()),
|
Instant.ofEpochSecond(request.getRedemptionStart()),
|
||||||
Instant.ofEpochSecond(request.getRedemptionStop())));
|
Instant.ofEpochSecond(request.getRedemptionStop())))
|
||||||
|
.doOnSuccess(credentials ->
|
||||||
|
backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MEDIA, credentials.size()));
|
||||||
|
|
||||||
return messageCredentials.zipWith(mediaCredentials, (messageCreds, mediaCreds) ->
|
return messageCredentials.zipWith(mediaCredentials, (messageCreds, mediaCreds) ->
|
||||||
GetBackupAuthCredentialsResponse.newBuilder()
|
GetBackupAuthCredentialsResponse.newBuilder()
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2025 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.metrics;
|
||||||
|
|
||||||
|
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import io.micrometer.core.instrument.Tag;
|
||||||
|
import io.micrometer.core.instrument.Tags;
|
||||||
|
import org.signal.libsignal.zkgroup.backups.BackupCredentialType;
|
||||||
|
import org.whispersystems.textsecuregcm.backup.CopyResult;
|
||||||
|
|
||||||
|
public class BackupMetrics {
|
||||||
|
|
||||||
|
private final static String COPY_MEDIA_COUNTER_NAME = name(BackupMetrics.class, "copyMedia");
|
||||||
|
private final static String GET_BACKUP_CREDENTIALS_NAME = name(BackupMetrics.class, "getBackupCredentials");
|
||||||
|
|
||||||
|
|
||||||
|
private MeterRegistry registry;
|
||||||
|
|
||||||
|
public BackupMetrics() {
|
||||||
|
this(Metrics.globalRegistry);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
BackupMetrics(MeterRegistry registry) {
|
||||||
|
this.registry = registry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateCopyCounter(final CopyResult copyResult, final Tag platformTag) {
|
||||||
|
registry.counter(COPY_MEDIA_COUNTER_NAME, Tags.of(
|
||||||
|
platformTag,
|
||||||
|
Tag.of("outcome", copyResult.outcome().name().toLowerCase())))
|
||||||
|
.increment();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateGetCredentialCounter(final Tag platformTag, BackupCredentialType credentialType,
|
||||||
|
final int numCredentials) {
|
||||||
|
Metrics.counter(GET_BACKUP_CREDENTIALS_NAME, Tags.of(
|
||||||
|
platformTag,
|
||||||
|
Tag.of("num", Integer.toString(numCredentials)),
|
||||||
|
Tag.of("type", credentialType.name().toLowerCase())))
|
||||||
|
.increment();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -12,6 +12,7 @@ import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
|
||||||
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
|
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
|
||||||
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
|
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
|
||||||
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
|
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class for extracting platform/version metrics tags from User-Agent strings.
|
* Utility class for extracting platform/version metrics tags from User-Agent strings.
|
||||||
|
@ -26,15 +27,19 @@ public class UserAgentTagUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Tag getPlatformTag(final String userAgentString) {
|
public static Tag getPlatformTag(final String userAgentString) {
|
||||||
String platform;
|
|
||||||
|
UserAgent userAgent = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
platform = UserAgentUtil.parseUserAgentString(userAgentString).getPlatform().name().toLowerCase();
|
userAgent = UserAgentUtil.parseUserAgentString(userAgentString);
|
||||||
} catch (final UnrecognizedUserAgentException e) {
|
} catch (final UnrecognizedUserAgentException ignored) {
|
||||||
platform = "unrecognized";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return Tag.of(PLATFORM_TAG, platform);
|
return getPlatformTag(userAgent);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Tag getPlatformTag(@Nullable final UserAgent userAgent) {
|
||||||
|
return Tag.of(PLATFORM_TAG, userAgent != null ? userAgent.getPlatform().name().toLowerCase() : "unrecognized");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Optional<Tag> getClientVersionTag(final String userAgentString, final ClientReleaseManager clientReleaseManager) {
|
public static Optional<Tag> getClientVersionTag(final String userAgentString, final ClientReleaseManager clientReleaseManager) {
|
||||||
|
|
|
@ -93,7 +93,14 @@ public class BackupMetricsCommand extends AbstractCommandWithDependencies {
|
||||||
}
|
}
|
||||||
timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds());
|
timeSinceLastRefresh.record(timeSince(backupMetadata.lastRefresh()).getSeconds());
|
||||||
timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());
|
timeSinceLastMediaRefresh.record(timeSince(backupMetadata.lastMediaRefresh()).getSeconds());
|
||||||
Metrics.counter(backupsCounterName, "subscribed", String.valueOf(subscribed)).increment();
|
|
||||||
|
// Report that the backup is out of quota if it cannot store a max size media object
|
||||||
|
final boolean quotaExhausted = backupMetadata.bytesUsed() >=
|
||||||
|
(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - BackupManager.MAX_MEDIA_OBJECT_SIZE);
|
||||||
|
|
||||||
|
Metrics.counter(backupsCounterName,
|
||||||
|
"subscribed", String.valueOf(subscribed),
|
||||||
|
"quotaExhausted", String.valueOf(quotaExhausted)).increment();
|
||||||
})
|
})
|
||||||
.count()
|
.count()
|
||||||
.block();
|
.block();
|
||||||
|
|
|
@ -72,6 +72,7 @@ import org.whispersystems.textsecuregcm.entities.RemoteAttachment;
|
||||||
import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper;
|
||||||
import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper;
|
||||||
import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||||
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
|
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
|
||||||
import org.whispersystems.textsecuregcm.util.EnumMapUtil;
|
import org.whispersystems.textsecuregcm.util.EnumMapUtil;
|
||||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||||
|
@ -94,7 +95,7 @@ public class ArchiveControllerTest {
|
||||||
.addProvider(new RateLimitExceededExceptionMapper())
|
.addProvider(new RateLimitExceededExceptionMapper())
|
||||||
.setMapper(SystemMapper.jsonMapper())
|
.setMapper(SystemMapper.jsonMapper())
|
||||||
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
|
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
|
||||||
.addResource(new ArchiveController(backupAuthManager, backupManager))
|
.addResource(new ArchiveController(backupAuthManager, backupManager, new BackupMetrics()))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private final UUID aci = UUID.randomUUID();
|
private final UUID aci = UUID.randomUUID();
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.whispersystems.textsecuregcm.backup.BackupManager;
|
||||||
import org.whispersystems.textsecuregcm.backup.BackupUploadDescriptor;
|
import org.whispersystems.textsecuregcm.backup.BackupUploadDescriptor;
|
||||||
import org.whispersystems.textsecuregcm.backup.CopyResult;
|
import org.whispersystems.textsecuregcm.backup.CopyResult;
|
||||||
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||||
import org.whispersystems.textsecuregcm.util.TestRandomUtil;
|
import org.whispersystems.textsecuregcm.util.TestRandomUtil;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
@ -74,7 +75,7 @@ class BackupsAnonymousGrpcServiceTest extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected BackupsAnonymousGrpcService createServiceBeforeEachTest() {
|
protected BackupsAnonymousGrpcService createServiceBeforeEachTest() {
|
||||||
return new BackupsAnonymousGrpcService(backupManager);
|
return new BackupsAnonymousGrpcService(backupManager, new BackupMetrics());
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
|
|
|
@ -54,6 +54,7 @@ import org.signal.libsignal.zkgroup.receipts.ServerZkReceiptOperations;
|
||||||
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
|
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
|
||||||
import org.whispersystems.textsecuregcm.backup.BackupAuthTestUtil;
|
import org.whispersystems.textsecuregcm.backup.BackupAuthTestUtil;
|
||||||
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.util.EnumMapUtil;
|
import org.whispersystems.textsecuregcm.util.EnumMapUtil;
|
||||||
|
@ -77,7 +78,7 @@ class BackupsGrpcServiceTest extends SimpleBaseGrpcTest<BackupsGrpcService, Back
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected BackupsGrpcService createServiceBeforeEachTest() {
|
protected BackupsGrpcService createServiceBeforeEachTest() {
|
||||||
return new BackupsGrpcService(accountsManager, backupAuthManager);
|
return new BackupsGrpcService(accountsManager, backupAuthManager, new BackupMetrics());
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
|
|
Loading…
Reference in New Issue