From 4aadabfac0e4f79ebdbc3baae541335352831eac Mon Sep 17 00:00:00 2001 From: ravi-signal <99042880+ravi-signal@users.noreply.github.com> Date: Thu, 20 Jun 2024 16:00:09 -0500 Subject: [PATCH] Make copy/delete streaming friendly --- .../textsecuregcm/backup/BackupManager.java | 281 +++++++++++------- .../textsecuregcm/backup/CopyParameters.java | 29 ++ .../textsecuregcm/backup/CopyResult.java | 46 +++ .../controllers/ArchiveController.java | 136 ++++----- .../backup/BackupManagerTest.java | 196 +++++++----- .../controllers/ArchiveControllerTest.java | 69 ++--- 6 files changed, 436 insertions(+), 321 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/CopyParameters.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/backup/CopyResult.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java index a79892841..3889acd1f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java @@ -14,13 +14,14 @@ import java.security.SecureRandom; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import org.signal.libsignal.protocol.ecc.Curve; import org.signal.libsignal.protocol.ecc.ECPublicKey; import org.signal.libsignal.zkgroup.GenericServerSecretParams; @@ -32,7 +33,6 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.attachments.AttachmentGenerator; import org.whispersystems.textsecuregcm.attachments.TusAttachmentGenerator; import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; -import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; @@ -56,6 +56,9 @@ public class BackupManager { // How many cdn object deletion requests can be outstanding at a time per backup deletion operation private static final int DELETION_CONCURRENCY = 10; + // How many cdn object copy requests can be outstanding at a time per batch copy-to-backup operation + private static final int COPY_CONCURRENCY = 10; + private static final String ZK_AUTHN_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authentication"); private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class, @@ -148,7 +151,7 @@ public class BackupManager { .thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser))); } - public CompletionStage createTemporaryAttachmentUploadDescriptor( + public CompletableFuture createTemporaryAttachmentUploadDescriptor( final AuthenticatedBackupUser backupUser) { checkBackupLevel(backupUser, BackupLevel.MEDIA); @@ -160,7 +163,7 @@ public class BackupManager { final String attachmentKey = Base64.getUrlEncoder().encodeToString(bytes); final AttachmentGenerator.Descriptor descriptor = tusAttachmentGenerator.generateAttachment(attachmentKey); return new BackupUploadDescriptor(3, attachmentKey, descriptor.headers(), descriptor.signedUploadLocation()); - }); + }).toCompletableFuture(); } /** @@ -195,19 +198,96 @@ public class BackupManager { } /** - * Check if there is enough capacity to store the requested amount of media + * Copy an encrypted object to the backup cdn, adding a layer of encryption + *

+ * Implementation notes:

This method guarantees that any object that gets successfully copied to the backup cdn + * will also be deducted from the user's quota.

+ *

+ * However, the converse isn't true. It's possible we may charge the user for media they failed to copy. As a result, + * the quota may be over reported. It should be recalculated before taking quota enforcement actions. * - * @param backupUser an already ZK authenticated backup user - * @param mediaLength the desired number of media bytes to store - * @return true if mediaLength bytes can be stored + * @return A Flux that emits the locations of the double-encrypted objects on the backup cdn, or includes an error + * detailing why the object could not be copied. */ - public CompletableFuture canStoreMedia(final AuthenticatedBackupUser backupUser, final long mediaLength) { + public Flux copyToBackup(final AuthenticatedBackupUser backupUser, List toCopy) { checkBackupLevel(backupUser, BackupLevel.MEDIA); + + return Mono + // Figure out how many objects we're allowed to copy, updating the quota usage for the amount we are allowed + .fromFuture(enforceQuota(backupUser, toCopy)) + + // Copy the ones we have enough quota to hold + .flatMapMany(quotaResult -> Flux.concat( + + // These fit in our remaining quota, so perform the copy. If the copy fails, our estimated quota usage may not + // be exact since we already updated our usage. We make a best-effort attempt to undo the usage update if we + // know that the copied failed for sure though. + Flux.fromIterable(quotaResult.requestsToCopy()).flatMapSequential( + copyParams -> copyToBackup(backupUser, copyParams) + .flatMap(copyResult -> switch (copyResult.outcome()) { + case SUCCESS -> Mono.just(copyResult); + case SOURCE_WRONG_LENGTH, SOURCE_NOT_FOUND, OUT_OF_QUOTA -> Mono + .fromFuture(this.backupsDb.trackMedia(backupUser, -1, -copyParams.destinationObjectSize())) + .thenReturn(copyResult); + }), + COPY_CONCURRENCY), + + // There wasn't enough quota remaining to perform these copies + Flux.fromIterable(quotaResult.requestsToReject()) + .map(arg -> new CopyResult(CopyResult.Outcome.OUT_OF_QUOTA, arg.destinationMediaId(), null)))); + } + + private Mono copyToBackup(final AuthenticatedBackupUser backupUser, final CopyParameters copyParameters) { + return Mono.fromCompletionStage(() -> remoteStorageManager.copy( + copyParameters.sourceCdn(), copyParameters.sourceKey(), copyParameters.sourceLength(), + copyParameters.encryptionParameters(), + cdnMediaPath(backupUser, copyParameters.destinationMediaId()))) + + // Successfully copied! + .thenReturn(new CopyResult( + CopyResult.Outcome.SUCCESS, copyParameters.destinationMediaId(), remoteStorageManager.cdnNumber())) + + // Otherwise, squash per-item copy errors that don't fail the entire operation + .onErrorResume( + // If the error maps to an explicit result type + throwable -> + CopyResult.fromCopyError(throwable, copyParameters.destinationMediaId()).isPresent(), + // return that result type instead of propagating the error + throwable -> + Mono.just(CopyResult.fromCopyError(throwable, copyParameters.destinationMediaId()).orElseThrow())); + } + + private record QuotaResult(List requestsToCopy, List requestsToReject) {} + + /** + * Determine which copy requests can be performed with the user's remaining quota and update the used quota. If a copy + * request subsequently fails, the caller should attempt to restore the quota for the failed copy. + * + * @param backupUser The user quota to update + * @param toCopy The proposed copy requests + * @return QuotaResult indicating which requests fit into the remaining quota and which requests should be rejected + * with {@link CopyResult.Outcome#OUT_OF_QUOTA} + */ + private CompletableFuture enforceQuota( + final AuthenticatedBackupUser backupUser, + final List toCopy) { + final long totalBytesAdded = toCopy.stream() + .mapToLong(copyParameters -> { + if (copyParameters.sourceLength() > MAX_MEDIA_OBJECT_SIZE) { + throw Status.INVALID_ARGUMENT + .withDescription("Invalid sourceObject size") + .asRuntimeException(); + } + return copyParameters.destinationObjectSize(); + }) + .sum(); + return backupsDb.getMediaUsage(backupUser) .thenComposeAsync(info -> { - final boolean canStore = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed() >= mediaLength; + long remainingQuota = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed(); + final boolean canStore = remainingQuota >= totalBytesAdded; if (canStore || info.lastRecalculationTime().isAfter(clock.instant().minus(MAX_QUOTA_STALENESS))) { - return CompletableFuture.completedFuture(canStore); + return CompletableFuture.completedFuture(remainingQuota); } // The user is out of quota, and we have not recently recalculated the user's usage. Double check by doing a @@ -221,69 +301,46 @@ public class BackupManager { Metrics.counter(USAGE_RECALCULATION_COUNTER_NAME, "usageChanged", String.valueOf(usageChanged)) .increment(); }) - .thenApply(newUsage -> MAX_TOTAL_BACKUP_MEDIA_BYTES - newUsage.bytesUsed() >= mediaLength); + .thenApply(newUsage -> MAX_TOTAL_BACKUP_MEDIA_BYTES - newUsage.bytesUsed()); + }) + .thenCompose(remainingQuota -> { + // Figure out how many of the requested objects fit in the remaining quota + final int index = indexWhereTotalExceeds(toCopy, CopyParameters::destinationObjectSize, + remainingQuota); + final QuotaResult result = new QuotaResult(toCopy.subList(0, index), + toCopy.subList(index, toCopy.size())); + if (index == 0) { + // Skip the usage update if we're not able to write anything + return CompletableFuture.completedFuture(result); + } + + // Update the usage + final long quotaToConsume = result.requestsToCopy.stream() + .mapToLong(CopyParameters::destinationObjectSize) + .sum(); + return backupsDb.trackMedia(backupUser, index, quotaToConsume).thenApply(ignored -> result); }); } + /** + * @return the largest index i such that sum(ts[0],...ts[i - 1]) <= max + */ + private static int indexWhereTotalExceeds(List ts, Function valueFunction, long max) { + long sum = 0; + for (int index = 0; index < ts.size(); index++) { + sum += valueFunction.apply(ts.get(index)); + if (sum > max) { + return index; + } + } + return ts.size(); + } + + public record StorageDescriptor(int cdn, byte[] key) {} public record StorageDescriptorWithLength(int cdn, byte[] key, long length) {} - /** - * Copy an encrypted object to the backup cdn, adding a layer of encryption - *

- * Implementation notes:

This method guarantees that any object that gets successfully copied to the backup cdn - * will also be deducted from the user's quota.

- *

- * However, the converse isn't true. It's possible we may charge the user for media they failed to copy. As a result, - * the quota may be over reported and it should be recalculated before taking quota enforcement actions. - * - * @return A stage that completes successfully with location of the twice-encrypted object on the backup cdn. The - * returned CompletionStage can be completed exceptionally with the following exceptions. - *

- */ - public CompletableFuture copyToBackup( - final AuthenticatedBackupUser backupUser, - final int sourceCdn, - final String sourceKey, - final int sourceLength, - final MediaEncryptionParameters encryptionParameters, - final byte[] destinationMediaId) { - checkBackupLevel(backupUser, BackupLevel.MEDIA); - if (sourceLength > MAX_MEDIA_OBJECT_SIZE) { - throw Status.INVALID_ARGUMENT - .withDescription("Invalid sourceObject size") - .asRuntimeException(); - } - - final String destination = cdnMediaPath(backupUser, destinationMediaId); - final int destinationLength = encryptionParameters.outputSize(sourceLength); - return this.backupsDb - // Write the ddb updates before actually updating backing storage - .trackMedia(backupUser, 1, destinationLength) - - // Actually copy the objects. If the copy fails, our estimated quota usage may not be exact - .thenComposeAsync(ignored -> - remoteStorageManager.copy(sourceCdn, sourceKey, sourceLength, encryptionParameters, destination)) - .exceptionallyCompose(throwable -> { - final Throwable unwrapped = ExceptionUtils.unwrap(throwable); - if (!(unwrapped instanceof SourceObjectNotFoundException) && !(unwrapped instanceof InvalidLengthException)) { - throw ExceptionUtils.wrap(unwrapped); - } - // In cases where we know the copy fails without writing anything, we can try to restore the user's quota - return this.backupsDb.trackMedia(backupUser, -1, -destinationLength).whenComplete((ignored, ignoredEx) -> { - throw ExceptionUtils.wrap(unwrapped); - }); - }) - // indicates where the backup was stored - .thenApply(ignore -> new StorageDescriptor(remoteStorageManager.cdnNumber(), destinationMediaId)); - - } - /** * Generate credentials that can be used to read from the backup CDN * @@ -348,66 +405,60 @@ public class BackupManager { deletePrefix(backupUser.backupDir(), DELETION_CONCURRENCY)))); } - private sealed interface Either permits DeleteSuccess, DeleteFailure {} - private record DeleteSuccess(long usage) implements Either {} - - private record DeleteFailure(Throwable e) implements Either {} - - public CompletableFuture delete(final AuthenticatedBackupUser backupUser, + public Flux deleteMedia(final AuthenticatedBackupUser backupUser, final List storageDescriptors) { checkBackupLevel(backupUser, BackupLevel.MESSAGES); + // Check for a cdn we don't know how to process if (storageDescriptors.stream().anyMatch(sd -> sd.cdn() != remoteStorageManager.cdnNumber())) { throw Status.INVALID_ARGUMENT .withDescription("unsupported media cdn provided") .asRuntimeException(); } - return Flux - .fromIterable(storageDescriptors) + return Flux.usingWhen( - // Issue deletes for all storage descriptors (proceeds with default flux concurrency) - .flatMap(descriptor -> Mono.fromCompletionStage( - remoteStorageManager - .delete(cdnMediaPath(backupUser, descriptor.key)) - // Squash errors/success into a single type - .handle((bytesDeleted, throwable) -> throwable != null - ? new DeleteFailure(throwable) - : new DeleteSuccess(bytesDeleted)) - )) + // Gather usage updates into the UsageBatcher to apply during the cleanup operation + Mono.just(new UsageBatcher()), - // Update backupsDb with the change in usage - .collectList() - .flatMap(eithers -> { - // count up usage changes - long totalBytesDeleted = 0; - long totalCountDeleted = 0; - final List toThrow = new ArrayList<>(); - for (Either either : eithers) { - switch (either) { - case DeleteFailure f: - toThrow.add(f.e()); - break; - case DeleteSuccess s when s.usage() > 0: - totalBytesDeleted += s.usage(); - totalCountDeleted++; - break; - default: - break; - } - } - final Mono result = toThrow.isEmpty() - ? Mono.empty() - : Mono.error(toThrow.stream().reduce((t1, t2) -> { - t1.addSuppressed(t2); - return t1; - }).get()); - return Mono - .fromCompletionStage(this.backupsDb.trackMedia(backupUser, -totalCountDeleted, -totalBytesDeleted)) - .then(result); - }) - .toFuture(); + // Deletes the objects, returning their former location. Tracks bytes removed so the quota can be updated on + // completion + batcher -> Flux.fromIterable(storageDescriptors) + .flatMapSequential(sd -> Mono + // Delete the object + .fromCompletionStage(remoteStorageManager.delete(cdnMediaPath(backupUser, sd.key()))) + // Track how much the remote storage manager indicated was deleted as part of the operation + .doOnNext(deletedBytes -> batcher.update(-deletedBytes)) + .thenReturn(sd), DELETION_CONCURRENCY), + + // On cleanup, update the quota using whatever updates were accumulated in the batcher + batcher -> + Mono.fromFuture(backupsDb.trackMedia(backupUser, batcher.countDelta.get(), batcher.usageDelta.get()))); + } + + /** + * Track pending media usage updates + */ + private static class UsageBatcher { + + AtomicLong countDelta = new AtomicLong(); + AtomicLong usageDelta = new AtomicLong(); + + /** + * Stage a usage update that will be applied later + * + * @param bytesDelta The amount of bytes that should be tracked as used (or if negative, freed). If the delta is + * non-zero, the count will also be updated. + */ + void update(long bytesDelta) { + if (bytesDelta < 0) { + countDelta.decrementAndGet(); + } else if (bytesDelta > 0) { + countDelta.incrementAndGet(); + } + usageDelta.addAndGet(bytesDelta); + } } private static final ECPublicKey INVALID_PUBLIC_KEY = Curve.generateKeyPair().getPublicKey(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/CopyParameters.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/CopyParameters.java new file mode 100644 index 000000000..f132284a2 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/CopyParameters.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.backup; + +/** + * Descriptor for a single copy-and-encrypt operation + * + * @param sourceCdn The cdn of the object to copy + * @param sourceKey The mediaId within the cdn of the object to copy + * @param sourceLength The length of the object to copy + * @param encryptionParameters Encryption parameters to double encrypt the object + * @param destinationMediaId The mediaId of the destination object + */ +public record CopyParameters( + int sourceCdn, + String sourceKey, + int sourceLength, + MediaEncryptionParameters encryptionParameters, + byte[] destinationMediaId) { + + /** + * @return The size of the double-encrypted destination object after it is copied + */ + long destinationObjectSize() { + return encryptionParameters().outputSize(sourceLength()); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/CopyResult.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/CopyResult.java new file mode 100644 index 000000000..fcbb72205 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/CopyResult.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.backup; + +import org.whispersystems.textsecuregcm.util.ExceptionUtils; + +import javax.annotation.Nullable; +import java.util.Optional; + + +/** + * The result of a copy operation + * + * @param outcome Whether the copy was a success + * @param mediaId The destination mediaId + * @param cdn On success, the destination cdn + */ +public record CopyResult(Outcome outcome, byte[] mediaId, @Nullable Integer cdn) { + + public enum Outcome { + SUCCESS, + SOURCE_NOT_FOUND, + SOURCE_WRONG_LENGTH, + OUT_OF_QUOTA + } + + /** + * Map an exception returned by {@link RemoteStorageManager#copy} to CopyResult with the appropriate outcome. + * + * @param throwable result of a failed copy operation + * @param key the copy destination mediaId + * @return The appropriate CopyResult, or empty if the exception does not match to an Outcome. + */ + static Optional fromCopyError(final Throwable throwable, final byte[] key) { + final Throwable unwrapped = ExceptionUtils.unwrap(throwable); + if (unwrapped instanceof SourceObjectNotFoundException) { + return Optional.of(new CopyResult(Outcome.SOURCE_NOT_FOUND, key, null)); + } else if (unwrapped instanceof InvalidLengthException) { + return Optional.of(new CopyResult(Outcome.SOURCE_WRONG_LENGTH, key, null)); + } else { + return Optional.empty(); + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java index 8be846ad5..b410b8988 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java @@ -27,8 +27,8 @@ import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.stream.Stream; import javax.validation.Valid; import javax.validation.constraints.Max; import javax.validation.constraints.Min; @@ -54,22 +54,19 @@ import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation; import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest; import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; -import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; import org.whispersystems.textsecuregcm.backup.BackupAuthManager; import org.whispersystems.textsecuregcm.backup.BackupManager; -import org.whispersystems.textsecuregcm.backup.InvalidLengthException; +import org.whispersystems.textsecuregcm.backup.CopyParameters; +import org.whispersystems.textsecuregcm.backup.CopyResult; import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters; -import org.whispersystems.textsecuregcm.backup.SourceObjectNotFoundException; import org.whispersystems.textsecuregcm.util.BackupAuthCredentialAdapter; import org.whispersystems.textsecuregcm.util.ByteArrayAdapter; import org.whispersystems.textsecuregcm.util.ByteArrayBase64UrlAdapter; import org.whispersystems.textsecuregcm.util.ECPublicKeyAdapter; import org.whispersystems.textsecuregcm.util.ExactlySize; -import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.websocket.auth.Mutable; import org.whispersystems.websocket.auth.ReadOnly; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Path("/v1/archives") @@ -124,7 +121,7 @@ public class ArchiveController { public record RedeemBackupReceiptRequest( @Schema(description = "Presentation of a ZK receipt encoded in standard padded base64", implementation = String.class) - @JsonDeserialize(using = RedeemBackupReceiptRequest.Deserializer.class) + @JsonDeserialize(using = Deserializer.class) @NotNull ReceiptCredentialPresentation receiptCredentialPresentation) { @@ -503,7 +500,16 @@ public class ArchiveController { @JsonDeserialize(using = ByteArrayAdapter.Deserializing.class) @NotNull @ExactlySize(16) - byte[] iv) {} + byte[] iv) { + + CopyParameters toCopyParameters() { + return new CopyParameters( + sourceAttachment.cdn, sourceAttachment.key, + objectLength, + new MediaEncryptionParameters(encryptionKey, hmacKey, iv), + mediaId); + } + } public record CopyMediaResponse( @Schema(description = "The backup cdn where this media object is stored") @@ -547,48 +553,20 @@ public class ArchiveController { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager - .authenticateBackupUser(presentation.presentation, signature.signature) - .thenCompose(backupUser -> checkMediaFits(backupUser, copyMediaRequest.objectLength) - .thenCompose(ignored -> copyMediaImpl(backupUser, copyMediaRequest))) - .thenApply(result -> new CopyMediaResponse(result.cdn())) - .exceptionally(e -> { - final Throwable unwrapped = ExceptionUtils.unwrap(e); - if (unwrapped instanceof SourceObjectNotFoundException) { - throw new ClientErrorException("Source object not found " + unwrapped.getMessage(), Response.Status.GONE); - } else if (unwrapped instanceof InvalidLengthException) { - throw new BadRequestException("Invalid length " + unwrapped.getMessage()); - } else { - throw ExceptionUtils.wrap(e); - } - }); + return Mono + .fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature)) + .flatMap(backupUser -> backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters())) + .next() + .map(copyResult -> switch (copyResult.outcome()) { + case SUCCESS -> new CopyMediaResponse(copyResult.cdn()); + case SOURCE_WRONG_LENGTH -> throw new BadRequestException("Invalid length"); + case SOURCE_NOT_FOUND -> throw new ClientErrorException("Source object not found", Response.Status.GONE); + case OUT_OF_QUOTA -> + throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE); + })) + .toFuture(); } - private CompletableFuture checkMediaFits(AuthenticatedBackupUser backupUser, long amountToStore) { - return backupManager.canStoreMedia(backupUser, amountToStore) - .thenApply(fits -> { - if (!fits) { - throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE); - } - return null; - }); - } - - private CompletionStage copyMediaImpl(final AuthenticatedBackupUser backupUser, - final CopyMediaRequest copyMediaRequest) { - return this.backupManager.copyToBackup( - backupUser, - copyMediaRequest.sourceAttachment.cdn, - copyMediaRequest.sourceAttachment.key, - copyMediaRequest.objectLength, - new MediaEncryptionParameters( - copyMediaRequest.encryptionKey, - copyMediaRequest.hmacKey, - copyMediaRequest.iv), - copyMediaRequest.mediaId); - } - - public record CopyMediaBatchRequest( @Schema(description = "A list of media objects to copy from the attachments CDN to the backup CDN") @NotNull @@ -606,6 +584,7 @@ public class ArchiveController { A 200 indicates the object was successfully copied. A 400 indicates an invalid argument in the request A 410 indicates that the source object was not found + A 413 indicates that the media quota was exhausted """) int status, @@ -620,7 +599,17 @@ public class ArchiveController { @JsonDeserialize(using = ByteArrayBase64UrlAdapter.Deserializing.class) @NotNull @ExactlySize(15) - byte[] mediaId) {} + byte[] mediaId) { + + static Entry fromCopyResult(final CopyResult copyResult) { + return switch (copyResult.outcome()) { + case SUCCESS -> new Entry(200, null, copyResult.cdn(), copyResult.mediaId()); + case SOURCE_WRONG_LENGTH -> new Entry(400, "Invalid source length", null, copyResult.mediaId()); + case SOURCE_NOT_FOUND -> new Entry(410, "Source not found", null, copyResult.mediaId()); + case OUT_OF_QUOTA -> new Entry(413, "Media quota exhausted", null, copyResult.mediaId()); + }; + } + } } @PUT @@ -661,37 +650,13 @@ public class ArchiveController { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - - // If the entire batch won't fit in the user's remaining quota, reject the whole request. - final long expectedStorage = copyMediaRequest.items().stream().mapToLong(CopyMediaRequest::objectLength).sum(); - - return backupManager.authenticateBackupUser(presentation.presentation, signature.signature) - .thenCompose(backupUser -> checkMediaFits(backupUser, expectedStorage).thenCompose( - ignored -> Flux.fromIterable(copyMediaRequest.items) - // Operate sequentially, waiting for one copy to finish before starting the next one. At least right now, - // copying concurrently will introduce contention over the metadata. - .concatMap(request -> Mono - .fromCompletionStage(copyMediaImpl(backupUser, request)) - .map(result -> new CopyMediaBatchResponse.Entry(200, null, result.cdn(), result.key())) - .onErrorResume(throwable -> ExceptionUtils.unwrap(throwable) instanceof IOException, throwable -> { - final Throwable unwrapped = ExceptionUtils.unwrap(throwable); - - int status; - String error; - if (unwrapped instanceof SourceObjectNotFoundException) { - status = 410; - error = "Source object not found " + unwrapped.getMessage(); - } else if (unwrapped instanceof InvalidLengthException) { - status = 400; - error = "Invalid length " + unwrapped.getMessage(); - } else { - throw ExceptionUtils.wrap(throwable); - } - return Mono.just(new CopyMediaBatchResponse.Entry(status, error, null, request.mediaId)); - })) - .collectList() - .map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build()) - .toFuture())); + final Stream copyParams = copyMediaRequest.items().stream().map(CopyMediaRequest::toCopyParameters); + return Mono.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature)) + .flatMapMany(backupUser -> backupManager.copyToBackup(backupUser, copyParams.toList())) + .map(CopyMediaBatchResponse.Entry::fromCopyResult) + .collectList() + .map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build()) + .toFuture(); } @POST @@ -842,12 +807,15 @@ public class ArchiveController { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } + final List toDelete = deleteMedia.mediaToDelete().stream() + .map(media -> new BackupManager.StorageDescriptor(media.cdn(), media.mediaId)) + .toList(); + return backupManager .authenticateBackupUser(presentation.presentation, signature.signature) - .thenCompose(authenticatedBackupUser -> backupManager.delete(authenticatedBackupUser, - deleteMedia.mediaToDelete().stream() - .map(media -> new BackupManager.StorageDescriptor(media.cdn(), media.mediaId)) - .toList())) + .thenCompose(authenticatedBackupUser -> backupManager + .deleteMedia(authenticatedBackupUser, toDelete) + .then().toFuture()) .thenApply(Util.ASYNC_EMPTY_RESPONSE); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java index d6ddbeddf..fbdca666a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java @@ -49,9 +49,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; +import org.junitpioneer.jupiter.cartesian.CartesianTest; import org.signal.libsignal.protocol.InvalidKeyException; import org.signal.libsignal.protocol.ecc.Curve; import org.signal.libsignal.protocol.ecc.ECKeyPair; @@ -80,6 +80,15 @@ public class BackupManagerTest { public static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension( DynamoDbExtensionSchema.Tables.BACKUPS); + private static final MediaEncryptionParameters COPY_ENCRYPTION_PARAM = new MediaEncryptionParameters( + TestRandomUtil.nextBytes(32), + TestRandomUtil.nextBytes(32), + TestRandomUtil.nextBytes(16)); + private static final CopyParameters COPY_PARAM = new CopyParameters( + 3, "abc", 100, + COPY_ENCRYPTION_PARAM, TestRandomUtil.nextBytes(15)); + private static final String COPY_DEST_STRING = Base64.getEncoder().encodeToString(COPY_PARAM.destinationMediaId()); + private final TestClock testClock = TestClock.now(); private final BackupAuthTestUtil backupAuthTestUtil = new BackupAuthTestUtil(testClock); private final RateLimiter mediaUploadLimiter = mock(RateLimiter.class); @@ -139,7 +148,7 @@ public class BackupManagerTest { } @Test - public void createTemporaryMediaAttachmentRateLimited() { + public void createTemporaryMediaAttachmentRateLimited() { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); when(mediaUploadLimiter.validateAsync(eq(BackupManager.rateLimitKey(backupUser)))) .thenReturn(CompletableFuture.failedFuture(new RateLimitExceededException(null, true))); @@ -348,24 +357,15 @@ public class BackupManagerTest { @Test public void copySuccess() { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); - when(tusCredentialGenerator.generateUpload(any())) - .thenReturn(new BackupUploadDescriptor(3, "def", Collections.emptyMap(), "")); - when(remoteStorageManager.copy(eq(3), eq("abc"), eq(100), any(), any())) - .thenReturn(CompletableFuture.completedFuture(null)); - final MediaEncryptionParameters encryptionParams = new MediaEncryptionParameters( - TestRandomUtil.nextBytes(32), - TestRandomUtil.nextBytes(32), - TestRandomUtil.nextBytes(16)); - - final BackupManager.StorageDescriptor copied = backupManager.copyToBackup( - backupUser, 3, "abc", 100, encryptionParams, "def".getBytes(StandardCharsets.UTF_8)).join(); + final CopyResult copied = copy(backupUser); assertThat(copied.cdn()).isEqualTo(3); - assertThat(copied.key()).isEqualTo("def".getBytes(StandardCharsets.UTF_8)); + assertThat(copied.mediaId()).isEqualTo(COPY_PARAM.destinationMediaId()); + assertThat(copied.outcome()).isEqualTo(CopyResult.Outcome.SUCCESS); final Map backup = getBackupItem(backupUser); final long bytesUsed = AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_BYTES_USED, 0L); - assertThat(bytesUsed).isEqualTo(encryptionParams.outputSize(100)); + assertThat(bytesUsed).isEqualTo(COPY_PARAM.destinationObjectSize()); final long mediaCount = AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_COUNT, 0L); assertThat(mediaCount).isEqualTo(1); @@ -374,17 +374,8 @@ public class BackupManagerTest { @Test public void copyFailure() { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); - when(tusCredentialGenerator.generateUpload(any())) - .thenReturn(new BackupUploadDescriptor(3, "def", Collections.emptyMap(), "")); - when(remoteStorageManager.copy(eq(3), eq("abc"), eq(100), any(), any())) - .thenReturn(CompletableFuture.failedFuture(new SourceObjectNotFoundException())); - - CompletableFutureTestUtil.assertFailsWithCause(SourceObjectNotFoundException.class, - backupManager.copyToBackup( - backupUser, - 3, "abc", 100, - mock(MediaEncryptionParameters.class), - "def".getBytes(StandardCharsets.UTF_8))); + assertThat(copyError(backupUser, new SourceObjectNotFoundException()).outcome()) + .isEqualTo(CopyResult.Outcome.SOURCE_NOT_FOUND); // usage should be rolled back after a known copy failure final Map backup = getBackupItem(backupUser); @@ -392,6 +383,37 @@ public class BackupManagerTest { assertThat(AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_COUNT, -1L)).isEqualTo(0L); } + @Test + public void copyPartialSuccess() { + final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); + final List toCopy = List.of( + new CopyParameters(3, "success", 100, COPY_ENCRYPTION_PARAM, TestRandomUtil.nextBytes(15)), + new CopyParameters(3, "missing", 200, COPY_ENCRYPTION_PARAM, TestRandomUtil.nextBytes(15)), + new CopyParameters(3, "badlength", 300, COPY_ENCRYPTION_PARAM, TestRandomUtil.nextBytes(15))); + + when(tusCredentialGenerator.generateUpload(any())) + .thenReturn(new BackupUploadDescriptor(3, "", Collections.emptyMap(), "")); + when(remoteStorageManager.copy(eq(3), eq("success"), eq(100), any(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(remoteStorageManager.copy(eq(3), eq("missing"), eq(200), any(), any())) + .thenReturn(CompletableFuture.failedFuture(new SourceObjectNotFoundException())); + when(remoteStorageManager.copy(eq(3), eq("badlength"), eq(300), any(), any())) + .thenReturn(CompletableFuture.failedFuture(new InvalidLengthException(""))); + + final List results = backupManager.copyToBackup(backupUser, toCopy) + .collectList().block(); + + assertThat(results.get(0).outcome()).isEqualTo(CopyResult.Outcome.SUCCESS); + assertThat(results.get(1).outcome()).isEqualTo(CopyResult.Outcome.SOURCE_NOT_FOUND); + assertThat(results.get(2).outcome()).isEqualTo(CopyResult.Outcome.SOURCE_WRONG_LENGTH); + + // usage should be rolled back after a known copy failure + final Map backup = getBackupItem(backupUser); + assertThat(AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_BYTES_USED, -1L)) + .isEqualTo(toCopy.get(0).destinationObjectSize()); + assertThat(AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_COUNT, -1L)).isEqualTo(1L); + } + @Test public void quotaEnforcementNoRecalculation() { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); @@ -404,7 +426,9 @@ public class BackupManagerTest { testClock.pin(Instant.ofEpochSecond(0) .plus(BackupManager.MAX_QUOTA_STALENESS) .minus(Duration.ofSeconds(1))); - assertThat(backupManager.canStoreMedia(backupUser, 10).join()).isFalse(); + + // Try to copy + assertThat(copy(backupUser).outcome()).isEqualTo(CopyResult.Outcome.OUT_OF_QUOTA); } @Test @@ -412,56 +436,58 @@ public class BackupManagerTest { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); final String backupMediaPrefix = "%s/%s/".formatted(backupUser.backupDir(), backupUser.mediaDir()); - // on recalculation, say there's actually 10 bytes left - when(remoteStorageManager.calculateBytesUsed(eq(backupMediaPrefix))) - .thenReturn( - CompletableFuture.completedFuture(new UsageInfo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - 10, 1000))); + final long remainingAfterRecalc = BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - COPY_PARAM.destinationObjectSize(); - // set the backupsDb to be out of quota at t=0 + // on recalculation, say there's actually enough left to do the copy + when(remoteStorageManager.calculateBytesUsed(eq(backupMediaPrefix))) + .thenReturn(CompletableFuture.completedFuture(new UsageInfo(remainingAfterRecalc, 1000))); + + // set the backupsDb to be totally out of quota at t=0 testClock.pin(Instant.ofEpochSecond(0)); backupsDb.setMediaUsage(backupUser, new UsageInfo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES, 1000)).join(); testClock.pin(Instant.ofEpochSecond(0).plus(BackupManager.MAX_QUOTA_STALENESS)); - assertThat(backupManager.canStoreMedia(backupUser, 10).join()).isTrue(); + + // Should recalculate quota and copy can succeed + assertThat(copy(backupUser).outcome()).isEqualTo(CopyResult.Outcome.SUCCESS); // backupsDb should have the new value final BackupsDb.TimestampedUsageInfo info = backupsDb.getMediaUsage(backupUser).join(); - assertThat(info.lastRecalculationTime()).isEqualTo( - Instant.ofEpochSecond(0).plus(BackupManager.MAX_QUOTA_STALENESS)); - assertThat(info.usageInfo().bytesUsed()).isEqualTo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - 10); + assertThat(info.lastRecalculationTime()) + .isEqualTo(Instant.ofEpochSecond(0).plus(BackupManager.MAX_QUOTA_STALENESS)); + assertThat(info.usageInfo().bytesUsed()).isEqualTo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES); + assertThat(info.usageInfo().numObjects()).isEqualTo(1001); } - @ParameterizedTest - @CsvSource({ - "true, 10, 10, true", - "true, 10, 11, false", - "true, 0, 1, false", - "true, 0, 0, true", - "false, 10, 10, true", - "false, 10, 11, false", - "false, 0, 1, false", - "false, 0, 0, true", - }) + @CartesianTest() public void quotaEnforcement( - boolean recalculation, - final long spaceLeft, - final long mediaToAddSize, - boolean shouldAccept) { + @CartesianTest.Values(booleans = {true, false}) boolean hasSpaceBeforeRecalc, + @CartesianTest.Values(booleans = {true, false}) boolean hasSpaceAfterRecalc, + @CartesianTest.Values(booleans = {true, false}) boolean doesReaclc) { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); final String backupMediaPrefix = "%s/%s/".formatted(backupUser.backupDir(), backupUser.mediaDir()); + final long destSize = COPY_PARAM.destinationObjectSize(); + final long originalRemainingSpace = + BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - (hasSpaceBeforeRecalc ? destSize : (destSize - 1)); + final long afterRecalcRemainingSpace = + BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - (hasSpaceAfterRecalc ? destSize : (destSize - 1)); + // set the backupsDb to be out of quota at t=0 testClock.pin(Instant.ofEpochSecond(0)); - backupsDb.setMediaUsage(backupUser, new UsageInfo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - spaceLeft, 1000)) - .join(); + backupsDb.setMediaUsage(backupUser, new UsageInfo(originalRemainingSpace, 1000)).join(); - if (recalculation) { + if (doesReaclc) { testClock.pin(Instant.ofEpochSecond(0).plus(BackupManager.MAX_QUOTA_STALENESS).plus(Duration.ofSeconds(1))); when(remoteStorageManager.calculateBytesUsed(eq(backupMediaPrefix))) - .thenReturn(CompletableFuture.completedFuture( - new UsageInfo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - spaceLeft, 1000))); + .thenReturn(CompletableFuture.completedFuture(new UsageInfo(afterRecalcRemainingSpace, 1000))); } - assertThat(backupManager.canStoreMedia(backupUser, mediaToAddSize).join()).isEqualTo(shouldAccept); - if (recalculation && !shouldAccept) { + final CopyResult copyResult = copy(backupUser); + if (hasSpaceBeforeRecalc || (hasSpaceAfterRecalc && doesReaclc)) { + assertThat(copyResult.outcome()).isEqualTo(CopyResult.Outcome.SUCCESS); + } else { + assertThat(copyResult.outcome()).isEqualTo(CopyResult.Outcome.OUT_OF_QUOTA); + } + if (doesReaclc && !hasSpaceBeforeRecalc) { // should have recalculated if we exceeded quota verify(remoteStorageManager, times(1)).calculateBytesUsed(anyString()); } @@ -488,7 +514,7 @@ public class BackupManagerTest { assertThat(result.media().getFirst().key()).isEqualTo( Base64.getDecoder().decode("aaa".getBytes(StandardCharsets.UTF_8))); assertThat(result.media().getFirst().length()).isEqualTo(123); - assertThat(result.cursor().get()).isEqualTo("newCursor"); + assertThat(result.cursor().orElseThrow()).isEqualTo("newCursor"); } @@ -539,8 +565,8 @@ public class BackupManagerTest { when(remoteStorageManager.delete(backupMediaKey)) .thenReturn(CompletableFuture.completedFuture(7L)); when(remoteStorageManager.cdnNumber()).thenReturn(5); - backupManager.delete(backupUser, List.of(new BackupManager.StorageDescriptor(5, mediaId))).toCompletableFuture() - .join(); + backupManager.deleteMedia(backupUser, List.of(new BackupManager.StorageDescriptor(5, mediaId))) + .collectList().block(); assertThat(backupsDb.getMediaUsage(backupUser).join().usageInfo()) .isEqualTo(new UsageInfo(93, 999)); @@ -549,9 +575,10 @@ public class BackupManagerTest { @Test public void deleteUnknownCdn() { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupLevel.MEDIA); + final BackupManager.StorageDescriptor sd = new BackupManager.StorageDescriptor(4, TestRandomUtil.nextBytes(15)); when(remoteStorageManager.cdnNumber()).thenReturn(5); assertThatThrownBy(() -> - backupManager.delete(backupUser, List.of(new BackupManager.StorageDescriptor(4, TestRandomUtil.nextBytes(15))))) + backupManager.deleteMedia(backupUser, List.of(sd)).then().block()) .isInstanceOf(StatusRuntimeException.class) .matches(e -> ((StatusRuntimeException) e).getStatus().getCode() == Status.INVALID_ARGUMENT.getCode()); } @@ -562,7 +589,7 @@ public class BackupManagerTest { final List descriptors = new ArrayList<>(); long initialBytes = 0; - for (int i = 1; i <= 10; i++) { + for (int i = 1; i <= 5; i++) { final BackupManager.StorageDescriptor descriptor = new BackupManager.StorageDescriptor(5, TestRandomUtil.nextBytes(15)); descriptors.add(descriptor); @@ -572,20 +599,24 @@ public class BackupManagerTest { BackupManager.encodeMediaIdForCdn(descriptor.key())); initialBytes += i; - // fail 2 deletions, otherwise return the corresponding object's size as i - final CompletableFuture deleteResult = - i == 3 || i == 6 - ? CompletableFuture.failedFuture(new IOException("oh no")) - : CompletableFuture.completedFuture(Long.valueOf(i)); + // fail deletion 3, otherwise return the corresponding object's size as i + final CompletableFuture deleteResult = i == 3 + ? CompletableFuture.failedFuture(new IOException("oh no")) + : CompletableFuture.completedFuture(Long.valueOf(i)); when(remoteStorageManager.delete(backupMediaKey)).thenReturn(deleteResult); } when(remoteStorageManager.cdnNumber()).thenReturn(5); - backupsDb.setMediaUsage(backupUser, new UsageInfo(initialBytes, 10)).join(); - CompletableFutureTestUtil.assertFailsWithCause(IOException.class, backupManager.delete(backupUser, descriptors)); - // 2 objects should have failed to be deleted + backupsDb.setMediaUsage(backupUser, new UsageInfo(initialBytes, 5)).join(); + + final List deleted = backupManager + .deleteMedia(backupUser, descriptors) + .onErrorComplete() + .collectList().block(); + // first two objects should be deleted + assertThat(deleted.size()).isEqualTo(2); assertThat(backupsDb.getMediaUsage(backupUser).join().usageInfo()) - .isEqualTo(new UsageInfo(9, 2)); + .isEqualTo(new UsageInfo(initialBytes - 1 - 2, 3)); } @@ -603,8 +634,7 @@ public class BackupManagerTest { // Deletion doesn't remove anything when(remoteStorageManager.delete(backupMediaKey)).thenReturn(CompletableFuture.completedFuture(0L)); when(remoteStorageManager.cdnNumber()).thenReturn(5); - backupManager.delete(backupUser, List.of(new BackupManager.StorageDescriptor(5, mediaId))).toCompletableFuture() - .join(); + backupManager.deleteMedia(backupUser, List.of(new BackupManager.StorageDescriptor(5, mediaId))).then().block(); assertThat(backupsDb.getMediaUsage(backupUser).join().usageInfo()) .isEqualTo(new UsageInfo(100, 5)); @@ -752,6 +782,24 @@ public class BackupManagerTest { verifyNoMoreInteractions(remoteStorageManager); } + private CopyResult copyError(final AuthenticatedBackupUser backupUser, Throwable copyException) { + when(tusCredentialGenerator.generateUpload(any())) + .thenReturn(new BackupUploadDescriptor(3, "def", Collections.emptyMap(), "")); + when(remoteStorageManager.copy(eq(3), eq(COPY_PARAM.sourceKey()), eq(COPY_PARAM.sourceLength()), any(), any())) + .thenReturn(CompletableFuture.failedFuture(copyException)); + return backupManager.copyToBackup(backupUser, List.of(COPY_PARAM)).single().block(); + } + + private CopyResult copy(final AuthenticatedBackupUser backupUser) { + when(tusCredentialGenerator.generateUpload(any())) + .thenReturn(new BackupUploadDescriptor(3, "def", Collections.emptyMap(), "")); + when(tusCredentialGenerator.generateUpload(any())) + .thenReturn(new BackupUploadDescriptor(3, "def", Collections.emptyMap(), "")); + when(remoteStorageManager.copy(eq(3), eq(COPY_PARAM.sourceKey()), eq(COPY_PARAM.sourceLength()), any(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + return backupManager.copyToBackup(backupUser, List.of(COPY_PARAM)).single().block(); + } + private static ExpiredBackup expiredBackup(final ExpiredBackup.ExpirationType expirationType, final AuthenticatedBackupUser backupUser) { return new ExpiredBackup( diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java index b7e5110e8..d698cdd47 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java @@ -7,8 +7,6 @@ package org.whispersystems.textsecuregcm.controllers; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -66,15 +64,15 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; import org.whispersystems.textsecuregcm.backup.BackupAuthManager; import org.whispersystems.textsecuregcm.backup.BackupAuthTestUtil; import org.whispersystems.textsecuregcm.backup.BackupManager; -import org.whispersystems.textsecuregcm.backup.InvalidLengthException; -import org.whispersystems.textsecuregcm.backup.SourceObjectNotFoundException; import org.whispersystems.textsecuregcm.backup.BackupUploadDescriptor; +import org.whispersystems.textsecuregcm.backup.CopyResult; import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper; import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper; import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper; import org.whispersystems.textsecuregcm.tests.util.AuthHelper; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.TestRandomUtil; +import reactor.core.publisher.Flux; @ExtendWith(DropwizardExtensionsSupport.class) public class ArchiveControllerTest { @@ -346,14 +344,11 @@ public class ArchiveControllerTest { BackupLevel.MEDIA, backupKey, aci); when(backupManager.authenticateBackupUser(any(), any())) .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupLevel.MEDIA))); - when(backupManager.canStoreMedia(any(), anyLong())).thenReturn(CompletableFuture.completedFuture(true)); - when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), any())) - .thenAnswer(invocation -> { - byte[] mediaId = invocation.getArgument(5, byte[].class); - return CompletableFuture.completedFuture(new BackupManager.StorageDescriptor(1, mediaId)); - }); - final byte[][] mediaIds = new byte[][]{TestRandomUtil.nextBytes(15), TestRandomUtil.nextBytes(15)}; + when(backupManager.copyToBackup(any(), any())) + .thenReturn(Flux.just( + new CopyResult(CopyResult.Outcome.SUCCESS, mediaIds[0], 1), + new CopyResult(CopyResult.Outcome.SUCCESS, mediaIds[1], 1))); final Response r = resources.getJerseyTest() .target("v1/archives/media/batch") @@ -397,15 +392,13 @@ public class ArchiveControllerTest { when(backupManager.authenticateBackupUser(any(), any())) .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupLevel.MEDIA))); - final byte[][] mediaIds = IntStream.range(0, 3).mapToObj(i -> TestRandomUtil.nextBytes(15)).toArray(byte[][]::new); - when(backupManager.canStoreMedia(any(), anyLong())).thenReturn(CompletableFuture.completedFuture(true)); - - when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[0]))) - .thenReturn(CompletableFuture.completedFuture(new BackupManager.StorageDescriptor(1, mediaIds[0]))); - when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[1]))) - .thenReturn(CompletableFuture.failedFuture(new SourceObjectNotFoundException())); - when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[2]))) - .thenReturn(CompletableFuture.failedFuture(new InvalidLengthException("bad length"))); + final byte[][] mediaIds = IntStream.range(0, 4).mapToObj(i -> TestRandomUtil.nextBytes(15)).toArray(byte[][]::new); + when(backupManager.copyToBackup(any(), any())) + .thenReturn(Flux.just( + new CopyResult(CopyResult.Outcome.SUCCESS, mediaIds[0], 1), + new CopyResult(CopyResult.Outcome.SOURCE_NOT_FOUND, mediaIds[1], null), + new CopyResult(CopyResult.Outcome.SOURCE_WRONG_LENGTH, mediaIds[2], null), + new CopyResult(CopyResult.Outcome.OUT_OF_QUOTA, mediaIds[3], null))); final List copyRequests = Arrays.stream(mediaIds) .map(mediaId -> new ArchiveController.CopyMediaRequest( @@ -427,7 +420,7 @@ public class ArchiveControllerTest { final ArchiveController.CopyMediaBatchResponse copyResponse = r.readEntity( ArchiveController.CopyMediaBatchResponse.class); - assertThat(copyResponse.responses()).hasSize(3); + assertThat(copyResponse.responses()).hasSize(4); final ArchiveController.CopyMediaBatchResponse.Entry r1 = copyResponse.responses().get(0); assertThat(r1.cdn()).isEqualTo(1); @@ -443,33 +436,11 @@ public class ArchiveControllerTest { assertThat(r3.mediaId()).isEqualTo(mediaIds[2]); assertThat(r3.status()).isEqualTo(400); assertThat(r3.failureReason()).isNotBlank(); - } - @Test - public void putMediaBatchOutOfSpace() throws VerificationFailedException { - final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( - BackupLevel.MEDIA, backupKey, aci); - when(backupManager.authenticateBackupUser(any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupLevel.MEDIA))); - - when(backupManager.canStoreMedia(any(), eq(1L + 2L + 3L))) - .thenReturn(CompletableFuture.completedFuture(false)); - - final Response response = resources.getJerseyTest() - .target("v1/archives/media/batch") - .request() - .header("X-Signal-ZK-Auth", Base64.getEncoder().encodeToString(presentation.serialize())) - .header("X-Signal-ZK-Auth-Signature", "aaa") - .put(Entity.json(new ArchiveController.CopyMediaBatchRequest(IntStream.range(0, 3) - .mapToObj(i -> new ArchiveController.CopyMediaRequest( - new ArchiveController.RemoteAttachment(3, "abc"), - i + 1, - TestRandomUtil.nextBytes(15), - TestRandomUtil.nextBytes(32), - TestRandomUtil.nextBytes(32), - TestRandomUtil.nextBytes(16)) - ).toList()))); - assertThat(response.getStatus()).isEqualTo(413); + final ArchiveController.CopyMediaBatchResponse.Entry r4 = copyResponse.responses().get(3); + assertThat(r4.mediaId()).isEqualTo(mediaIds[3]); + assertThat(r4.status()).isEqualTo(413); + assertThat(r4.failureReason()).isNotBlank(); } @CartesianTest @@ -523,7 +494,9 @@ public class ArchiveControllerTest { .mapToObj(i -> new ArchiveController.DeleteMedia.MediaToDelete(3, TestRandomUtil.nextBytes(15))) .toList()); - when(backupManager.delete(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + when(backupManager.deleteMedia(any(), any())) + .thenReturn(Flux.fromStream(deleteRequest.mediaToDelete().stream() + .map(m -> new BackupManager.StorageDescriptor(m.cdn(), m.mediaId())))); final Response response = resources.getJerseyTest() .target("v1/archives/media/delete")