From 202dd8e92dd6fed4d70c624ec54ada0255b8f5c6 Mon Sep 17 00:00:00 2001 From: ravi-signal <99042880+ravi-signal@users.noreply.github.com> Date: Tue, 28 Nov 2023 11:45:41 -0600 Subject: [PATCH] Add copy endpoint to ArchiveController Co-authored-by: Jonathan Klabunde Tomer <125505367+jkt-signal@users.noreply.github.com> Co-authored-by: Chris Eager <79161849+eager-signal@users.noreply.github.com> --- service/config/sample.yml | 29 ++ .../WhisperServerConfiguration.java | 10 + .../textsecuregcm/WhisperServerService.java | 23 +- .../textsecuregcm/backup/BackupManager.java | 317 ++++++------ .../backup/BackupMediaEncrypter.java | 103 ++++ .../textsecuregcm/backup/BackupsDb.java | 489 ++++++++++++++++++ ...ava => Cdn3BackupCredentialGenerator.java} | 8 +- .../backup/Cdn3RemoteStorageManager.java | 102 ++++ .../backup/InvalidLengthException.java | 10 + .../backup/MediaEncryptionParameters.java | 17 + .../backup/PublicKeyConflictException.java | 6 + .../backup/RemoteStorageManager.java | 38 ++ .../backup/SourceObjectNotFoundException.java | 11 + .../configuration/ClientCdnConfiguration.java | 53 ++ .../configuration/DynamoDbTables.java | 9 + .../controllers/ArchiveController.java | 263 +++++++++- .../textsecuregcm/util/ExceptionUtils.java | 33 +- .../backup/BackupManagerTest.java | 175 +++++-- .../textsecuregcm/backup/BackupsDbTest.java | 93 ++++ ...=> Cdn3BackupCredentialGeneratorTest.java} | 6 +- .../backup/Cdn3RemoteStorageManagerTest.java | 185 +++++++ .../controllers/ArchiveControllerTest.java | 157 ++++++ .../storage/DynamoDbExtensionSchema.java | 18 +- .../util/CompletableFutureTestUtil.java | 11 +- 24 files changed, 1918 insertions(+), 248 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupMediaEncrypter.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java rename service/src/main/java/org/whispersystems/textsecuregcm/backup/{TusBackupCredentialGenerator.java => Cdn3BackupCredentialGenerator.java} (93%) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/InvalidLengthException.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/MediaEncryptionParameters.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/PublicKeyConflictException.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/RemoteStorageManager.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/SourceObjectNotFoundException.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/ClientCdnConfiguration.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java rename service/src/test/java/org/whispersystems/textsecuregcm/backup/{TusBackupCredentialGeneratorTest.java => Cdn3BackupCredentialGeneratorTest.java} (88%) create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java diff --git a/service/config/sample.yml b/service/config/sample.yml index 660441ca9..60471258d 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -87,6 +87,8 @@ dynamoDbTables: usernamesTableName: Example_Accounts_Usernames backups: tableName: Example_Backups + backupMedia: + tableName: Example_BackupMedia clientReleases: tableName: Example_ClientReleases deletedAccounts: @@ -219,6 +221,33 @@ cdn: bucket: cdn # S3 Bucket name region: us-west-2 # AWS region +clientCdn: + attachmentUrls: + 2: https://cdn2.example.com/attachments/ + caCertificates: + - | + -----BEGIN CERTIFICATE----- + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz + AAAAAAAAAAAAAAAAAAAA + -----END CERTIFICATE----- + dogstatsd: environment: dev diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 9ed3446d5..b39c79922 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -24,6 +24,7 @@ import org.whispersystems.textsecuregcm.configuration.AwsAttachmentsConfiguratio import org.whispersystems.textsecuregcm.configuration.BadgesConfiguration; import org.whispersystems.textsecuregcm.configuration.BraintreeConfiguration; import org.whispersystems.textsecuregcm.configuration.CdnConfiguration; +import org.whispersystems.textsecuregcm.configuration.ClientCdnConfiguration; import org.whispersystems.textsecuregcm.configuration.ClientReleaseConfiguration; import org.whispersystems.textsecuregcm.configuration.CommandStopListenerConfiguration; import org.whispersystems.textsecuregcm.configuration.DogstatsdConfiguration; @@ -101,6 +102,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private CdnConfiguration cdn; + @NotNull + @Valid + @JsonProperty + private ClientCdnConfiguration clientCdn; + @NotNull @Valid @JsonProperty @@ -405,6 +411,10 @@ public class WhisperServerConfiguration extends Configuration { return cdn; } + public ClientCdnConfiguration getClientCdn() { + return clientCdn; + } + public DogstatsdConfiguration getDatadogConfiguration() { return dogstatsd; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index aae8350c3..d1e3e3cf8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -79,7 +79,9 @@ import org.whispersystems.textsecuregcm.auth.WebsocketRefreshApplicationEventLis import org.whispersystems.textsecuregcm.auth.grpc.BasicCredentialAuthenticationInterceptor; import org.whispersystems.textsecuregcm.backup.BackupAuthManager; import org.whispersystems.textsecuregcm.backup.BackupManager; -import org.whispersystems.textsecuregcm.backup.TusBackupCredentialGenerator; +import org.whispersystems.textsecuregcm.backup.BackupsDb; +import org.whispersystems.textsecuregcm.backup.Cdn3BackupCredentialGenerator; +import org.whispersystems.textsecuregcm.backup.Cdn3RemoteStorageManager; import org.whispersystems.textsecuregcm.badges.ConfiguredProfileBadgeConverter; import org.whispersystems.textsecuregcm.badges.ResourceBundleLevelTranslator; import org.whispersystems.textsecuregcm.captcha.CaptchaChecker; @@ -420,6 +422,8 @@ public class WhisperServerService extends Application accountAuthFilter = new BasicCredentialAuthFilter.Builder().setAuthenticator( diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java index 69f49ecc8..b70c21771 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java @@ -7,19 +7,15 @@ package org.whispersystems.textsecuregcm.backup; import io.grpc.Status; import io.micrometer.core.instrument.Metrics; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; +import java.net.URI; import java.time.Clock; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; -import java.util.Collections; -import java.util.HashMap; import java.util.HexFormat; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.signal.libsignal.protocol.InvalidKeyException; import org.signal.libsignal.protocol.ecc.ECPublicKey; import org.signal.libsignal.zkgroup.GenericServerSecretParams; @@ -29,66 +25,48 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; -import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.ExceptionUtils; -import org.whispersystems.textsecuregcm.util.Util; -import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.dynamodb.model.AttributeValue; -import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; -import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; -import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; public class BackupManager { - private static final Logger logger = LoggerFactory.getLogger(BackupManager.class); static final String MESSAGE_BACKUP_NAME = "messageBackup"; - private static final int BACKUP_CDN = 3; + private static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = 1024L * 1024L * 1024L * 50L; + private static final long MAX_MEDIA_OBJECT_SIZE = 1024L * 1024L * 101L; private static final String ZK_AUTHN_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authentication"); - private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authorizationFailure"); + private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class, + "authorizationFailure"); private static final String SUCCESS_TAG_NAME = "success"; private static final String FAILURE_REASON_TAG_NAME = "reason"; + private final BackupsDb backupsDb; private final GenericServerSecretParams serverSecretParams; - private final TusBackupCredentialGenerator tusBackupCredentialGenerator; - private final DynamoDbAsyncClient dynamoClient; - private final String backupTableName; + private final Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator; + private final RemoteStorageManager remoteStorageManager; + private final Map attachmentCdnBaseUris; private final Clock clock; - // The backups table - - // B: 16 bytes that identifies the backup - public static final String KEY_BACKUP_ID_HASH = "U"; - // N: Time in seconds since epoch of the last backup refresh. This timestamp must be periodically updated to avoid - // garbage collection of archive objects. - public static final String ATTR_LAST_REFRESH = "R"; - // N: Time in seconds since epoch of the last backup media refresh. This timestamp can only be updated if the client - // has BackupTier.MEDIA, and must be periodically updated to avoid garbage collection of media objects. - public static final String ATTR_LAST_MEDIA_REFRESH = "MR"; - // B: A 32 byte public key that should be used to sign the presentation used to authenticate requests against the - // backup-id - public static final String ATTR_PUBLIC_KEY = "P"; - // N: Bytes consumed by this backup - public static final String ATTR_MEDIA_BYTES_USED = "MB"; - // N: Number of media objects in the backup - public static final String ATTR_MEDIA_COUNT = "MC"; - // N: The cdn number where the message backup is stored - public static final String ATTR_CDN = "CDN"; public BackupManager( + final BackupsDb backupsDb, final GenericServerSecretParams serverSecretParams, - final TusBackupCredentialGenerator tusBackupCredentialGenerator, - final DynamoDbAsyncClient dynamoClient, - final String backupTableName, + final Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator, + final RemoteStorageManager remoteStorageManager, + final Map attachmentCdnBaseUris, final Clock clock) { + this.backupsDb = backupsDb; this.serverSecretParams = serverSecretParams; - this.dynamoClient = dynamoClient; - this.tusBackupCredentialGenerator = tusBackupCredentialGenerator; - this.backupTableName = backupTableName; + this.cdn3BackupCredentialGenerator = cdn3BackupCredentialGenerator; + this.remoteStorageManager = remoteStorageManager; this.clock = clock; + // strip trailing "/" for easier URI construction + this.attachmentCdnBaseUris = attachmentCdnBaseUris.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + entry -> StringUtils.removeEnd(entry.getValue(), "/") + )); } + /** * Set the public key for the backup-id. *

@@ -114,30 +92,16 @@ public class BackupManager { .withDescription("credential does not support setting public key") .asRuntimeException(); } - - final byte[] hashedBackupId = hashedBackupId(presentation.getBackupId()); - return dynamoClient.updateItem(UpdateItemRequest.builder() - .tableName(backupTableName) - .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId))) - .updateExpression("SET #publicKey = :publicKey") - .expressionAttributeNames(Map.of("#publicKey", ATTR_PUBLIC_KEY)) - .expressionAttributeValues(Map.of(":publicKey", AttributeValues.b(publicKey.serialize()))) - .conditionExpression("attribute_not_exists(#publicKey) OR #publicKey = :publicKey") - .build()) - .exceptionally(throwable -> { - // There was already a row for this backup-id and it contained a different publicKey - if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) { - Metrics.counter(ZK_AUTHN_COUNTER_NAME, - SUCCESS_TAG_NAME, String.valueOf(false), - FAILURE_REASON_TAG_NAME, "public_key_conflict") - .increment(); - throw Status.UNAUTHENTICATED - .withDescription("public key does not match existing public key for the backup-id") - .asRuntimeException(); - } - throw ExceptionUtils.wrap(throwable); - }) - .thenRun(Util.NOOP); + return backupsDb.setPublicKey(presentation.getBackupId(), backupTier, publicKey) + .exceptionally(ExceptionUtils.exceptionallyHandler(PublicKeyConflictException.class, ex -> { + Metrics.counter(ZK_AUTHN_COUNTER_NAME, + SUCCESS_TAG_NAME, String.valueOf(false), + FAILURE_REASON_TAG_NAME, "public_key_conflict") + .increment(); + throw Status.UNAUTHENTICATED + .withDescription("public key does not match existing public key for the backup-id") + .asRuntimeException(); + })); } @@ -151,31 +115,12 @@ public class BackupManager { */ public CompletableFuture createMessageBackupUploadDescriptor( final AuthenticatedBackupUser backupUser) { - final byte[] hashedBackupId = hashedBackupId(backupUser); - final String encodedBackupId = encodeForCdn(hashedBackupId); - - final long refreshTimeSecs = clock.instant().getEpochSecond(); - - final List updates = new ArrayList<>(List.of("#cdn = :cdn", "#lastRefresh = :expiration")); - final Map expressionAttributeNames = new HashMap<>(Map.of( - "#cdn", ATTR_CDN, - "#lastRefresh", ATTR_LAST_REFRESH)); - if (backupUser.backupTier().compareTo(BackupTier.MEDIA) >= 0) { - updates.add("#lastMediaRefresh = :expiration"); - expressionAttributeNames.put("#lastMediaRefresh", ATTR_LAST_MEDIA_REFRESH); - } + final String encodedBackupId = encodeBackupIdForCdn(backupUser); // this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp - return dynamoClient.updateItem(UpdateItemRequest.builder() - .tableName(backupTableName) - .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId))) - .updateExpression("SET %s".formatted(String.join(",", updates))) - .expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(Map.of( - ":cdn", AttributeValues.n(BACKUP_CDN), - ":expiration", AttributeValues.n(refreshTimeSecs))) - .build()) - .thenApply(result -> tusBackupCredentialGenerator.generateUpload(encodedBackupId, MESSAGE_BACKUP_NAME)); + return backupsDb + .addMessageBackup(backupUser) + .thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(encodedBackupId, MESSAGE_BACKUP_NAME)); } /** @@ -190,23 +135,8 @@ public class BackupManager { .withDescription("credential does not support ttl operation") .asRuntimeException(); } - final long refreshTimeSecs = clock.instant().getEpochSecond(); // update message backup TTL - final List updates = new ArrayList<>(Collections.singletonList("#lastRefresh = :expiration")); - final Map expressionAttributeNames = new HashMap<>(Map.of("#lastRefresh", ATTR_LAST_REFRESH)); - if (backupUser.backupTier().compareTo(BackupTier.MEDIA) >= 0) { - // update media TTL - expressionAttributeNames.put("#lastMediaRefresh", ATTR_LAST_MEDIA_REFRESH); - updates.add("#lastMediaRefresh = :expiration"); - } - return dynamoClient.updateItem(UpdateItemRequest.builder() - .tableName(backupTableName) - .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser)))) - .updateExpression("SET %s".formatted(String.join(",", updates))) - .expressionAttributeNames(expressionAttributeNames) - .expressionAttributeValues(Map.of(":expiration", AttributeValues.n(refreshTimeSecs))) - .build()) - .thenRun(Util.NOOP); + return backupsDb.ttlRefresh(backupUser); } public record BackupInfo(int cdn, String backupSubdir, String messageBackupKey, Optional mediaUsedSpace) {} @@ -223,31 +153,107 @@ public class BackupManager { throw Status.PERMISSION_DENIED.withDescription("credential does not support info operation") .asRuntimeException(); } - return backupInfoHelper(backupUser); + return backupsDb.describeBackup(backupUser) + .thenApply(backupDescription -> new BackupInfo( + backupDescription.cdn(), + encodeBackupIdForCdn(backupUser), + MESSAGE_BACKUP_NAME, + backupDescription.mediaUsedSpace())); } - private CompletableFuture backupInfoHelper(final AuthenticatedBackupUser backupUser) { - return dynamoClient.getItem(GetItemRequest.builder() - .tableName(backupTableName) - .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser)))) - .projectionExpression("#cdn,#bytesUsed") - .expressionAttributeNames(Map.of("#cdn", ATTR_CDN, "#bytesUsed", ATTR_MEDIA_BYTES_USED)) - .build()) - .thenApply(response -> { - if (!response.hasItem()) { - throw Status.NOT_FOUND.withDescription("Backup not found").asRuntimeException(); - } - final int cdn = AttributeValues.get(response.item(), ATTR_CDN) - .map(AttributeValue::n) - .map(Integer::parseInt) - .orElseThrow(() -> Status.NOT_FOUND.withDescription("Stored backup not found").asRuntimeException()); + /** + * Check if there is enough capacity to store the requested amount of media + * + * @param backupUser an already ZK authenticated backup user + * @param mediaLength the desired number of media bytes to store + * @return true if mediaLength bytes can be stored + */ + public CompletableFuture canStoreMedia(final AuthenticatedBackupUser backupUser, final long mediaLength) { + if (backupUser.backupTier().compareTo(BackupTier.MEDIA) < 0) { + Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment(); + throw Status.PERMISSION_DENIED + .withDescription("credential does not support storing media") + .asRuntimeException(); + } + return backupsDb.describeBackup(backupUser) + .thenApply(info -> info.mediaUsedSpace() + .filter(usedSpace -> MAX_TOTAL_BACKUP_MEDIA_BYTES - usedSpace >= mediaLength) + .isPresent()); + } - final Optional mediaUsed = AttributeValues.get(response.item(), ATTR_MEDIA_BYTES_USED) - .map(AttributeValue::n) - .map(Long::parseLong); + public record StorageDescriptor(int cdn, byte[] key) {} - return new BackupInfo(cdn, encodeForCdn(hashedBackupId(backupUser)), MESSAGE_BACKUP_NAME, mediaUsed); - }); + /** + * Copy an encrypted object to the backup cdn, adding a layer of encryption + *

+ * Implementation notes:

This method guarantees that any object that gets successfully copied to the backup cdn + * will also have an entry for the user in the database.

+ *

+ * However, the converse isn't true; there may be entries in the database that have not made it to the cdn. On list, + * these entries are checked against the cdn and removed. + * + * @return A stage that completes successfully with location of the twice-encrypted object on the backup cdn. The + * returned CompletionStage can be completed exceptionally with the following exceptions. + *

    + *
  • {@link InvalidLengthException} If the expectedSourceLength does not match the length of the sourceUri
  • + *
  • {@link SourceObjectNotFoundException} If the no object at sourceUri is found
  • + *
  • {@link java.io.IOException} If there was a generic IO issue
  • + *
+ */ + public CompletableFuture copyToBackup( + final AuthenticatedBackupUser backupUser, + final int sourceCdn, + final String sourceKey, + final int sourceLength, + final MediaEncryptionParameters encryptionParameters, + final byte[] destinationMediaId) { + if (backupUser.backupTier().compareTo(BackupTier.MEDIA) < 0) { + Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment(); + throw Status.PERMISSION_DENIED + .withDescription("credential does not support storing media") + .asRuntimeException(); + } + if (sourceLength > MAX_MEDIA_OBJECT_SIZE) { + throw Status.INVALID_ARGUMENT + .withDescription("Invalid sourceObject size") + .asRuntimeException(); + } + + final MessageBackupUploadDescriptor dst = cdn3BackupCredentialGenerator.generateUpload( + encodeBackupIdForCdn(backupUser), + encodeForCdn(destinationMediaId)); + + return this.backupsDb + // Write the ddb updates before actually updating backing storage + .trackMedia(backupUser, destinationMediaId, sourceLength) + + // copy the objects. On a failure, make a best-effort attempt to reverse the ddb transaction. If cleanup fails + // the client may be left with some cleanup to do if they don't eventually upload the media id. + .thenCompose(ignored -> remoteStorageManager + // actually perform the copy + .copy(attachmentReadUri(sourceCdn, sourceKey), sourceLength, encryptionParameters, dst) + // best effort: on failure, untrack the copied media + .exceptionallyCompose(copyError -> backupsDb.untrackMedia(backupUser, destinationMediaId, sourceLength) + .thenCompose(ignoredSuccess -> CompletableFuture.failedFuture(copyError)))) + + // indicates where the backup was stored + .thenApply(ignore -> new StorageDescriptor(dst.cdn(), destinationMediaId)); + + } + + /** + * Construct the URI for an attachment with the specified key + * + * @param cdn where the attachment is located + * @param key the attachment key + * @return A {@link URI} where the attachment can be retrieved + */ + private URI attachmentReadUri(final int cdn, final String key) { + final String baseUri = attachmentCdnBaseUris.get(cdn); + if (baseUri == null) { + throw Status.INVALID_ARGUMENT.withDescription("Unknown cdn " + cdn).asRuntimeException(); + } + return URI.create("%s/%s".formatted(baseUri, key)); } /** @@ -264,8 +270,8 @@ public class BackupManager { .asRuntimeException(); } - final String encodedBackupId = encodeForCdn(hashedBackupId(backupUser)); - return tusBackupCredentialGenerator.readHeaders(encodedBackupId); + final String encodedBackupId = encodeBackupIdForCdn(backupUser); + return cdn3BackupCredentialGenerator.readHeaders(encodedBackupId); } /** @@ -284,27 +290,17 @@ public class BackupManager { public CompletableFuture authenticateBackupUser( final BackupAuthCredentialPresentation presentation, final byte[] signature) { - final byte[] hashedBackupId = hashedBackupId(presentation.getBackupId()); - return dynamoClient.getItem(GetItemRequest.builder() - .tableName(backupTableName) - .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId))) - .projectionExpression("#publicKey") - .expressionAttributeNames(Map.of("#publicKey", ATTR_PUBLIC_KEY)) - .build()) - .thenApply(response -> { - if (!response.hasItem()) { - Metrics.counter(ZK_AUTHN_COUNTER_NAME, - SUCCESS_TAG_NAME, String.valueOf(false), - FAILURE_REASON_TAG_NAME, "missing_public_key") - .increment(); - throw Status.NOT_FOUND.withDescription("Backup not found").asRuntimeException(); - } - final byte[] publicKeyBytes = AttributeValues.get(response.item(), ATTR_PUBLIC_KEY) - .map(AttributeValue::b) - .map(SdkBytes::asByteArray) - .orElseThrow(() -> Status.INTERNAL - .withDescription("Stored backup missing public key") - .asRuntimeException()); + return backupsDb + .retrievePublicKey(presentation.getBackupId()) + .thenApply(optionalPublicKey -> { + final byte[] publicKeyBytes = optionalPublicKey + .orElseThrow(() -> { + Metrics.counter(ZK_AUTHN_COUNTER_NAME, + SUCCESS_TAG_NAME, String.valueOf(false), + FAILURE_REASON_TAG_NAME, "missing_public_key") + .increment(); + return Status.NOT_FOUND.withDescription("Backup not found").asRuntimeException(); + }); try { final ECPublicKey publicKey = new ECPublicKey(publicKeyBytes); return new AuthenticatedBackupUser( @@ -316,7 +312,7 @@ public class BackupManager { FAILURE_REASON_TAG_NAME, "invalid_public_key") .increment(); logger.error("Invalid publicKey for backupId hash {}", - HexFormat.of().formatHex(hashedBackupId), e); + HexFormat.of().formatHex(BackupsDb.hashedBackupId(presentation.getBackupId())), e); throw Status.INTERNAL .withCause(e) .withDescription("Could not deserialize stored public key") @@ -373,19 +369,12 @@ public class BackupManager { }); } - private static byte[] hashedBackupId(final AuthenticatedBackupUser backupId) { - return hashedBackupId(backupId.backupId()); - } - - private static byte[] hashedBackupId(final byte[] backupId) { - try { - return Arrays.copyOf(MessageDigest.getInstance("SHA-256").digest(backupId), 16); - } catch (NoSuchAlgorithmException e) { - throw new AssertionError(e); - } + private static String encodeBackupIdForCdn(final AuthenticatedBackupUser backupUser) { + return encodeForCdn(BackupsDb.hashedBackupId(backupUser.backupId())); } private static String encodeForCdn(final byte[] bytes) { return Base64.getUrlEncoder().encodeToString(bytes); } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupMediaEncrypter.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupMediaEncrypter.java new file mode 100644 index 000000000..4f68209c9 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupMediaEncrypter.java @@ -0,0 +1,103 @@ +package org.whispersystems.textsecuregcm.backup; + +import java.net.http.HttpRequest; +import java.nio.ByteBuffer; +import java.security.InvalidAlgorithmParameterException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.concurrent.Flow; +import javax.crypto.BadPaddingException; +import javax.crypto.Cipher; +import javax.crypto.IllegalBlockSizeException; +import javax.crypto.Mac; +import javax.crypto.NoSuchPaddingException; +import org.reactivestreams.FlowAdapters; +import org.whispersystems.textsecuregcm.util.ExceptionUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class BackupMediaEncrypter { + + private final Cipher cipher; + private final Mac mac; + + public BackupMediaEncrypter(final MediaEncryptionParameters encryptionParameters) { + cipher = initializeCipher(encryptionParameters); + mac = initializeMac(encryptionParameters); + } + + public int outputSize(final int inputSize) { + return cipher.getIV().length + cipher.getOutputSize(inputSize) + mac.getMacLength(); + } + + /** + * Perform streaming encryption + * + * @param sourceBody A source of ByteBuffers, typically from an asynchronous HttpResponse + * @return A publisher that returns IV + AES/CBC/PKCS5Padding encrypted source + HMAC(IV + encrypted source) suitable + * to write with an asynchronous HttpRequest + */ + public Flow.Publisher encryptBody(Flow.Publisher> sourceBody) { + + // Write IV, encrypted payload, mac + final Flux encryptedBody = Flux.concat( + Mono.fromSupplier(() -> { + mac.update(cipher.getIV()); + return ByteBuffer.wrap(cipher.getIV()); + }), + Flux.from(FlowAdapters.toPublisher(sourceBody)) + .flatMap(buffers -> Flux.fromIterable(buffers)) + .concatMap(byteBuffer -> { + final byte[] copy = new byte[byteBuffer.remaining()]; + byteBuffer.get(copy); + final byte[] res = cipher.update(copy); + if (res == null) { + return Mono.empty(); + } else { + mac.update(res); + return Mono.just(ByteBuffer.wrap(res)); + } + }), + Mono.fromSupplier(() -> { + try { + final byte[] finalBytes = cipher.doFinal(); + mac.update(finalBytes); + return ByteBuffer.wrap(finalBytes); + } catch (IllegalBlockSizeException | BadPaddingException e) { + throw ExceptionUtils.wrap(e); + } + }), + Mono.fromSupplier(() -> ByteBuffer.wrap(mac.doFinal()))); + return FlowAdapters.toFlowPublisher(encryptedBody); + } + + private static Mac initializeMac(final MediaEncryptionParameters encryptionParameters) { + try { + final Mac mac = Mac.getInstance("HmacSHA256"); + mac.init(encryptionParameters.hmacSHA256Key()); + return mac; + } catch (NoSuchAlgorithmException e) { + throw new AssertionError(e); + } catch (InvalidKeyException e) { + throw new IllegalArgumentException(e); + } + } + + private static Cipher initializeCipher(final MediaEncryptionParameters encryptionParameters) { + try { + final Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding"); + cipher.init( + Cipher.ENCRYPT_MODE, + encryptionParameters.aesEncryptionKey(), + encryptionParameters.iv()); + return cipher; + + } catch (NoSuchAlgorithmException | NoSuchPaddingException e) { + throw new AssertionError(e); + } catch (InvalidAlgorithmParameterException | InvalidKeyException e) { + throw new IllegalArgumentException(e); + } + } + +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java new file mode 100644 index 000000000..287061225 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupsDb.java @@ -0,0 +1,489 @@ +package org.whispersystems.textsecuregcm.backup; + +import io.grpc.Status; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.signal.libsignal.protocol.ecc.ECPublicKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import org.whispersystems.textsecuregcm.util.ExceptionUtils; +import org.whispersystems.textsecuregcm.util.Util; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CancellationReason; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.Delete; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.Put; +import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest; +import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; +import software.amazon.awssdk.services.dynamodb.model.Update; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; + +/** + * Tracks backup metadata in a persistent store. + * + * It's assumed that the caller has already validated that the backupUser being operated on has valid credentials and + * possesses the appropriate {@link BackupTier} to perform the current operation. + */ +public class BackupsDb { + private static final Logger logger = LoggerFactory.getLogger(BackupsDb.class); + static final int BACKUP_CDN = 3; + + private final DynamoDbAsyncClient dynamoClient; + private final String backupTableName; + private final String backupMediaTableName; + private final Clock clock; + + // The backups table + + // B: 16 bytes that identifies the backup + public static final String KEY_BACKUP_ID_HASH = "U"; + // N: Time in seconds since epoch of the last backup refresh. This timestamp must be periodically updated to avoid + // garbage collection of archive objects. + public static final String ATTR_LAST_REFRESH = "R"; + // N: Time in seconds since epoch of the last backup media refresh. This timestamp can only be updated if the client + // has BackupTier.MEDIA, and must be periodically updated to avoid garbage collection of media objects. + public static final String ATTR_LAST_MEDIA_REFRESH = "MR"; + // B: A 32 byte public key that should be used to sign the presentation used to authenticate requests against the + // backup-id + public static final String ATTR_PUBLIC_KEY = "P"; + // N: Bytes consumed by this backup + public static final String ATTR_MEDIA_BYTES_USED = "MB"; + // N: Number of media objects in the backup + public static final String ATTR_MEDIA_COUNT = "MC"; + // N: The cdn number where the message backup is stored + public static final String ATTR_CDN = "CDN"; + + // The stored media table (hashedBackupId, mediaId, cdn, objectLength) + + // B: 15-byte mediaId + public static final String KEY_MEDIA_ID = "M"; + // N: The length of the encrypted media object + public static final String ATTR_LENGTH = "L"; + + public BackupsDb( + final DynamoDbAsyncClient dynamoClient, + final String backupTableName, + final String backupMediaTableName, + final Clock clock) { + this.dynamoClient = dynamoClient; + this.backupTableName = backupTableName; + this.backupMediaTableName = backupMediaTableName; + this.clock = clock; + } + + /** + * Set the public key associated with a backupId. + * + * @param authenticatedBackupId The backup-id bytes that should be associated with the provided public key + * @param authenticatedBackupTier The backup tier + * @param publicKey The public key to associate with the backup id + * @return A stage that completes when the public key has been set. If the backup-id already has a set public key that + * does not match, the stage will be completed exceptionally with a {@link PublicKeyConflictException} + */ + CompletableFuture setPublicKey( + final byte[] authenticatedBackupId, + final BackupTier authenticatedBackupTier, + final ECPublicKey publicKey) { + final byte[] hashedBackupId = hashedBackupId(authenticatedBackupId); + return dynamoClient.updateItem(new UpdateBuilder(backupTableName, authenticatedBackupTier, hashedBackupId) + .addSetExpression("#publicKey = :publicKey", + Map.entry("#publicKey", ATTR_PUBLIC_KEY), + Map.entry(":publicKey", AttributeValues.b(publicKey.serialize()))) + .setRefreshTimes(clock) + .withConditionExpression("attribute_not_exists(#publicKey) OR #publicKey = :publicKey") + .updateItemBuilder() + .build()) + .exceptionally(throwable -> { + // There was already a row for this backup-id and it contained a different publicKey + if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) { + throw ExceptionUtils.wrap(new PublicKeyConflictException()); + } + throw ExceptionUtils.wrap(throwable); + }) + .thenRun(Util.NOOP); + } + + CompletableFuture> retrievePublicKey(byte[] backupId) { + final byte[] hashedBackupId = hashedBackupId(backupId); + return dynamoClient.getItem(GetItemRequest.builder() + .tableName(backupTableName) + .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId))) + .consistentRead(true) + .projectionExpression("#publicKey") + .expressionAttributeNames(Map.of("#publicKey", ATTR_PUBLIC_KEY)) + .build()) + .thenApply(response -> + AttributeValues.get(response.item(), ATTR_PUBLIC_KEY) + .map(AttributeValue::b) + .map(SdkBytes::asByteArray)); + } + + + /** + * Add media to the backup media table and update the quota in the backup table + * + * @param backupUser The + * @param mediaId The mediaId to add + * @param mediaLength The length of the media before encryption (the length of the source media) + * @return A stage that completes successfully once the tables are updated. If the media with the provided id has + * previously been tracked with a different length, the stage will complete exceptionally with an + * {@link InvalidLengthException} + */ + CompletableFuture trackMedia( + final AuthenticatedBackupUser backupUser, + final byte[] mediaId, + final int mediaLength) { + final byte[] hashedBackupId = hashedBackupId(backupUser); + return dynamoClient + .transactWriteItems(TransactWriteItemsRequest.builder().transactItems( + + // Add the media to the media table + TransactWriteItem.builder().put(Put.builder() + .tableName(backupMediaTableName) + .returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD) + .item(Map.of( + KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId), + KEY_MEDIA_ID, AttributeValues.b(mediaId), + ATTR_CDN, AttributeValues.n(BACKUP_CDN), + ATTR_LENGTH, AttributeValues.n(mediaLength))) + .conditionExpression("attribute_not_exists(#mediaId)") + .expressionAttributeNames(Map.of("#mediaId", KEY_MEDIA_ID)) + .build()).build(), + + // Update the media quota and TTL + TransactWriteItem.builder().update( + UpdateBuilder.forUser(backupTableName, backupUser) + .setRefreshTimes(clock) + .incrementMediaBytes(mediaLength) + .incrementMediaCount(1) + .transactItemBuilder() + .build()).build()).build()) + .exceptionally(throwable -> { + if (ExceptionUtils.unwrap(throwable) instanceof TransactionCanceledException txCancelled) { + final long oldItemLength = conditionCheckFailed(txCancelled, 0) + .flatMap(item -> Optional.ofNullable(item.get(ATTR_LENGTH))) + .map(attr -> Long.parseLong(attr.n())) + .orElseThrow(() -> ExceptionUtils.wrap(throwable)); + if (oldItemLength != mediaLength) { + throw new CompletionException( + new InvalidLengthException("Previously tried to copy media with a different length. " + + "Provided " + mediaLength + " was " + oldItemLength)); + } + // The client already "paid" for this media, can let them through + return null; + } else { + // rethrow original exception + throw ExceptionUtils.wrap(throwable); + } + }) + .thenRun(Util.NOOP); + } + + /** + * Remove media from backup media table and update the quota in the backup table + * + * @param backupUser The backup user + * @param mediaId The mediaId to add + * @param mediaLength The length of the media before encryption (the length of the source media) + * @return A stage that completes successfully once the tables are updated + */ + CompletableFuture untrackMedia( + final AuthenticatedBackupUser backupUser, + final byte[] mediaId, + final int mediaLength) { + final byte[] hashedBackupId = hashedBackupId(backupUser); + return dynamoClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems( + TransactWriteItem.builder().delete(Delete.builder() + .tableName(backupMediaTableName) + .returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD) + .key(Map.of( + KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId), + KEY_MEDIA_ID, AttributeValues.b(mediaId) + )) + .conditionExpression("#length = :length") + .expressionAttributeNames(Map.of("#length", ATTR_LENGTH)) + .expressionAttributeValues(Map.of(":length", AttributeValues.n(mediaLength))) + .build()).build(), + + // Don't update TTLs, since we're just cleaning up media + TransactWriteItem.builder().update(UpdateBuilder.forUser(backupTableName, backupUser) + .incrementMediaBytes(-mediaLength) + .incrementMediaCount(-1) + .transactItemBuilder().build()).build()).build()) + .exceptionally(error -> { + logger.warn("failed cleanup after failed copy operation", error); + return null; + }) + .thenRun(Util.NOOP); + } + + /** + * Update the last update timestamps for the backupId in the presentation + * + * @param backupUser an already authorized backup user + */ + CompletableFuture ttlRefresh(final AuthenticatedBackupUser backupUser) { + // update message backup TTL + return dynamoClient.updateItem(UpdateBuilder.forUser(backupTableName, backupUser) + .setRefreshTimes(clock) + .updateItemBuilder() + .build()) + .thenRun(Util.NOOP); + } + + /** + * Track that a backup will be stored for the user + * @param backupUser an already authorized backup user + */ + CompletableFuture addMessageBackup(final AuthenticatedBackupUser backupUser) { + // this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp + return dynamoClient.updateItem( + UpdateBuilder.forUser(backupTableName, backupUser) + .setRefreshTimes(clock) + .setCdn(BACKUP_CDN) + .updateItemBuilder() + .build()) + .thenRun(Util.NOOP); + } + + + record BackupDescription(int cdn, Optional mediaUsedSpace) {} + + /** + * Retrieve information about the backup + * + * @param backupUser an already authorized backup user + * @return A {@link BackupDescription} containing the cdn of the message backup and the total number of media space + * bytes used by the backup user. + */ + CompletableFuture describeBackup(final AuthenticatedBackupUser backupUser) { + return dynamoClient.getItem(GetItemRequest.builder() + .tableName(backupTableName) + .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser)))) + .projectionExpression("#cdn,#bytesUsed") + .expressionAttributeNames(Map.of("#cdn", ATTR_CDN, "#bytesUsed", ATTR_MEDIA_BYTES_USED)) + .consistentRead(true) + .build()) + .thenApply(response -> { + if (!response.hasItem()) { + throw Status.NOT_FOUND.withDescription("Backup not found").asRuntimeException(); + } + final int cdn = AttributeValues.get(response.item(), ATTR_CDN) + .map(AttributeValue::n) + .map(Integer::parseInt) + .orElseThrow(() -> Status.NOT_FOUND.withDescription("Stored backup not found").asRuntimeException()); + + final Optional mediaUsed = AttributeValues.get(response.item(), ATTR_MEDIA_BYTES_USED) + .map(AttributeValue::n) + .map(Long::parseLong); + + return new BackupDescription(cdn, mediaUsed); + }); + } + + + /** + * Build ddb update statements for the backups table + */ + private static class UpdateBuilder { + + private final List setStatements = new ArrayList<>(); + private final Map attrValues = new HashMap<>(); + private final Map attrNames = new HashMap<>(); + + private final String tableName; + private final BackupTier backupTier; + private final byte[] hashedBackupId; + private String conditionExpression = null; + + static UpdateBuilder forUser(String tableName, AuthenticatedBackupUser backupUser) { + return new UpdateBuilder(tableName, backupUser.backupTier(), hashedBackupId(backupUser)); + } + + UpdateBuilder(String tableName, BackupTier backupTier, byte[] hashedBackupId) { + this.tableName = tableName; + this.backupTier = backupTier; + this.hashedBackupId = hashedBackupId; + } + + private void addAttrValue(Map.Entry attrValue) { + final AttributeValue old = attrValues.put(attrValue.getKey(), attrValue.getValue()); + if (old != null && !old.equals(attrValue.getValue())) { + throw new IllegalArgumentException("duplicate attrValue key used for different values"); + } + } + + private void addAttrName(Map.Entry attrName) { + final String oldName = attrNames.put(attrName.getKey(), attrName.getValue()); + if (oldName != null && !oldName.equals(attrName.getValue())) { + throw new IllegalArgumentException("duplicate attrName key used for different attribute names"); + } + } + + private void addAttrs(final Map.Entry attrName, final Map.Entry attrValue) { + addAttrName(attrName); + addAttrValue(attrValue); + } + + UpdateBuilder addSetExpression( + final String update, + final Map.Entry attrName, + final Map.Entry attrValue) { + setStatements.add(update); + addAttrs(attrName, attrValue); + return this; + } + + UpdateBuilder addSetExpression(final String update) { + setStatements.add(update); + return this; + } + + UpdateBuilder withConditionExpression(final String conditionExpression) { + this.conditionExpression = conditionExpression; + return this; + } + + UpdateBuilder withConditionExpression( + final String conditionExpression, + final Map.Entry attrName, + final Map.Entry attrValue) { + this.addAttrs(attrName, attrValue); + this.conditionExpression = conditionExpression; + return this; + } + + UpdateBuilder setCdn(final int cdn) { + return addSetExpression( + "#cdn = :cdn", + Map.entry("#cdn", ATTR_CDN), + Map.entry(":cdn", AttributeValues.n(cdn))); + } + + UpdateBuilder incrementMediaCount(long delta) { + addAttrName(Map.entry("#mediaCount", ATTR_MEDIA_COUNT)); + addAttrValue(Map.entry(":zero", AttributeValues.n(0))); + addAttrValue(Map.entry(":mediaCountDelta", AttributeValues.n(delta))); + addSetExpression("#mediaCount = if_not_exists(#mediaCount, :zero) + :mediaCountDelta"); + return this; + } + + UpdateBuilder incrementMediaBytes(long delta) { + addAttrName(Map.entry("#mediaBytes", ATTR_MEDIA_BYTES_USED)); + addAttrValue(Map.entry(":zero", AttributeValues.n(0))); + addAttrValue(Map.entry(":mediaBytesDelta", AttributeValues.n(delta))); + addSetExpression("#mediaBytes = if_not_exists(#mediaBytes, :zero) + :mediaBytesDelta"); + return this; + } + + /** + * Set the lastRefresh time as part of the update + *

+ * This always updates lastRefreshTime, and updates lastMediaRefreshTime if the backup user has the appropriate + * tier + */ + UpdateBuilder setRefreshTimes(final Clock clock) { + final long refreshTimeSecs = clock.instant().getEpochSecond(); + addSetExpression("#lastRefreshTime = :lastRefreshTime", + Map.entry("#lastRefreshTime", ATTR_LAST_REFRESH), + Map.entry(":lastRefreshTime", AttributeValues.n(refreshTimeSecs))); + + if (backupTier.compareTo(BackupTier.MEDIA) >= 0) { + // update the media time if we have the appropriate tier + addSetExpression("#lastMediaRefreshTime = :lastMediaRefreshTime", + Map.entry("#lastMediaRefreshTime", ATTR_LAST_MEDIA_REFRESH), + Map.entry(":lastMediaRefreshTime", AttributeValues.n(refreshTimeSecs))); + } + return this; + } + + /** + * Prepare a non-transactional update + * + * @return An {@link UpdateItemRequest#builder()} that can be used with updateItem + */ + UpdateItemRequest.Builder updateItemBuilder() { + final UpdateItemRequest.Builder bldr = UpdateItemRequest.builder() + .tableName(tableName) + .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId))) + .updateExpression("SET %s".formatted(String.join(",", setStatements))) + .expressionAttributeNames(attrNames) + .expressionAttributeValues(attrValues); + if (this.conditionExpression != null) { + bldr.conditionExpression(conditionExpression); + } + return bldr; + } + + /** + * Prepare a transactional update + * + * @return An {@link Update#builder()} that can be used with transactItem + */ + Update.Builder transactItemBuilder() { + final Update.Builder bldr = Update.builder() + .tableName(tableName) + .key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId))) + .updateExpression("SET %s".formatted(String.join(",", setStatements))) + .expressionAttributeNames(attrNames) + .expressionAttributeValues(attrValues); + if (this.conditionExpression != null) { + bldr.conditionExpression(conditionExpression); + } + return bldr; + } + } + + + private static byte[] hashedBackupId(final AuthenticatedBackupUser backupId) { + return hashedBackupId(backupId.backupId()); + } + + static byte[] hashedBackupId(final byte[] backupId) { + try { + return Arrays.copyOf(MessageDigest.getInstance("SHA-256").digest(backupId), 16); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError(e); + } + } + + /** + * Check if a DynamoDb error indicates a condition check failed error, and return the value of the item failed to + * update. + * + * @param e The error returned by {@link DynamoDbAsyncClient#transactWriteItems} attempt + * @param itemIndex The index of the item in the transaction that had a condition expression + * @return The remote value of the item that failed to update, or empty if the error was not a condition check failure + */ + private static Optional> conditionCheckFailed(TransactionCanceledException e, + int itemIndex) { + if (!e.hasCancellationReasons()) { + return Optional.empty(); + } + if (e.cancellationReasons().size() < itemIndex + 1) { + return Optional.empty(); + } + final CancellationReason reason = e.cancellationReasons().get(itemIndex); + if (!"ConditionalCheckFailed".equals(reason.code()) || !reason.hasItem()) { + return Optional.empty(); + } + return Optional.of(reason.item()); + } + +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/TusBackupCredentialGenerator.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3BackupCredentialGenerator.java similarity index 93% rename from service/src/main/java/org/whispersystems/textsecuregcm/backup/TusBackupCredentialGenerator.java rename to service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3BackupCredentialGenerator.java index 1d762606a..3b056d883 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/TusBackupCredentialGenerator.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3BackupCredentialGenerator.java @@ -16,13 +16,13 @@ import java.time.Clock; import java.util.Base64; import java.util.Map; -public class TusBackupCredentialGenerator { +public class Cdn3BackupCredentialGenerator { - private static final int BACKUP_CDN = 3; + public static final String CDN_PATH = "backups"; + public static final int BACKUP_CDN = 3; private static String READ_PERMISSION = "read"; private static String WRITE_PERMISSION = "write"; - private static String CDN_PATH = "backups"; private static String PERMISSION_SEPARATOR = "$"; // Write entities will be of the form 'write$backups/ @@ -35,7 +35,7 @@ public class TusBackupCredentialGenerator { private final ExternalServiceCredentialsGenerator credentialsGenerator; private final String tusUri; - public TusBackupCredentialGenerator(final TusConfiguration cfg) { + public Cdn3BackupCredentialGenerator(final TusConfiguration cfg) { this.tusUri = cfg.uploadUri(); this.credentialsGenerator = credentialsGenerator(Clock.systemUTC(), cfg); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java new file mode 100644 index 000000000..fe366d8ec --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java @@ -0,0 +1,102 @@ +package org.whispersystems.textsecuregcm.backup; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.security.cert.CertificateException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Stream; +import javax.ws.rs.core.Response; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient; + +public class Cdn3RemoteStorageManager implements RemoteStorageManager { + + private final FaultTolerantHttpClient httpClient; + + public Cdn3RemoteStorageManager( + final ScheduledExecutorService retryExecutor, + final CircuitBreakerConfiguration circuitBreakerConfiguration, + final RetryConfiguration retryConfiguration, + final List caCertificates) throws CertificateException { + this.httpClient = FaultTolerantHttpClient.newBuilder() + .withName("cdn3-remote-storage") + .withCircuitBreaker(circuitBreakerConfiguration) + .withExecutor(Executors.newCachedThreadPool()) + .withRetryExecutor(retryExecutor) + .withRetry(retryConfiguration) + .withConnectTimeout(Duration.ofSeconds(10)) + .withVersion(HttpClient.Version.HTTP_2) + .withTrustedServerCertificates(caCertificates.toArray(new String[0])) + .build(); + } + + @Override + public int cdnNumber() { + return 3; + } + + @Override + public CompletionStage copy( + final URI sourceUri, + final int expectedSourceLength, + final MediaEncryptionParameters encryptionParameters, + final MessageBackupUploadDescriptor uploadDescriptor) { + + if (uploadDescriptor.cdn() != cdnNumber()) { + throw new IllegalArgumentException("Cdn3RemoteStorageManager can only copy to cdn3"); + } + + final BackupMediaEncrypter encrypter = new BackupMediaEncrypter(encryptionParameters); + + final HttpRequest request = HttpRequest.newBuilder().GET().uri(sourceUri).build(); + return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> { + if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) { + throw new CompletionException(new SourceObjectNotFoundException()); + } else if (response.statusCode() != Response.Status.OK.getStatusCode()) { + throw new CompletionException(new IOException("error reading from source: " + response.statusCode())); + } + + final int actualSourceLength = Math.toIntExact(response.headers().firstValueAsLong("Content-Length") + .orElseThrow(() -> new CompletionException(new IOException("upstream missing Content-Length")))); + + if (actualSourceLength != expectedSourceLength) { + throw new CompletionException( + new InvalidLengthException("Provided sourceLength " + expectedSourceLength + " was " + actualSourceLength)); + } + + final int expectedEncryptedLength = encrypter.outputSize(actualSourceLength); + final HttpRequest.BodyPublisher encryptedBody = HttpRequest.BodyPublishers.fromPublisher( + encrypter.encryptBody(response.body()), expectedEncryptedLength); + + final String[] headers = Stream.concat( + uploadDescriptor.headers().entrySet() + .stream() + .flatMap(e -> Stream.of(e.getKey(), e.getValue())), + Stream.of("Upload-Length", Integer.toString(expectedEncryptedLength), "Tus-Resumable", "1.0.0")) + .toArray(String[]::new); + + final HttpRequest put = HttpRequest.newBuilder() + .uri(URI.create(uploadDescriptor.signedUploadLocation())) + .headers(headers) + .POST(encryptedBody) + .build(); + + return httpClient.sendAsync(put, HttpResponse.BodyHandlers.discarding()); + }) + .thenAccept(response -> { + if (response.statusCode() != Response.Status.CREATED.getStatusCode() && + response.statusCode() != Response.Status.OK.getStatusCode()) { + throw new CompletionException(new IOException("Failed to copy object: " + response.statusCode())); + } + }); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/InvalidLengthException.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/InvalidLengthException.java new file mode 100644 index 000000000..475df6121 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/InvalidLengthException.java @@ -0,0 +1,10 @@ +package org.whispersystems.textsecuregcm.backup; + +import java.io.IOException; + +public class InvalidLengthException extends IOException { + + public InvalidLengthException(String s) { + super(s); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/MediaEncryptionParameters.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/MediaEncryptionParameters.java new file mode 100644 index 000000000..f18efc1fc --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/MediaEncryptionParameters.java @@ -0,0 +1,17 @@ +package org.whispersystems.textsecuregcm.backup; + +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.SecretKeySpec; + +public record MediaEncryptionParameters( + SecretKeySpec aesEncryptionKey, + SecretKeySpec hmacSHA256Key, + IvParameterSpec iv) { + + public MediaEncryptionParameters(byte[] encryptionKey, byte[] macKey, byte[] iv) { + this( + new SecretKeySpec(encryptionKey, "AES"), + new SecretKeySpec(macKey, "HmacSHA256"), + new IvParameterSpec(iv)); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/PublicKeyConflictException.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/PublicKeyConflictException.java new file mode 100644 index 000000000..af0b13279 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/PublicKeyConflictException.java @@ -0,0 +1,6 @@ +package org.whispersystems.textsecuregcm.backup; + +import java.io.IOException; + +public class PublicKeyConflictException extends IOException { +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/RemoteStorageManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/RemoteStorageManager.java new file mode 100644 index 000000000..627ac65d5 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/RemoteStorageManager.java @@ -0,0 +1,38 @@ +package org.whispersystems.textsecuregcm.backup; + +import java.net.URI; +import java.util.concurrent.CompletionStage; + +/** + * Handles management operations over a external cdn storage system. + */ +public interface RemoteStorageManager { + + /** + * @return The cdn number that this RemoteStorageManager manages + */ + int cdnNumber(); + + /** + * Copy and the object from a remote source into the backup, adding an additional layer of encryption + * + * @param sourceUri The location of the object to copy + * @param expectedSourceLength The length of the source object, should match the content-length of the object returned + * from the sourceUri. + * @param encryptionParameters The encryption keys that should be used to apply an additional layer of encryption to + * the object + * @param uploadDescriptor The destination, which must be in the cdn returned by {@link #cdnNumber()} + * @return A stage that completes successfully when the source has been successfully re-encrypted and copied into + * uploadDescriptor. The returned CompletionStage can be completed exceptionally with the following exceptions. + *

    + *
  • {@link InvalidLengthException} If the expectedSourceLength does not match the length of the sourceUri
  • + *
  • {@link SourceObjectNotFoundException} If the no object at sourceUri is found
  • + *
  • {@link java.io.IOException} If there was a generic IO issue
  • + *
+ */ + CompletionStage copy( + URI sourceUri, + int expectedSourceLength, + MediaEncryptionParameters encryptionParameters, + MessageBackupUploadDescriptor uploadDescriptor); +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/SourceObjectNotFoundException.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/SourceObjectNotFoundException.java new file mode 100644 index 000000000..5d2b20f3b --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/SourceObjectNotFoundException.java @@ -0,0 +1,11 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.backup; + +import java.io.IOException; + +public class SourceObjectNotFoundException extends IOException { +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/ClientCdnConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/ClientCdnConfiguration.java new file mode 100644 index 000000000..1f24929d1 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/ClientCdnConfiguration.java @@ -0,0 +1,53 @@ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Configuration used to interact with a cdn via HTTP + */ +public class ClientCdnConfiguration { + + /** + * Map from cdn number to the base url for attachments. + *

+ * For example, if an attachment with the id 'abc' can be retrieved from cdn 2 at https://example.org/attachments/abc, + * the attachment url for 2 should https://example.org/attachments + */ + @JsonProperty + @NotNull + Map attachmentUrls; + + @JsonProperty + @NotNull + @NotEmpty List<@NotBlank String> caCertificates = new ArrayList<>(); + + @JsonProperty + @NotNull + CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); + + @JsonProperty + @NotNull + RetryConfiguration retry = new RetryConfiguration(); + + public List getCaCertificates() { + return caCertificates; + } + + public CircuitBreakerConfiguration getCircuitBreaker() { + return circuitBreaker; + } + + public RetryConfiguration getRetry() { + return retry; + } + + public Map getAttachmentUrls() { + return attachmentUrls; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DynamoDbTables.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DynamoDbTables.java index abce0398e..55d282db1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DynamoDbTables.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DynamoDbTables.java @@ -49,6 +49,7 @@ public class DynamoDbTables { private final AccountsTableConfiguration accounts; private final Table backups; + private final Table backupMedia; private final Table clientReleases; private final Table deletedAccounts; private final Table deletedAccountsLock; @@ -71,6 +72,7 @@ public class DynamoDbTables { public DynamoDbTables( @JsonProperty("accounts") final AccountsTableConfiguration accounts, @JsonProperty("backups") final Table backups, + @JsonProperty("backupMedia") final Table backupMedia, @JsonProperty("clientReleases") final Table clientReleases, @JsonProperty("deletedAccounts") final Table deletedAccounts, @JsonProperty("deletedAccountsLock") final Table deletedAccountsLock, @@ -92,6 +94,7 @@ public class DynamoDbTables { this.accounts = accounts; this.backups = backups; + this.backupMedia = backupMedia; this.clientReleases = clientReleases; this.deletedAccounts = deletedAccounts; this.deletedAccountsLock = deletedAccountsLock; @@ -124,6 +127,12 @@ public class DynamoDbTables { return backups; } + @NotNull + @Valid + public Table getBackupMedia() { + return backupMedia; + } + @NotNull @Valid public Table getClientReleases() { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java index da7854a3b..5fa3058fd 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java @@ -14,6 +14,7 @@ import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; +import java.io.IOException; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @@ -26,8 +27,11 @@ import java.util.Optional; import java.util.concurrent.CompletionStage; import javax.annotation.Nullable; import javax.validation.Valid; +import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; import javax.ws.rs.BadRequestException; +import javax.ws.rs.ClientErrorException; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.HeaderParam; @@ -37,15 +41,27 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.signal.libsignal.protocol.ecc.ECPublicKey; import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation; import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; +import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; import org.whispersystems.textsecuregcm.backup.BackupAuthManager; import org.whispersystems.textsecuregcm.backup.BackupManager; +import org.whispersystems.textsecuregcm.backup.InvalidLengthException; +import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters; +import org.whispersystems.textsecuregcm.backup.SourceObjectNotFoundException; import org.whispersystems.textsecuregcm.util.BackupAuthCredentialAdapter; +import org.whispersystems.textsecuregcm.util.ByteArrayAdapter; +import org.whispersystems.textsecuregcm.util.ByteArrayBase64UrlAdapter; import org.whispersystems.textsecuregcm.util.ECPublicKeyAdapter; +import org.whispersystems.textsecuregcm.util.ExactlySize; +import org.whispersystems.textsecuregcm.util.ExceptionUtils; +import org.whispersystems.textsecuregcm.util.Util; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @Path("/v1/archives") @Tag(name = "Archive") @@ -72,6 +88,7 @@ public class ArchiveController { @JsonSerialize(using = BackupAuthCredentialAdapter.CredentialRequestSerializer.class) @NotNull BackupAuthCredentialRequest backupAuthCredentialRequest) {} + @PUT @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) @@ -88,10 +105,12 @@ public class ArchiveController { @ApiResponse(responseCode = "204", description = "The backup-id was set") @ApiResponse(responseCode = "400", description = "The provided backup auth credential request was invalid") @ApiResponse(responseCode = "429", description = "Rate limited. Too many attempts to change the backup-id have been made") - public CompletionStage setBackupId( + public CompletionStage setBackupId( @Auth final AuthenticatedAccount account, @Valid @NotNull final SetBackupIdRequest setBackupIdRequest) throws RateLimitExceededException { - return this.backupAuthManager.commitBackupId(account.getAccount(), setBackupIdRequest.backupAuthCredentialRequest); + return this.backupAuthManager + .commitBackupId(account.getAccount(), setBackupIdRequest.backupAuthCredentialRequest) + .thenApply(Util.ASYNC_EMPTY_RESPONSE); } public record BackupAuthCredentialsResponse( @@ -274,7 +293,7 @@ public class ArchiveController { @ApiResponseZkAuth @ApiResponse(responseCode = "204", description = "The public key was set") @ApiResponse(responseCode = "429", description = "Rate limited.") - public CompletionStage setPublicKey( + public CompletionStage setPublicKey( @Auth final Optional account, @Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class)) @@ -286,9 +305,9 @@ public class ArchiveController { @HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature, @NotNull SetPublicKeyRequest setPublicKeyRequest) { - return backupManager.setPublicKey( - presentation.presentation, signature.signature, - setPublicKeyRequest.backupIdPublicKey); + return backupManager + .setPublicKey(presentation.presentation, signature.signature, setPublicKeyRequest.backupIdPublicKey) + .thenApply(Util.ASYNC_EMPTY_RESPONSE); } @@ -333,6 +352,233 @@ public class ArchiveController { result.signedUploadLocation())); } + public record RemoteAttachment( + @Schema(description = "The attachment cdn") + @NotNull + Integer cdn, + @NotBlank + @Schema(description = "The attachment key") + String key) {} + + public record CopyMediaRequest( + @Schema(description = "The object on the attachment CDN to copy") + @NotNull + RemoteAttachment sourceAttachment, + + @Schema(description = "The length of the source attachment before the encryption applied by the copy operation") + @NotNull + int objectLength, + + @Schema(description = "mediaId to copy on to the backup CDN in URL-safe base64", implementation = String.class) + @JsonSerialize(using = ByteArrayBase64UrlAdapter.Serializing.class) + @JsonDeserialize(using = ByteArrayBase64UrlAdapter.Deserializing.class) + @NotNull + @ExactlySize(15) + byte[] mediaId, + + @Schema(description = "A 32-byte key for the MAC, base64 encoded", implementation = String.class) + @JsonDeserialize(using = ByteArrayAdapter.Deserializing.class) + @NotNull + @ExactlySize(32) + byte[] hmacKey, + + @Schema(description = "A 32-byte encryption key for AES, base64 encoded", implementation = String.class) + @JsonDeserialize(using = ByteArrayAdapter.Deserializing.class) + @NotNull + @ExactlySize(32) + byte[] encryptionKey, + + @Schema(description = "A 16-byte IV for AES, base64 encoded", implementation = String.class) + @JsonDeserialize(using = ByteArrayAdapter.Deserializing.class) + @NotNull + @ExactlySize(16) + byte[] iv) {} + + public record CopyMediaResponse( + @Schema(description = "The backup cdn where this media object is stored") + @NotNull + Integer cdn) {} + + @PUT + @Path("/media/") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Operation( + summary = "Backup media", + description = """ + Copy and re-encrypt media from the attachments cdn into the backup cdn. + + The original, already encrypted, attachment will be encrypted with the provided key material before being copied + + If the destination media already exists, the copy will be skipped and a 200 will be returned. + """) + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = CopyMediaResponse.class))) + @ApiResponse(responseCode = "400", description = "The provided object length was incorrect") + @ApiResponse(responseCode = "413", description = "All media capacity has been consumed. Free some space to continue.") + @ApiResponse(responseCode = "410", description = "The source object was not found.") + @ApiResponse(responseCode = "429", description = "Rate limited.") + @ApiResponseZkAuth + public CompletionStage copyMedia(@Auth final Optional account, + + @Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class)) + @NotNull + @HeaderParam(X_SIGNAL_ZK_AUTH) final ArchiveController.BackupAuthCredentialPresentationHeader presentation, + + @Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class)) + @NotNull + @HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature, + + @NotNull + @Valid final ArchiveController.CopyMediaRequest copyMediaRequest) { + if (account.isPresent()) { + throw new BadRequestException("must not use authenticated connection for anonymous operations"); + } + + final AuthenticatedBackupUser backupUser = backupManager.authenticateBackupUser( + presentation.presentation, signature.signature).join(); + + final boolean fits = backupManager.canStoreMedia(backupUser, copyMediaRequest.objectLength()).join(); + if (!fits) { + throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE); + } + return copyMediaImpl(backupUser, copyMediaRequest) + .thenApply(result -> new CopyMediaResponse(result.cdn())) + .exceptionally(e -> { + final Throwable unwrapped = ExceptionUtils.unwrap(e); + if (unwrapped instanceof SourceObjectNotFoundException) { + throw new ClientErrorException("Source object not found " + unwrapped.getMessage(), Response.Status.GONE); + } else if (unwrapped instanceof InvalidLengthException) { + throw new BadRequestException("Invalid length " + unwrapped.getMessage()); + } else { + throw ExceptionUtils.wrap(e); + } + }); + } + + private CompletionStage copyMediaImpl(final AuthenticatedBackupUser backupUser, + final CopyMediaRequest copyMediaRequest) { + return this.backupManager.copyToBackup( + backupUser, + copyMediaRequest.sourceAttachment.cdn, + copyMediaRequest.sourceAttachment.key, + copyMediaRequest.objectLength, + new MediaEncryptionParameters( + copyMediaRequest.encryptionKey, + copyMediaRequest.hmacKey, + copyMediaRequest.iv), + copyMediaRequest.mediaId); + } + + + public record CopyMediaBatchRequest( + @Schema(description = "A list of media objects to copy from the attachments CDN to the backup CDN") + @NotNull + @Size(min = 1, max = 1000) + List items) {} + + public record CopyMediaBatchResponse( + + @Schema(description = "Detailed outcome information for each copy request in the batch") + List responses) { + + public record Entry( + @Schema(description = """ + The outcome of the copy attempt. + A 200 indicates the object was successfully copied. + A 400 indicates an invalid argument in the request + A 410 indicates that the source object was not found + """) + int status, + + @Schema(description = "On a copy failure, a detailed failure reason") + String failureReason, + + @Schema(description = "The backup cdn where this media object is stored") + Integer cdn, + + @Schema(description = "The mediaId of the object in URL-safe base64", implementation = String.class) + @JsonSerialize(using = ByteArrayBase64UrlAdapter.Serializing.class) + @JsonDeserialize(using = ByteArrayBase64UrlAdapter.Deserializing.class) + @NotNull + @ExactlySize(15) + byte[] mediaId) {} + } + + @PUT + @Path("/media/batch") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Operation( + summary = "Batched backup media", + description = """ + Copy and re-encrypt media from the attachments cdn into the backup cdn. + + The original already encrypted attachment will be encrypted with the provided key material before being copied + + If the batch request is processed at all, a 207 will be returned and the outcome of each constituent copy will + be provided as a separate entry in the response. + """) + @ApiResponse(responseCode = "207", description = """ + The request was processed and each operation's outcome must be inspected individually. This does NOT necessarily + indicate the operation was a success. + """, content = @Content(schema = @Schema(implementation = CopyMediaBatchResponse.class))) + @ApiResponse(responseCode = "413", description = "All media capacity has been consumed. Free some space to continue.") + @ApiResponse(responseCode = "429", description = "Rate limited.") + @ApiResponseZkAuth + public CompletionStage copyMedia( + @Auth final Optional account, + + @Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class)) + @NotNull + @HeaderParam(X_SIGNAL_ZK_AUTH) final ArchiveController.BackupAuthCredentialPresentationHeader presentation, + + @Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class)) + @NotNull + @HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature, + + @NotNull + @Valid final ArchiveController.CopyMediaBatchRequest copyMediaRequest) { + + if (account.isPresent()) { + throw new BadRequestException("must not use authenticated connection for anonymous operations"); + } + + final AuthenticatedBackupUser backupUser = backupManager.authenticateBackupUser( + presentation.presentation, signature.signature).join(); + + // If the entire batch won't fit in the user's remaining quota, reject the whole request. + final long expectedStorage = copyMediaRequest.items().stream().mapToLong(CopyMediaRequest::objectLength).sum(); + final boolean fits = backupManager.canStoreMedia(backupUser, expectedStorage).join(); + if (!fits) { + throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE); + } + + return Flux.fromIterable(copyMediaRequest.items) + // Operate sequentially, waiting for one copy to finish before starting the next one. At least right now, + // copying concurrently will introduce contention over the metadata. + .concatMap(request -> Mono + .fromCompletionStage(copyMediaImpl(backupUser, request)) + .map(result -> new CopyMediaBatchResponse.Entry(200, null, result.cdn(), result.key())) + .onErrorResume(throwable -> ExceptionUtils.unwrap(throwable) instanceof IOException, throwable -> { + final Throwable unwrapped = ExceptionUtils.unwrap(throwable); + + int status; + String error; + if (unwrapped instanceof SourceObjectNotFoundException) { + status = 410; + error = "Source object not found " + unwrapped.getMessage(); + } else if (unwrapped instanceof InvalidLengthException) { + status = 400; + error = "Invalid length " + unwrapped.getMessage(); + } else { + throw ExceptionUtils.wrap(throwable); + } + return Mono.just(new CopyMediaBatchResponse.Entry(status, error, null, request.mediaId)); + })) + .collectList() + .map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build()) + .toFuture(); + } @POST @Produces(MediaType.APPLICATION_JSON) @@ -345,7 +591,7 @@ public class ArchiveController { @ApiResponse(responseCode = "204", description = "The backup was successfully refreshed") @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage refresh( + public CompletionStage refresh( @Auth final Optional account, @Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class)) @@ -360,6 +606,7 @@ public class ArchiveController { } return backupManager .authenticateBackupUser(presentation.presentation, signature.signature) - .thenCompose(backupManager::ttlRefresh); + .thenCompose(backupManager::ttlRefresh) + .thenApply(Util.ASYNC_EMPTY_RESPONSE); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/ExceptionUtils.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/ExceptionUtils.java index 28da692c2..dab21ec3b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/ExceptionUtils.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/ExceptionUtils.java @@ -1,13 +1,14 @@ package org.whispersystems.textsecuregcm.util; import java.util.concurrent.CompletionException; +import java.util.function.Function; public final class ExceptionUtils { private ExceptionUtils() { // utility class } - + /** * Extracts the cause of a {@link CompletionException}. If the given {@code throwable} is a * {@code CompletionException}, this method will recursively iterate through its causal chain until it finds the first @@ -16,7 +17,6 @@ public final class ExceptionUtils { * {@code throwable} is not a {@code CompletionException}, then this method returns the original {@code throwable}. * * @param throwable the throwable to "unwrap" - * * @return the first entity in the given {@code throwable}'s causal chain that is not a {@code CompletionException} */ public static Throwable unwrap(Throwable throwable) { @@ -27,8 +27,8 @@ public final class ExceptionUtils { } /** - * Wraps the given {@code throwable} in a {@link CompletionException} unless the given {@code throwable} is already - * a {@code CompletionException}, in which case this method returns the original throwable. + * Wraps the given {@code throwable} in a {@link CompletionException} unless the given {@code throwable} is already a + * {@code CompletionException}, in which case this method returns the original throwable. * * @param throwable the throwable to wrap in a {@code CompletionException} */ @@ -37,4 +37,29 @@ public final class ExceptionUtils { ? completionException : new CompletionException(throwable); } + + /** + * Create a handler suitable for use with {@link java.util.concurrent.CompletionStage#exceptionally} that only handles + * a specific exception subclass. + * + * @param exceptionType The class of exception that will be handled + * @param fn A function that handles exceptions of type exceptionType + * @param The type of the stage that will be mapped + * @param The type of the exception that will be handled + * @return A function suitable for use with {@link java.util.concurrent.CompletionStage#exceptionally} + */ + public static Function exceptionallyHandler( + final Class exceptionType, + final Function fn) { + return anyException -> { + if (exceptionType.isInstance(anyException)) { + return fn.apply(exceptionType.cast(anyException)); + } + final Throwable unwrap = unwrap(anyException); + if (exceptionType.isInstance(unwrap)) { + return fn.apply(exceptionType.cast(unwrap)); + } + throw wrap(anyException); + }; + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java index 619486337..caa8fcfc3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java @@ -8,27 +8,31 @@ package org.whispersystems.textsecuregcm.backup; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.net.URI; +import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.Base64; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.commons.lang3.RandomUtils; -import org.assertj.core.api.ThrowableAssert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -36,28 +40,28 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.signal.libsignal.protocol.ecc.Curve; import org.signal.libsignal.protocol.ecc.ECKeyPair; -import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.VerificationFailedException; import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation; import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; -import org.whispersystems.textsecuregcm.backup.BackupManager.BackupInfo; import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema; import org.whispersystems.textsecuregcm.util.AttributeValues; -import org.whispersystems.textsecuregcm.util.ExceptionUtils; +import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil; import org.whispersystems.textsecuregcm.util.TestClock; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; -import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; public class BackupManagerTest { @RegisterExtension - private static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension( - DynamoDbExtensionSchema.Tables.BACKUPS); + public static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension( + DynamoDbExtensionSchema.Tables.BACKUPS, + DynamoDbExtensionSchema.Tables.BACKUP_MEDIA); private final TestClock testClock = TestClock.now(); private final BackupAuthTestUtil backupAuthTestUtil = new BackupAuthTestUtil(testClock); - private final TusBackupCredentialGenerator tusCredentialGenerator = mock(TusBackupCredentialGenerator.class); + private final Cdn3BackupCredentialGenerator tusCredentialGenerator = mock(Cdn3BackupCredentialGenerator.class); + private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final byte[] backupKey = RandomUtils.nextBytes(32); private final UUID aci = UUID.randomUUID(); @@ -68,16 +72,19 @@ public class BackupManagerTest { reset(tusCredentialGenerator); testClock.unpin(); this.backupManager = new BackupManager( + new BackupsDb(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), + DynamoDbExtensionSchema.Tables.BACKUPS.tableName(), DynamoDbExtensionSchema.Tables.BACKUP_MEDIA.tableName(), + testClock), backupAuthTestUtil.params, tusCredentialGenerator, - DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), - DynamoDbExtensionSchema.Tables.BACKUPS.tableName(), + remoteStorageManager, + Map.of(3, "cdn3.example.org/attachments"), testClock); } @ParameterizedTest @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = {"NONE"}) - public void createBackup(final BackupTier backupTier) throws InvalidInputException, VerificationFailedException { + public void createBackup(final BackupTier backupTier) { final Instant now = Instant.ofEpochSecond(Duration.ofDays(1).getSeconds()); testClock.pin(now); @@ -89,18 +96,18 @@ public class BackupManagerTest { verify(tusCredentialGenerator, times(1)) .generateUpload(encodedBackupId, BackupManager.MESSAGE_BACKUP_NAME); - final BackupInfo info = backupManager.backupInfo(backupUser).join(); + final BackupManager.BackupInfo info = backupManager.backupInfo(backupUser).join(); assertThat(info.backupSubdir()).isEqualTo(encodedBackupId); assertThat(info.messageBackupKey()).isEqualTo(BackupManager.MESSAGE_BACKUP_NAME); assertThat(info.mediaUsedSpace()).isEqualTo(Optional.empty()); // Check that the initial expiration times are the initial write times - checkExpectedExpirations(now, backupTier == BackupTier.MEDIA ? now : null, backupUser.backupId()); + checkExpectedExpirations(now, backupTier == BackupTier.MEDIA ? now : null, backupUser); } @ParameterizedTest @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = {"NONE"}) - public void ttlRefresh(final BackupTier backupTier) throws InvalidInputException, VerificationFailedException { + public void ttlRefresh(final BackupTier backupTier) { final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), backupTier); final Instant tstart = Instant.ofEpochSecond(1).plus(Duration.ofDays(1)); @@ -117,12 +124,12 @@ public class BackupManagerTest { checkExpectedExpirations( tnext, backupTier == BackupTier.MEDIA ? tnext : null, - backupUser.backupId()); + backupUser); } @ParameterizedTest @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = {"NONE"}) - public void createBackupRefreshesTtl(final BackupTier backupTier) throws VerificationFailedException { + public void createBackupRefreshesTtl(final BackupTier backupTier) { final Instant tstart = Instant.ofEpochSecond(1).plus(Duration.ofDays(1)); final Instant tnext = tstart.plus(Duration.ofSeconds(1)); @@ -139,7 +146,7 @@ public class BackupManagerTest { checkExpectedExpirations( tnext, backupTier == BackupTier.MEDIA ? tnext : null, - backupUser.backupId()); + backupUser); } @Test @@ -151,9 +158,10 @@ public class BackupManagerTest { final byte[] signature = keyPair.getPrivateKey().calculateSignature(presentation.serialize()); // haven't set a public key yet - assertThatExceptionOfType(StatusRuntimeException.class) - .isThrownBy(unwrapExceptions(() -> backupManager.authenticateBackupUser(presentation, signature))) - .extracting(ex -> ex.getStatus().getCode()) + assertThat(CompletableFutureTestUtil.assertFailsWithCause( + StatusRuntimeException.class, + backupManager.authenticateBackupUser(presentation, signature)) + .getStatus().getCode()) .isEqualTo(Status.NOT_FOUND.getCode()); } @@ -170,9 +178,10 @@ public class BackupManagerTest { backupManager.setPublicKey(presentation, signature1, keyPair1.getPublicKey()).join(); // shouldn't be able to set a different public key - assertThatExceptionOfType(StatusRuntimeException.class) - .isThrownBy(unwrapExceptions(() -> backupManager.setPublicKey(presentation, signature2, keyPair2.getPublicKey()))) - .extracting(ex -> ex.getStatus().getCode()) + assertThat(CompletableFutureTestUtil.assertFailsWithCause( + StatusRuntimeException.class, + backupManager.setPublicKey(presentation, signature2, keyPair2.getPublicKey())) + .getStatus().getCode()) .isEqualTo(Status.UNAUTHENTICATED.getCode()); // should be able to set the same public key again (noop) @@ -193,16 +202,17 @@ public class BackupManagerTest { // shouldn't be able to set a public key with an invalid signature assertThatExceptionOfType(StatusRuntimeException.class) - .isThrownBy(unwrapExceptions(() -> backupManager.setPublicKey(presentation, wrongSignature, keyPair.getPublicKey()))) + .isThrownBy(() -> backupManager.setPublicKey(presentation, wrongSignature, keyPair.getPublicKey())) .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.UNAUTHENTICATED.getCode()); backupManager.setPublicKey(presentation, signature, keyPair.getPublicKey()).join(); // shouldn't be able to authenticate with an invalid signature - assertThatExceptionOfType(StatusRuntimeException.class) - .isThrownBy(unwrapExceptions(() -> backupManager.authenticateBackupUser(presentation, wrongSignature))) - .extracting(ex -> ex.getStatus().getCode()) + assertThat(CompletableFutureTestUtil.assertFailsWithCause( + StatusRuntimeException.class, + backupManager.authenticateBackupUser(presentation, wrongSignature)) + .getStatus().getCode()) .isEqualTo(Status.UNAUTHENTICATED.getCode()); // correct signature @@ -212,11 +222,12 @@ public class BackupManagerTest { } @Test - public void credentialExpiration() throws InvalidInputException, VerificationFailedException { + public void credentialExpiration() throws VerificationFailedException { // credential for 1 day after epoch testClock.pin(Instant.ofEpochSecond(1).plus(Duration.ofDays(1))); - final BackupAuthCredentialPresentation oldCredential = backupAuthTestUtil.getPresentation(BackupTier.MESSAGES, backupKey, aci); + final BackupAuthCredentialPresentation oldCredential = backupAuthTestUtil.getPresentation(BackupTier.MESSAGES, + backupKey, aci); final ECKeyPair keyPair = Curve.generateKeyPair(); final byte[] signature = keyPair.getPrivateKey().calculateSignature(oldCredential.serialize()); backupManager.setPublicKey(oldCredential, signature, keyPair.getPublicKey()).join(); @@ -231,28 +242,95 @@ public class BackupManagerTest { // should be rejected the day after that testClock.pin(Instant.ofEpochSecond(1).plus(Duration.ofDays(3))); - assertThatExceptionOfType(StatusRuntimeException.class) - .isThrownBy(unwrapExceptions(() -> backupManager.authenticateBackupUser(oldCredential, signature))) - .extracting(ex -> ex.getStatus().getCode()) + assertThat(CompletableFutureTestUtil.assertFailsWithCause( + StatusRuntimeException.class, + backupManager.authenticateBackupUser(oldCredential, signature)) + .getStatus().getCode()) .isEqualTo(Status.UNAUTHENTICATED.getCode()); } + @Test + public void copySuccess() { + final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA); + when(tusCredentialGenerator.generateUpload(any(), any())) + .thenReturn(new MessageBackupUploadDescriptor(3, "def", Collections.emptyMap(), "")); + when(remoteStorageManager.copy(eq(URI.create("cdn3.example.org/attachments/abc")), eq(100), any(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + final BackupManager.StorageDescriptor copied = backupManager.copyToBackup( + backupUser, 3, "abc", 100, mock(MediaEncryptionParameters.class), + "def".getBytes(StandardCharsets.UTF_8)).join(); + + assertThat(copied.cdn()).isEqualTo(3); + assertThat(copied.key()).isEqualTo("def".getBytes(StandardCharsets.UTF_8)); + + final Map backup = getBackupItem(backupUser); + final long bytesUsed = AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_BYTES_USED, 0L); + assertThat(bytesUsed).isEqualTo(100); + + final long mediaCount = AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_COUNT, 0L); + assertThat(mediaCount).isEqualTo(1); + + final Map mediaItem = getBackupMediaItem(backupUser, + "def".getBytes(StandardCharsets.UTF_8)); + final long mediaLength = AttributeValues.getLong(mediaItem, BackupsDb.ATTR_LENGTH, 0L); + assertThat(mediaLength).isEqualTo(100L); + } + + @Test + public void copyFailure() { + final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA); + when(tusCredentialGenerator.generateUpload(any(), any())) + .thenReturn(new MessageBackupUploadDescriptor(3, "def", Collections.emptyMap(), "")); + when(remoteStorageManager.copy(eq(URI.create("cdn3.example.org/attachments/abc")), eq(100), any(), any())) + .thenReturn(CompletableFuture.failedFuture(new SourceObjectNotFoundException())); + + CompletableFutureTestUtil.assertFailsWithCause(SourceObjectNotFoundException.class, + backupManager.copyToBackup( + backupUser, + 3, "abc", 100, + mock(MediaEncryptionParameters.class), + "def".getBytes(StandardCharsets.UTF_8))); + + final Map backup = getBackupItem(backupUser); + assertThat(AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_BYTES_USED, -1L)).isEqualTo(0L); + assertThat(AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_COUNT, -1L)).isEqualTo(0L); + + final Map media = getBackupMediaItem(backupUser, "def".getBytes(StandardCharsets.UTF_8)); + assertThat(media).isEmpty(); + } + + private Map getBackupItem(final AuthenticatedBackupUser backupUser) { + return DYNAMO_DB_EXTENSION.getDynamoDbClient().getItem(GetItemRequest.builder() + .tableName(DynamoDbExtensionSchema.Tables.BACKUPS.tableName()) + .key(Map.of(BackupsDb.KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser.backupId())))) + .build()) + .item(); + } + + private Map getBackupMediaItem(final AuthenticatedBackupUser backupUser, + final byte[] mediaId) { + return DYNAMO_DB_EXTENSION.getDynamoDbClient().getItem(GetItemRequest.builder() + .tableName(DynamoDbExtensionSchema.Tables.BACKUP_MEDIA.tableName()) + .key(Map.of( + BackupsDb.KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser.backupId())), + BackupsDb.KEY_MEDIA_ID, AttributeValues.b(mediaId))) + .build()) + .item(); + } + private void checkExpectedExpirations( final Instant expectedExpiration, final @Nullable Instant expectedMediaExpiration, - final byte[] backupId) { - final GetItemResponse item = DYNAMO_DB_EXTENSION.getDynamoDbClient().getItem(GetItemRequest.builder() - .tableName(DynamoDbExtensionSchema.Tables.BACKUPS.tableName()) - .key(Map.of(BackupManager.KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupId)))) - .build()); - assertThat(item.hasItem()).isTrue(); - final Instant refresh = Instant.ofEpochSecond(Long.parseLong(item.item().get(BackupManager.ATTR_LAST_REFRESH).n())); + final AuthenticatedBackupUser backupUser) { + final Map item = getBackupItem(backupUser); + final Instant refresh = Instant.ofEpochSecond(Long.parseLong(item.get(BackupsDb.ATTR_LAST_REFRESH).n())); assertThat(refresh).isEqualTo(expectedExpiration); if (expectedMediaExpiration == null) { - assertThat(item.item()).doesNotContainKey(BackupManager.ATTR_LAST_MEDIA_REFRESH); + assertThat(item).doesNotContainKey(BackupsDb.ATTR_LAST_MEDIA_REFRESH); } else { - assertThat(Instant.ofEpochSecond(Long.parseLong(item.item().get(BackupManager.ATTR_LAST_MEDIA_REFRESH).n()))) + assertThat(Instant.ofEpochSecond(Long.parseLong(item.get(BackupsDb.ATTR_LAST_MEDIA_REFRESH).n()))) .isEqualTo(expectedMediaExpiration); } } @@ -268,17 +346,4 @@ public class BackupManagerTest { private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupTier backupTier) { return new AuthenticatedBackupUser(backupId, backupTier); } - - private ThrowableAssert.ThrowingCallable unwrapExceptions(final Supplier> f) { - return () -> { - try { - f.get().join(); - } catch (Exception e) { - if (ExceptionUtils.unwrap(e) instanceof StatusRuntimeException ex) { - throw ex; - } - throw e; - } - }; - } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java new file mode 100644 index 000000000..1420e89c5 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupsDbTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.backup; + + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; +import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; +import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema; +import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil; +import org.whispersystems.textsecuregcm.util.TestClock; + +public class BackupsDbTest { + + @RegisterExtension + public static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension( + DynamoDbExtensionSchema.Tables.BACKUPS, + DynamoDbExtensionSchema.Tables.BACKUP_MEDIA); + + private final TestClock testClock = TestClock.now(); + private BackupsDb backupsDb; + + @BeforeEach + public void setup() { + testClock.unpin(); + backupsDb = new BackupsDb(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), + DynamoDbExtensionSchema.Tables.BACKUPS.tableName(), DynamoDbExtensionSchema.Tables.BACKUP_MEDIA.tableName(), + testClock); + } + + @Test + public void trackMediaIdempotent() { + final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA); + this.backupsDb.trackMedia(backupUser, "abc".getBytes(StandardCharsets.UTF_8), 100).join(); + assertDoesNotThrow(() -> + this.backupsDb.trackMedia(backupUser, "abc".getBytes(StandardCharsets.UTF_8), 100).join()); + } + + @Test + public void trackMediaLengthChange() { + final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA); + this.backupsDb.trackMedia(backupUser, "abc".getBytes(StandardCharsets.UTF_8), 100).join(); + CompletableFutureTestUtil.assertFailsWithCause(InvalidLengthException.class, + this.backupsDb.trackMedia(backupUser, "abc".getBytes(StandardCharsets.UTF_8), 99)); + } + + @Test + public void trackMediaStats() { + final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA); + // add at least one message backup so we can describe it + backupsDb.addMessageBackup(backupUser).join(); + int total = 0; + for (int i = 0; i < 5; i++) { + this.backupsDb.trackMedia(backupUser, Integer.toString(i).getBytes(StandardCharsets.UTF_8), i).join(); + total += i; + final BackupsDb.BackupDescription description = this.backupsDb.describeBackup(backupUser).join(); + assertThat(description.mediaUsedSpace().get()).isEqualTo(total); + } + + for (int i = 0; i < 5; i++) { + this.backupsDb.untrackMedia(backupUser, Integer.toString(i).getBytes(StandardCharsets.UTF_8), i).join(); + total -= i; + final BackupsDb.BackupDescription description = this.backupsDb.describeBackup(backupUser).join(); + assertThat(description.mediaUsedSpace().get()).isEqualTo(total); + } + } + + + private static byte[] hashedBackupId(final byte[] backupId) { + try { + return Arrays.copyOf(MessageDigest.getInstance("SHA-256").digest(backupId), 16); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError(e); + } + } + + private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupTier backupTier) { + return new AuthenticatedBackupUser(backupId, backupTier); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/TusBackupCredentialGeneratorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3BackupCredentialGeneratorTest.java similarity index 88% rename from service/src/test/java/org/whispersystems/textsecuregcm/backup/TusBackupCredentialGeneratorTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3BackupCredentialGeneratorTest.java index 993a370f6..80c183a7c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/TusBackupCredentialGeneratorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3BackupCredentialGeneratorTest.java @@ -16,10 +16,10 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; -public class TusBackupCredentialGeneratorTest { +public class Cdn3BackupCredentialGeneratorTest { @Test public void uploadGenerator() { - TusBackupCredentialGenerator generator = new TusBackupCredentialGenerator(new TusConfiguration( + Cdn3BackupCredentialGenerator generator = new Cdn3BackupCredentialGenerator(new TusConfiguration( new SecretBytes(RandomUtils.nextBytes(32)), "https://example.org/upload")); @@ -33,7 +33,7 @@ public class TusBackupCredentialGeneratorTest { @Test public void readCredential() { - TusBackupCredentialGenerator generator = new TusBackupCredentialGenerator(new TusConfiguration( + Cdn3BackupCredentialGenerator generator = new Cdn3BackupCredentialGenerator(new TusConfiguration( new SecretBytes(RandomUtils.nextBytes(32)), "https://example.org/upload")); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java new file mode 100644 index 000000000..3aec5b6e6 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java @@ -0,0 +1,185 @@ +package org.whispersystems.textsecuregcm.backup; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import com.github.tomakehurst.wiremock.junit5.WireMockExtension; +import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.security.InvalidAlgorithmParameterException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import javax.crypto.BadPaddingException; +import javax.crypto.Cipher; +import javax.crypto.IllegalBlockSizeException; +import javax.crypto.Mac; +import javax.crypto.NoSuchPaddingException; +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil; + +@ExtendWith(DropwizardExtensionsSupport.class) +public class Cdn3RemoteStorageManagerTest { + + private static byte[] HMAC_KEY = getRandomBytes(32); + private static byte[] AES_KEY = getRandomBytes(32); + private static byte[] IV = getRandomBytes(16); + + @RegisterExtension + private final WireMockExtension wireMock = WireMockExtension.newInstance() + .options(wireMockConfig().dynamicPort()) + .build(); + + private static String SMALL_CDN2 = "a small object from cdn2"; + private static String SMALL_CDN3 = "a small object from cdn3"; + private static String LARGE = "a".repeat(1024 * 1024 * 5); + + private RemoteStorageManager remoteStorageManager; + + @BeforeEach + public void init() throws CertificateException { + remoteStorageManager = new Cdn3RemoteStorageManager( + Executors.newSingleThreadScheduledExecutor(), + new CircuitBreakerConfiguration(), + new RetryConfiguration(), + Collections.emptyList()); + + wireMock.stubFor(get(urlEqualTo("/cdn2/source/small")) + .willReturn(aResponse() + .withHeader("Content-Length", Integer.toString(SMALL_CDN2.length())) + .withBody(SMALL_CDN2))); + + wireMock.stubFor(get(urlEqualTo("/cdn3/source/small")) + .willReturn(aResponse() + .withHeader("Content-Length", Integer.toString(SMALL_CDN3.length())) + .withBody(SMALL_CDN3))); + + wireMock.stubFor(get(urlEqualTo("/cdn3/source/large")) + .willReturn(aResponse() + .withHeader("Content-Length", Integer.toString(LARGE.length())) + .withBody(LARGE))); + + wireMock.stubFor(get(urlEqualTo("/cdn3/source/missing")) + .willReturn(aResponse().withStatus(404))); + } + + @ParameterizedTest + @ValueSource(ints = {2, 3}) + public void copySmall(final int sourceCdn) + throws InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { + + final String expectedSource = switch (sourceCdn) { + case 2 -> SMALL_CDN2; + case 3 -> SMALL_CDN3; + default -> throw new AssertionError(); + }; + + wireMock.stubFor(post(urlEqualTo("/cdn3/dest")) + .willReturn(aResponse() + .withStatus(201))); + + remoteStorageManager.copy( + URI.create(wireMock.url("/cdn" + sourceCdn + "/source/small")), + expectedSource.length(), + new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV), + new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest"))) + .toCompletableFuture().join(); + + final byte[] destBody = wireMock.findAll(postRequestedFor(urlEqualTo("/cdn3/dest"))).get(0).getBody(); + assertThat(new String(decrypt(destBody), StandardCharsets.UTF_8)) + .isEqualTo(expectedSource); + } + + @Test + public void copyLarge() + throws InvalidAlgorithmParameterException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException { + wireMock.stubFor(post(urlEqualTo("/cdn3/dest")) + .willReturn(aResponse() + .withStatus(201))); + final MediaEncryptionParameters params = new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV); + remoteStorageManager.copy( + URI.create(wireMock.url("/cdn3/source/large")), + LARGE.length(), + params, + new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest"))) + .toCompletableFuture().join(); + + final byte[] destBody = wireMock.findAll(postRequestedFor(urlEqualTo("/cdn3/dest"))).get(0).getBody(); + assertThat(destBody.length).isEqualTo(new BackupMediaEncrypter(params).outputSize(LARGE.length())); + assertThat(new String(decrypt(destBody), StandardCharsets.UTF_8)).isEqualTo(LARGE); + } + + @Test + public void incorrectLength() { + CompletableFutureTestUtil.assertFailsWithCause(InvalidLengthException.class, + remoteStorageManager.copy( + URI.create(wireMock.url("/cdn3/source/small")), + SMALL_CDN3.length() - 1, + new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV), + new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest"))) + .toCompletableFuture()); + } + + @Test + public void sourceMissing() { + CompletableFutureTestUtil.assertFailsWithCause(SourceObjectNotFoundException.class, + remoteStorageManager.copy( + URI.create(wireMock.url("/cdn3/source/missing")), + 1, + new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV), + new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest"))) + .toCompletableFuture()); + } + + private byte[] decrypt(final byte[] encrypted) + throws InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { + + final Mac mac; + try { + mac = Mac.getInstance("HmacSHA256"); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError(e); + } + + mac.init(new SecretKeySpec(HMAC_KEY, "HmacSHA256")); + mac.update(encrypted, 0, encrypted.length - mac.getMacLength()); + assertArrayEquals(mac.doFinal(), + Arrays.copyOfRange(encrypted, encrypted.length - mac.getMacLength(), encrypted.length)); + assertArrayEquals(IV, Arrays.copyOf(encrypted, 16)); + + final Cipher cipher; + try { + cipher = Cipher.getInstance("AES/CBC/PKCS5Padding"); + } catch (NoSuchAlgorithmException | NoSuchPaddingException e) { + throw new AssertionError(e); + } + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(AES_KEY, "AES"), new IvParameterSpec(IV)); + return cipher.doFinal(encrypted, IV.length, encrypted.length - IV.length - mac.getMacLength()); + } + + private static byte[] getRandomBytes(int length) { + byte[] result = new byte[length]; + ThreadLocalRandom.current().nextBytes(result); + return result; + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java index 88299afe3..78636f119 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java @@ -7,6 +7,8 @@ package org.whispersystems.textsecuregcm.controllers; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -22,11 +24,13 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Arrays; import java.util.Base64; import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; import java.util.stream.Stream; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; @@ -54,6 +58,8 @@ import org.whispersystems.textsecuregcm.backup.BackupAuthManager; import org.whispersystems.textsecuregcm.backup.BackupAuthTestUtil; import org.whispersystems.textsecuregcm.backup.BackupManager; import org.whispersystems.textsecuregcm.backup.BackupTier; +import org.whispersystems.textsecuregcm.backup.InvalidLengthException; +import org.whispersystems.textsecuregcm.backup.SourceObjectNotFoundException; import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper; import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper; import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper; @@ -96,6 +102,22 @@ public class ArchiveControllerTest { GET, v1/archives/upload/form, POST, v1/archives/, PUT, v1/archives/keys, '{"backupIdPublicKey": "aaaaa"}' + PUT, v1/archives/media, '{ + "sourceAttachment": {"cdn": 3, "key": "abc"}, + "objectLength": 10, + "mediaId": "aaaaaaaaaaaaaaaaaaaa", + "hmacKey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "encryptionKey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "iv": "aaaaaaaaaaaaaaaaaaaaaa" + }' + PUT, v1/archives/media/batch, '{"items": [{ + "sourceAttachment": {"cdn": 3, "key": "abc"}, + "objectLength": 10, + "mediaId": "aaaaaaaaaaaaaaaaaaaa", + "hmacKey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "encryptionKey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "iv": "aaaaaaaaaaaaaaaaaaaaaa" + }]}' """) public void anonymousAuthOnly(final String method, final String path, final String body) throws VerificationFailedException { @@ -269,4 +291,139 @@ public class ArchiveControllerTest { assertThat(response.cdn()).isEqualTo(1); assertThat(response.usedSpace()).isNull(); } + + @Test + public void putMediaBatchSuccess() throws VerificationFailedException { + final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( + BackupTier.MEDIA, backupKey, aci); + when(backupManager.authenticateBackupUser(any(), any())) + .thenReturn(CompletableFuture.completedFuture( + new AuthenticatedBackupUser(presentation.getBackupId(), BackupTier.MEDIA))); + when(backupManager.canStoreMedia(any(), anyLong())).thenReturn(CompletableFuture.completedFuture(true)); + when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), any())) + .thenAnswer(invocation -> { + byte[] mediaId = invocation.getArgument(5, byte[].class); + return CompletableFuture.completedFuture(new BackupManager.StorageDescriptor(1, mediaId)); + }); + + final byte[][] mediaIds = new byte[][]{RandomUtils.nextBytes(15), RandomUtils.nextBytes(15)}; + + final Response r = resources.getJerseyTest() + .target("v1/archives/media/batch") + .request() + .header("X-Signal-ZK-Auth", Base64.getEncoder().encodeToString(presentation.serialize())) + .header("X-Signal-ZK-Auth-Signature", "aaa") + .put(Entity.json(new ArchiveController.CopyMediaBatchRequest(List.of( + new ArchiveController.CopyMediaRequest( + new ArchiveController.RemoteAttachment(3, "abc"), + 100, + mediaIds[0], + RandomUtils.nextBytes(32), + RandomUtils.nextBytes(32), + RandomUtils.nextBytes(16)), + + new ArchiveController.CopyMediaRequest( + new ArchiveController.RemoteAttachment(3, "def"), + 200, + mediaIds[1], + RandomUtils.nextBytes(32), + RandomUtils.nextBytes(32), + RandomUtils.nextBytes(16)) + )))); + assertThat(r.getStatus()).isEqualTo(207); + final ArchiveController.CopyMediaBatchResponse copyResponse = r.readEntity( + ArchiveController.CopyMediaBatchResponse.class); + assertThat(copyResponse.responses()).hasSize(2); + for (int i = 0; i < 2; i++) { + final ArchiveController.CopyMediaBatchResponse.Entry response = copyResponse.responses().get(i); + assertThat(response.cdn()).isEqualTo(1); + assertThat(response.mediaId()).isEqualTo(mediaIds[i]); + assertThat(response.status()).isEqualTo(200); + } + } + + @Test + public void putMediaBatchPartialFailure() throws VerificationFailedException { + + final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( + BackupTier.MEDIA, backupKey, aci); + when(backupManager.authenticateBackupUser(any(), any())) + .thenReturn(CompletableFuture.completedFuture( + new AuthenticatedBackupUser(presentation.getBackupId(), BackupTier.MEDIA))); + + final byte[][] mediaIds = IntStream.range(0, 3).mapToObj(i -> RandomUtils.nextBytes(15)).toArray(byte[][]::new); + when(backupManager.canStoreMedia(any(), anyLong())).thenReturn(CompletableFuture.completedFuture(true)); + + when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[0]))) + .thenReturn(CompletableFuture.completedFuture(new BackupManager.StorageDescriptor(1, mediaIds[0]))); + when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[1]))) + .thenReturn(CompletableFuture.failedFuture(new SourceObjectNotFoundException())); + when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[2]))) + .thenReturn(CompletableFuture.failedFuture(new InvalidLengthException("bad length"))); + + final List copyRequests = Arrays.stream(mediaIds) + .map(mediaId -> new ArchiveController.CopyMediaRequest( + new ArchiveController.RemoteAttachment(3, "abc"), + 100, + mediaId, + RandomUtils.nextBytes(32), + RandomUtils.nextBytes(32), + RandomUtils.nextBytes(16)) + ).toList(); + + Response r = resources.getJerseyTest() + .target("v1/archives/media/batch") + .request() + .header("X-Signal-ZK-Auth", Base64.getEncoder().encodeToString(presentation.serialize())) + .header("X-Signal-ZK-Auth-Signature", "aaa") + .put(Entity.json(new ArchiveController.CopyMediaBatchRequest(copyRequests))); + assertThat(r.getStatus()).isEqualTo(207); + final ArchiveController.CopyMediaBatchResponse copyResponse = r.readEntity( + ArchiveController.CopyMediaBatchResponse.class); + + assertThat(copyResponse.responses()).hasSize(3); + + final ArchiveController.CopyMediaBatchResponse.Entry r1 = copyResponse.responses().get(0); + assertThat(r1.cdn()).isEqualTo(1); + assertThat(r1.mediaId()).isEqualTo(mediaIds[0]); + assertThat(r1.status()).isEqualTo(200); + + final ArchiveController.CopyMediaBatchResponse.Entry r2 = copyResponse.responses().get(1); + assertThat(r2.mediaId()).isEqualTo(mediaIds[1]); + assertThat(r2.status()).isEqualTo(410); + assertThat(r2.failureReason()).isNotBlank(); + + final ArchiveController.CopyMediaBatchResponse.Entry r3 = copyResponse.responses().get(2); + assertThat(r3.mediaId()).isEqualTo(mediaIds[2]); + assertThat(r3.status()).isEqualTo(400); + assertThat(r3.failureReason()).isNotBlank(); + } + + @Test + public void putMediaBatchOutOfSpace() throws VerificationFailedException { + final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( + BackupTier.MEDIA, backupKey, aci); + when(backupManager.authenticateBackupUser(any(), any())) + .thenReturn(CompletableFuture.completedFuture( + new AuthenticatedBackupUser(presentation.getBackupId(), BackupTier.MEDIA))); + + when(backupManager.canStoreMedia(any(), eq(1L + 2L + 3L))) + .thenReturn(CompletableFuture.completedFuture(false)); + + final Response response = resources.getJerseyTest() + .target("v1/archives/media/batch") + .request() + .header("X-Signal-ZK-Auth", Base64.getEncoder().encodeToString(presentation.serialize())) + .header("X-Signal-ZK-Auth-Signature", "aaa") + .put(Entity.json(new ArchiveController.CopyMediaBatchRequest(IntStream.range(0, 3) + .mapToObj(i -> new ArchiveController.CopyMediaRequest( + new ArchiveController.RemoteAttachment(3, "abc"), + i + 1, + RandomUtils.nextBytes(15), + RandomUtils.nextBytes(32), + RandomUtils.nextBytes(32), + RandomUtils.nextBytes(16)) + ).toList()))); + assertThat(response.getStatus()).isEqualTo(413); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java index c301cb425..86d73847d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java @@ -7,7 +7,7 @@ package org.whispersystems.textsecuregcm.storage; import java.util.Collections; import java.util.List; -import org.whispersystems.textsecuregcm.backup.BackupManager; +import org.whispersystems.textsecuregcm.backup.BackupsDb; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; @@ -50,13 +50,25 @@ public final class DynamoDbExtensionSchema { List.of()), BACKUPS("backups_test", - BackupManager.KEY_BACKUP_ID_HASH, + BackupsDb.KEY_BACKUP_ID_HASH, null, List.of(AttributeDefinition.builder() - .attributeName(BackupManager.KEY_BACKUP_ID_HASH) + .attributeName(BackupsDb.KEY_BACKUP_ID_HASH) .attributeType(ScalarAttributeType.B).build()), Collections.emptyList(), Collections.emptyList()), + BACKUP_MEDIA("backups_media_test", + BackupsDb.KEY_BACKUP_ID_HASH, + BackupsDb.KEY_MEDIA_ID, + List.of( + AttributeDefinition.builder() + .attributeName(BackupsDb.KEY_BACKUP_ID_HASH) + .attributeType(ScalarAttributeType.B).build(), + AttributeDefinition.builder() + .attributeName(BackupsDb.KEY_MEDIA_ID) + .attributeType(ScalarAttributeType.B).build()), + Collections.emptyList(), Collections.emptyList()), + CLIENT_RELEASES("client_releases_test", ClientReleases.ATTR_PLATFORM, ClientReleases.ATTR_VERSION, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/util/CompletableFutureTestUtil.java b/service/src/test/java/org/whispersystems/textsecuregcm/util/CompletableFutureTestUtil.java index 6656bba36..4ee4f0069 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/util/CompletableFutureTestUtil.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/util/CompletableFutureTestUtil.java @@ -17,13 +17,16 @@ public class CompletableFutureTestUtil { private CompletableFutureTestUtil() { } - public static void assertFailsWithCause(final Class expectedCause, final CompletableFuture completableFuture) { - assertFailsWithCause(expectedCause, completableFuture, null); + public static T assertFailsWithCause(final Class expectedCause, final CompletableFuture completableFuture) { + return assertFailsWithCause(expectedCause, completableFuture, null); } - public static void assertFailsWithCause(final Class expectedCause, final CompletableFuture completableFuture, final String message) { + public static T assertFailsWithCause(final Class expectedCause, final CompletableFuture completableFuture, final String message) { final CompletionException completionException = assertThrows(CompletionException.class, completableFuture::join, message); - assertTrue(ExceptionUtils.unwrap(completionException).getClass().isAssignableFrom(expectedCause), message); + final Throwable unwrapped = ExceptionUtils.unwrap(completionException); + final String compError = "Expected failure " + expectedCause + " was " + unwrapped.getClass(); + assertTrue(unwrapped.getClass().isAssignableFrom(expectedCause), message == null ? compError : message + " : " + compError); + return expectedCause.cast(unwrapped); } public static CompletableFuture almostCompletedFuture(T result) {