Add copy endpoint to ArchiveController

Co-authored-by: Jonathan Klabunde Tomer <125505367+jkt-signal@users.noreply.github.com>
Co-authored-by: Chris Eager <79161849+eager-signal@users.noreply.github.com>
This commit is contained in:
ravi-signal 2023-11-28 11:45:41 -06:00 committed by GitHub
parent 1da3f96d10
commit 202dd8e92d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1918 additions and 248 deletions

View File

@ -87,6 +87,8 @@ dynamoDbTables:
usernamesTableName: Example_Accounts_Usernames
backups:
tableName: Example_Backups
backupMedia:
tableName: Example_BackupMedia
clientReleases:
tableName: Example_ClientReleases
deletedAccounts:
@ -219,6 +221,33 @@ cdn:
bucket: cdn # S3 Bucket name
region: us-west-2 # AWS region
clientCdn:
attachmentUrls:
2: https://cdn2.example.com/attachments/
caCertificates:
- |
-----BEGIN CERTIFICATE-----
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz
AAAAAAAAAAAAAAAAAAAA
-----END CERTIFICATE-----
dogstatsd:
environment: dev

View File

@ -24,6 +24,7 @@ import org.whispersystems.textsecuregcm.configuration.AwsAttachmentsConfiguratio
import org.whispersystems.textsecuregcm.configuration.BadgesConfiguration;
import org.whispersystems.textsecuregcm.configuration.BraintreeConfiguration;
import org.whispersystems.textsecuregcm.configuration.CdnConfiguration;
import org.whispersystems.textsecuregcm.configuration.ClientCdnConfiguration;
import org.whispersystems.textsecuregcm.configuration.ClientReleaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.CommandStopListenerConfiguration;
import org.whispersystems.textsecuregcm.configuration.DogstatsdConfiguration;
@ -101,6 +102,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private CdnConfiguration cdn;
@NotNull
@Valid
@JsonProperty
private ClientCdnConfiguration clientCdn;
@NotNull
@Valid
@JsonProperty
@ -405,6 +411,10 @@ public class WhisperServerConfiguration extends Configuration {
return cdn;
}
public ClientCdnConfiguration getClientCdn() {
return clientCdn;
}
public DogstatsdConfiguration getDatadogConfiguration() {
return dogstatsd;
}

View File

@ -79,7 +79,9 @@ import org.whispersystems.textsecuregcm.auth.WebsocketRefreshApplicationEventLis
import org.whispersystems.textsecuregcm.auth.grpc.BasicCredentialAuthenticationInterceptor;
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.TusBackupCredentialGenerator;
import org.whispersystems.textsecuregcm.backup.BackupsDb;
import org.whispersystems.textsecuregcm.backup.Cdn3BackupCredentialGenerator;
import org.whispersystems.textsecuregcm.backup.Cdn3RemoteStorageManager;
import org.whispersystems.textsecuregcm.badges.ConfiguredProfileBadgeConverter;
import org.whispersystems.textsecuregcm.badges.ResourceBundleLevelTranslator;
import org.whispersystems.textsecuregcm.captcha.CaptchaChecker;
@ -420,6 +422,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.scheduledExecutorService(name(getClass(), "storageServiceRetry-%d")).threads(1).build();
ScheduledExecutorService hcaptchaRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "hCaptchaRetry-%d")).threads(1).build();
ScheduledExecutorService remoteStorageExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "remoteStorageRetry-%d")).threads(1).build();
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
@ -650,10 +654,23 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ServerZkAuthOperations zkAuthOperations = new ServerZkAuthOperations(zkSecretParams);
ServerZkReceiptOperations zkReceiptOperations = new ServerZkReceiptOperations(zkSecretParams);
TusBackupCredentialGenerator tusBackupCredentialGenerator = new TusBackupCredentialGenerator(config.getTus());
Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator = new Cdn3BackupCredentialGenerator(config.getTus());
BackupAuthManager backupAuthManager = new BackupAuthManager(dynamicConfigurationManager, rateLimiters, accountsManager, backupsGenericZkSecretParams, clock);
BackupManager backupManager = new BackupManager(backupsGenericZkSecretParams, tusBackupCredentialGenerator, dynamoDbAsyncClient,
BackupsDb backupsDb = new BackupsDb(
dynamoDbAsyncClient,
config.getDynamoDbTables().getBackups().getTableName(),
config.getDynamoDbTables().getBackupMedia().getTableName(),
clock);
BackupManager backupManager = new BackupManager(
backupsDb,
backupsGenericZkSecretParams,
cdn3BackupCredentialGenerator,
new Cdn3RemoteStorageManager(
remoteStorageExecutor,
config.getClientCdn().getCircuitBreaker(),
config.getClientCdn().getRetry(),
config.getClientCdn().getCaCertificates()),
config.getClientCdn().getAttachmentUrls(),
clock);
AuthFilter<BasicCredentials, AuthenticatedAccount> accountAuthFilter = new BasicCredentialAuthFilter.Builder<AuthenticatedAccount>().setAuthenticator(

View File

@ -7,19 +7,15 @@ package org.whispersystems.textsecuregcm.backup;
import io.grpc.Status;
import io.micrometer.core.instrument.Metrics;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.net.URI;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.signal.libsignal.protocol.InvalidKeyException;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
import org.signal.libsignal.zkgroup.GenericServerSecretParams;
@ -29,66 +25,48 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
public class BackupManager {
private static final Logger logger = LoggerFactory.getLogger(BackupManager.class);
static final String MESSAGE_BACKUP_NAME = "messageBackup";
private static final int BACKUP_CDN = 3;
private static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = 1024L * 1024L * 1024L * 50L;
private static final long MAX_MEDIA_OBJECT_SIZE = 1024L * 1024L * 101L;
private static final String ZK_AUTHN_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authentication");
private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authorizationFailure");
private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
"authorizationFailure");
private static final String SUCCESS_TAG_NAME = "success";
private static final String FAILURE_REASON_TAG_NAME = "reason";
private final BackupsDb backupsDb;
private final GenericServerSecretParams serverSecretParams;
private final TusBackupCredentialGenerator tusBackupCredentialGenerator;
private final DynamoDbAsyncClient dynamoClient;
private final String backupTableName;
private final Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator;
private final RemoteStorageManager remoteStorageManager;
private final Map<Integer, String> attachmentCdnBaseUris;
private final Clock clock;
// The backups table
// B: 16 bytes that identifies the backup
public static final String KEY_BACKUP_ID_HASH = "U";
// N: Time in seconds since epoch of the last backup refresh. This timestamp must be periodically updated to avoid
// garbage collection of archive objects.
public static final String ATTR_LAST_REFRESH = "R";
// N: Time in seconds since epoch of the last backup media refresh. This timestamp can only be updated if the client
// has BackupTier.MEDIA, and must be periodically updated to avoid garbage collection of media objects.
public static final String ATTR_LAST_MEDIA_REFRESH = "MR";
// B: A 32 byte public key that should be used to sign the presentation used to authenticate requests against the
// backup-id
public static final String ATTR_PUBLIC_KEY = "P";
// N: Bytes consumed by this backup
public static final String ATTR_MEDIA_BYTES_USED = "MB";
// N: Number of media objects in the backup
public static final String ATTR_MEDIA_COUNT = "MC";
// N: The cdn number where the message backup is stored
public static final String ATTR_CDN = "CDN";
public BackupManager(
final BackupsDb backupsDb,
final GenericServerSecretParams serverSecretParams,
final TusBackupCredentialGenerator tusBackupCredentialGenerator,
final DynamoDbAsyncClient dynamoClient,
final String backupTableName,
final Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator,
final RemoteStorageManager remoteStorageManager,
final Map<Integer, String> attachmentCdnBaseUris,
final Clock clock) {
this.backupsDb = backupsDb;
this.serverSecretParams = serverSecretParams;
this.dynamoClient = dynamoClient;
this.tusBackupCredentialGenerator = tusBackupCredentialGenerator;
this.backupTableName = backupTableName;
this.cdn3BackupCredentialGenerator = cdn3BackupCredentialGenerator;
this.remoteStorageManager = remoteStorageManager;
this.clock = clock;
// strip trailing "/" for easier URI construction
this.attachmentCdnBaseUris = attachmentCdnBaseUris.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> StringUtils.removeEnd(entry.getValue(), "/")
));
}
/**
* Set the public key for the backup-id.
* <p>
@ -114,30 +92,16 @@ public class BackupManager {
.withDescription("credential does not support setting public key")
.asRuntimeException();
}
final byte[] hashedBackupId = hashedBackupId(presentation.getBackupId());
return dynamoClient.updateItem(UpdateItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.updateExpression("SET #publicKey = :publicKey")
.expressionAttributeNames(Map.of("#publicKey", ATTR_PUBLIC_KEY))
.expressionAttributeValues(Map.of(":publicKey", AttributeValues.b(publicKey.serialize())))
.conditionExpression("attribute_not_exists(#publicKey) OR #publicKey = :publicKey")
.build())
.exceptionally(throwable -> {
// There was already a row for this backup-id and it contained a different publicKey
if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) {
Metrics.counter(ZK_AUTHN_COUNTER_NAME,
SUCCESS_TAG_NAME, String.valueOf(false),
FAILURE_REASON_TAG_NAME, "public_key_conflict")
.increment();
throw Status.UNAUTHENTICATED
.withDescription("public key does not match existing public key for the backup-id")
.asRuntimeException();
}
throw ExceptionUtils.wrap(throwable);
})
.thenRun(Util.NOOP);
return backupsDb.setPublicKey(presentation.getBackupId(), backupTier, publicKey)
.exceptionally(ExceptionUtils.exceptionallyHandler(PublicKeyConflictException.class, ex -> {
Metrics.counter(ZK_AUTHN_COUNTER_NAME,
SUCCESS_TAG_NAME, String.valueOf(false),
FAILURE_REASON_TAG_NAME, "public_key_conflict")
.increment();
throw Status.UNAUTHENTICATED
.withDescription("public key does not match existing public key for the backup-id")
.asRuntimeException();
}));
}
@ -151,31 +115,12 @@ public class BackupManager {
*/
public CompletableFuture<MessageBackupUploadDescriptor> createMessageBackupUploadDescriptor(
final AuthenticatedBackupUser backupUser) {
final byte[] hashedBackupId = hashedBackupId(backupUser);
final String encodedBackupId = encodeForCdn(hashedBackupId);
final long refreshTimeSecs = clock.instant().getEpochSecond();
final List<String> updates = new ArrayList<>(List.of("#cdn = :cdn", "#lastRefresh = :expiration"));
final Map<String, String> expressionAttributeNames = new HashMap<>(Map.of(
"#cdn", ATTR_CDN,
"#lastRefresh", ATTR_LAST_REFRESH));
if (backupUser.backupTier().compareTo(BackupTier.MEDIA) >= 0) {
updates.add("#lastMediaRefresh = :expiration");
expressionAttributeNames.put("#lastMediaRefresh", ATTR_LAST_MEDIA_REFRESH);
}
final String encodedBackupId = encodeBackupIdForCdn(backupUser);
// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
return dynamoClient.updateItem(UpdateItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.updateExpression("SET %s".formatted(String.join(",", updates)))
.expressionAttributeNames(expressionAttributeNames)
.expressionAttributeValues(Map.of(
":cdn", AttributeValues.n(BACKUP_CDN),
":expiration", AttributeValues.n(refreshTimeSecs)))
.build())
.thenApply(result -> tusBackupCredentialGenerator.generateUpload(encodedBackupId, MESSAGE_BACKUP_NAME));
return backupsDb
.addMessageBackup(backupUser)
.thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(encodedBackupId, MESSAGE_BACKUP_NAME));
}
/**
@ -190,23 +135,8 @@ public class BackupManager {
.withDescription("credential does not support ttl operation")
.asRuntimeException();
}
final long refreshTimeSecs = clock.instant().getEpochSecond();
// update message backup TTL
final List<String> updates = new ArrayList<>(Collections.singletonList("#lastRefresh = :expiration"));
final Map<String, String> expressionAttributeNames = new HashMap<>(Map.of("#lastRefresh", ATTR_LAST_REFRESH));
if (backupUser.backupTier().compareTo(BackupTier.MEDIA) >= 0) {
// update media TTL
expressionAttributeNames.put("#lastMediaRefresh", ATTR_LAST_MEDIA_REFRESH);
updates.add("#lastMediaRefresh = :expiration");
}
return dynamoClient.updateItem(UpdateItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser))))
.updateExpression("SET %s".formatted(String.join(",", updates)))
.expressionAttributeNames(expressionAttributeNames)
.expressionAttributeValues(Map.of(":expiration", AttributeValues.n(refreshTimeSecs)))
.build())
.thenRun(Util.NOOP);
return backupsDb.ttlRefresh(backupUser);
}
public record BackupInfo(int cdn, String backupSubdir, String messageBackupKey, Optional<Long> mediaUsedSpace) {}
@ -223,31 +153,107 @@ public class BackupManager {
throw Status.PERMISSION_DENIED.withDescription("credential does not support info operation")
.asRuntimeException();
}
return backupInfoHelper(backupUser);
return backupsDb.describeBackup(backupUser)
.thenApply(backupDescription -> new BackupInfo(
backupDescription.cdn(),
encodeBackupIdForCdn(backupUser),
MESSAGE_BACKUP_NAME,
backupDescription.mediaUsedSpace()));
}
private CompletableFuture<BackupInfo> backupInfoHelper(final AuthenticatedBackupUser backupUser) {
return dynamoClient.getItem(GetItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser))))
.projectionExpression("#cdn,#bytesUsed")
.expressionAttributeNames(Map.of("#cdn", ATTR_CDN, "#bytesUsed", ATTR_MEDIA_BYTES_USED))
.build())
.thenApply(response -> {
if (!response.hasItem()) {
throw Status.NOT_FOUND.withDescription("Backup not found").asRuntimeException();
}
final int cdn = AttributeValues.get(response.item(), ATTR_CDN)
.map(AttributeValue::n)
.map(Integer::parseInt)
.orElseThrow(() -> Status.NOT_FOUND.withDescription("Stored backup not found").asRuntimeException());
/**
* Check if there is enough capacity to store the requested amount of media
*
* @param backupUser an already ZK authenticated backup user
* @param mediaLength the desired number of media bytes to store
* @return true if mediaLength bytes can be stored
*/
public CompletableFuture<Boolean> canStoreMedia(final AuthenticatedBackupUser backupUser, final long mediaLength) {
if (backupUser.backupTier().compareTo(BackupTier.MEDIA) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support storing media")
.asRuntimeException();
}
return backupsDb.describeBackup(backupUser)
.thenApply(info -> info.mediaUsedSpace()
.filter(usedSpace -> MAX_TOTAL_BACKUP_MEDIA_BYTES - usedSpace >= mediaLength)
.isPresent());
}
final Optional<Long> mediaUsed = AttributeValues.get(response.item(), ATTR_MEDIA_BYTES_USED)
.map(AttributeValue::n)
.map(Long::parseLong);
public record StorageDescriptor(int cdn, byte[] key) {}
return new BackupInfo(cdn, encodeForCdn(hashedBackupId(backupUser)), MESSAGE_BACKUP_NAME, mediaUsed);
});
/**
* Copy an encrypted object to the backup cdn, adding a layer of encryption
* <p>
* Implementation notes: <p> This method guarantees that any object that gets successfully copied to the backup cdn
* will also have an entry for the user in the database. <p>
* <p>
* However, the converse isn't true; there may be entries in the database that have not made it to the cdn. On list,
* these entries are checked against the cdn and removed.
*
* @return A stage that completes successfully with location of the twice-encrypted object on the backup cdn. The
* returned CompletionStage can be completed exceptionally with the following exceptions.
* <ul>
* <li> {@link InvalidLengthException} If the expectedSourceLength does not match the length of the sourceUri </li>
* <li> {@link SourceObjectNotFoundException} If the no object at sourceUri is found </li>
* <li> {@link java.io.IOException} If there was a generic IO issue </li>
* </ul>
*/
public CompletableFuture<StorageDescriptor> copyToBackup(
final AuthenticatedBackupUser backupUser,
final int sourceCdn,
final String sourceKey,
final int sourceLength,
final MediaEncryptionParameters encryptionParameters,
final byte[] destinationMediaId) {
if (backupUser.backupTier().compareTo(BackupTier.MEDIA) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support storing media")
.asRuntimeException();
}
if (sourceLength > MAX_MEDIA_OBJECT_SIZE) {
throw Status.INVALID_ARGUMENT
.withDescription("Invalid sourceObject size")
.asRuntimeException();
}
final MessageBackupUploadDescriptor dst = cdn3BackupCredentialGenerator.generateUpload(
encodeBackupIdForCdn(backupUser),
encodeForCdn(destinationMediaId));
return this.backupsDb
// Write the ddb updates before actually updating backing storage
.trackMedia(backupUser, destinationMediaId, sourceLength)
// copy the objects. On a failure, make a best-effort attempt to reverse the ddb transaction. If cleanup fails
// the client may be left with some cleanup to do if they don't eventually upload the media id.
.thenCompose(ignored -> remoteStorageManager
// actually perform the copy
.copy(attachmentReadUri(sourceCdn, sourceKey), sourceLength, encryptionParameters, dst)
// best effort: on failure, untrack the copied media
.exceptionallyCompose(copyError -> backupsDb.untrackMedia(backupUser, destinationMediaId, sourceLength)
.thenCompose(ignoredSuccess -> CompletableFuture.failedFuture(copyError))))
// indicates where the backup was stored
.thenApply(ignore -> new StorageDescriptor(dst.cdn(), destinationMediaId));
}
/**
* Construct the URI for an attachment with the specified key
*
* @param cdn where the attachment is located
* @param key the attachment key
* @return A {@link URI} where the attachment can be retrieved
*/
private URI attachmentReadUri(final int cdn, final String key) {
final String baseUri = attachmentCdnBaseUris.get(cdn);
if (baseUri == null) {
throw Status.INVALID_ARGUMENT.withDescription("Unknown cdn " + cdn).asRuntimeException();
}
return URI.create("%s/%s".formatted(baseUri, key));
}
/**
@ -264,8 +270,8 @@ public class BackupManager {
.asRuntimeException();
}
final String encodedBackupId = encodeForCdn(hashedBackupId(backupUser));
return tusBackupCredentialGenerator.readHeaders(encodedBackupId);
final String encodedBackupId = encodeBackupIdForCdn(backupUser);
return cdn3BackupCredentialGenerator.readHeaders(encodedBackupId);
}
/**
@ -284,27 +290,17 @@ public class BackupManager {
public CompletableFuture<AuthenticatedBackupUser> authenticateBackupUser(
final BackupAuthCredentialPresentation presentation,
final byte[] signature) {
final byte[] hashedBackupId = hashedBackupId(presentation.getBackupId());
return dynamoClient.getItem(GetItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.projectionExpression("#publicKey")
.expressionAttributeNames(Map.of("#publicKey", ATTR_PUBLIC_KEY))
.build())
.thenApply(response -> {
if (!response.hasItem()) {
Metrics.counter(ZK_AUTHN_COUNTER_NAME,
SUCCESS_TAG_NAME, String.valueOf(false),
FAILURE_REASON_TAG_NAME, "missing_public_key")
.increment();
throw Status.NOT_FOUND.withDescription("Backup not found").asRuntimeException();
}
final byte[] publicKeyBytes = AttributeValues.get(response.item(), ATTR_PUBLIC_KEY)
.map(AttributeValue::b)
.map(SdkBytes::asByteArray)
.orElseThrow(() -> Status.INTERNAL
.withDescription("Stored backup missing public key")
.asRuntimeException());
return backupsDb
.retrievePublicKey(presentation.getBackupId())
.thenApply(optionalPublicKey -> {
final byte[] publicKeyBytes = optionalPublicKey
.orElseThrow(() -> {
Metrics.counter(ZK_AUTHN_COUNTER_NAME,
SUCCESS_TAG_NAME, String.valueOf(false),
FAILURE_REASON_TAG_NAME, "missing_public_key")
.increment();
return Status.NOT_FOUND.withDescription("Backup not found").asRuntimeException();
});
try {
final ECPublicKey publicKey = new ECPublicKey(publicKeyBytes);
return new AuthenticatedBackupUser(
@ -316,7 +312,7 @@ public class BackupManager {
FAILURE_REASON_TAG_NAME, "invalid_public_key")
.increment();
logger.error("Invalid publicKey for backupId hash {}",
HexFormat.of().formatHex(hashedBackupId), e);
HexFormat.of().formatHex(BackupsDb.hashedBackupId(presentation.getBackupId())), e);
throw Status.INTERNAL
.withCause(e)
.withDescription("Could not deserialize stored public key")
@ -373,19 +369,12 @@ public class BackupManager {
});
}
private static byte[] hashedBackupId(final AuthenticatedBackupUser backupId) {
return hashedBackupId(backupId.backupId());
}
private static byte[] hashedBackupId(final byte[] backupId) {
try {
return Arrays.copyOf(MessageDigest.getInstance("SHA-256").digest(backupId), 16);
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
}
private static String encodeBackupIdForCdn(final AuthenticatedBackupUser backupUser) {
return encodeForCdn(BackupsDb.hashedBackupId(backupUser.backupId()));
}
private static String encodeForCdn(final byte[] bytes) {
return Base64.getUrlEncoder().encodeToString(bytes);
}
}

View File

@ -0,0 +1,103 @@
package org.whispersystems.textsecuregcm.backup;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.Flow;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.Mac;
import javax.crypto.NoSuchPaddingException;
import org.reactivestreams.FlowAdapters;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class BackupMediaEncrypter {
private final Cipher cipher;
private final Mac mac;
public BackupMediaEncrypter(final MediaEncryptionParameters encryptionParameters) {
cipher = initializeCipher(encryptionParameters);
mac = initializeMac(encryptionParameters);
}
public int outputSize(final int inputSize) {
return cipher.getIV().length + cipher.getOutputSize(inputSize) + mac.getMacLength();
}
/**
* Perform streaming encryption
*
* @param sourceBody A source of ByteBuffers, typically from an asynchronous HttpResponse
* @return A publisher that returns IV + AES/CBC/PKCS5Padding encrypted source + HMAC(IV + encrypted source) suitable
* to write with an asynchronous HttpRequest
*/
public Flow.Publisher<ByteBuffer> encryptBody(Flow.Publisher<List<ByteBuffer>> sourceBody) {
// Write IV, encrypted payload, mac
final Flux<ByteBuffer> encryptedBody = Flux.concat(
Mono.fromSupplier(() -> {
mac.update(cipher.getIV());
return ByteBuffer.wrap(cipher.getIV());
}),
Flux.from(FlowAdapters.toPublisher(sourceBody))
.flatMap(buffers -> Flux.fromIterable(buffers))
.concatMap(byteBuffer -> {
final byte[] copy = new byte[byteBuffer.remaining()];
byteBuffer.get(copy);
final byte[] res = cipher.update(copy);
if (res == null) {
return Mono.empty();
} else {
mac.update(res);
return Mono.just(ByteBuffer.wrap(res));
}
}),
Mono.fromSupplier(() -> {
try {
final byte[] finalBytes = cipher.doFinal();
mac.update(finalBytes);
return ByteBuffer.wrap(finalBytes);
} catch (IllegalBlockSizeException | BadPaddingException e) {
throw ExceptionUtils.wrap(e);
}
}),
Mono.fromSupplier(() -> ByteBuffer.wrap(mac.doFinal())));
return FlowAdapters.toFlowPublisher(encryptedBody);
}
private static Mac initializeMac(final MediaEncryptionParameters encryptionParameters) {
try {
final Mac mac = Mac.getInstance("HmacSHA256");
mac.init(encryptionParameters.hmacSHA256Key());
return mac;
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
} catch (InvalidKeyException e) {
throw new IllegalArgumentException(e);
}
}
private static Cipher initializeCipher(final MediaEncryptionParameters encryptionParameters) {
try {
final Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
cipher.init(
Cipher.ENCRYPT_MODE,
encryptionParameters.aesEncryptionKey(),
encryptionParameters.iv());
return cipher;
} catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
throw new AssertionError(e);
} catch (InvalidAlgorithmParameterException | InvalidKeyException e) {
throw new IllegalArgumentException(e);
}
}
}

View File

@ -0,0 +1,489 @@
package org.whispersystems.textsecuregcm.backup;
import io.grpc.Status;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.Delete;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
import software.amazon.awssdk.services.dynamodb.model.Update;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
/**
* Tracks backup metadata in a persistent store.
*
* It's assumed that the caller has already validated that the backupUser being operated on has valid credentials and
* possesses the appropriate {@link BackupTier} to perform the current operation.
*/
public class BackupsDb {
private static final Logger logger = LoggerFactory.getLogger(BackupsDb.class);
static final int BACKUP_CDN = 3;
private final DynamoDbAsyncClient dynamoClient;
private final String backupTableName;
private final String backupMediaTableName;
private final Clock clock;
// The backups table
// B: 16 bytes that identifies the backup
public static final String KEY_BACKUP_ID_HASH = "U";
// N: Time in seconds since epoch of the last backup refresh. This timestamp must be periodically updated to avoid
// garbage collection of archive objects.
public static final String ATTR_LAST_REFRESH = "R";
// N: Time in seconds since epoch of the last backup media refresh. This timestamp can only be updated if the client
// has BackupTier.MEDIA, and must be periodically updated to avoid garbage collection of media objects.
public static final String ATTR_LAST_MEDIA_REFRESH = "MR";
// B: A 32 byte public key that should be used to sign the presentation used to authenticate requests against the
// backup-id
public static final String ATTR_PUBLIC_KEY = "P";
// N: Bytes consumed by this backup
public static final String ATTR_MEDIA_BYTES_USED = "MB";
// N: Number of media objects in the backup
public static final String ATTR_MEDIA_COUNT = "MC";
// N: The cdn number where the message backup is stored
public static final String ATTR_CDN = "CDN";
// The stored media table (hashedBackupId, mediaId, cdn, objectLength)
// B: 15-byte mediaId
public static final String KEY_MEDIA_ID = "M";
// N: The length of the encrypted media object
public static final String ATTR_LENGTH = "L";
public BackupsDb(
final DynamoDbAsyncClient dynamoClient,
final String backupTableName,
final String backupMediaTableName,
final Clock clock) {
this.dynamoClient = dynamoClient;
this.backupTableName = backupTableName;
this.backupMediaTableName = backupMediaTableName;
this.clock = clock;
}
/**
* Set the public key associated with a backupId.
*
* @param authenticatedBackupId The backup-id bytes that should be associated with the provided public key
* @param authenticatedBackupTier The backup tier
* @param publicKey The public key to associate with the backup id
* @return A stage that completes when the public key has been set. If the backup-id already has a set public key that
* does not match, the stage will be completed exceptionally with a {@link PublicKeyConflictException}
*/
CompletableFuture<Void> setPublicKey(
final byte[] authenticatedBackupId,
final BackupTier authenticatedBackupTier,
final ECPublicKey publicKey) {
final byte[] hashedBackupId = hashedBackupId(authenticatedBackupId);
return dynamoClient.updateItem(new UpdateBuilder(backupTableName, authenticatedBackupTier, hashedBackupId)
.addSetExpression("#publicKey = :publicKey",
Map.entry("#publicKey", ATTR_PUBLIC_KEY),
Map.entry(":publicKey", AttributeValues.b(publicKey.serialize())))
.setRefreshTimes(clock)
.withConditionExpression("attribute_not_exists(#publicKey) OR #publicKey = :publicKey")
.updateItemBuilder()
.build())
.exceptionally(throwable -> {
// There was already a row for this backup-id and it contained a different publicKey
if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) {
throw ExceptionUtils.wrap(new PublicKeyConflictException());
}
throw ExceptionUtils.wrap(throwable);
})
.thenRun(Util.NOOP);
}
CompletableFuture<Optional<byte[]>> retrievePublicKey(byte[] backupId) {
final byte[] hashedBackupId = hashedBackupId(backupId);
return dynamoClient.getItem(GetItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.consistentRead(true)
.projectionExpression("#publicKey")
.expressionAttributeNames(Map.of("#publicKey", ATTR_PUBLIC_KEY))
.build())
.thenApply(response ->
AttributeValues.get(response.item(), ATTR_PUBLIC_KEY)
.map(AttributeValue::b)
.map(SdkBytes::asByteArray));
}
/**
* Add media to the backup media table and update the quota in the backup table
*
* @param backupUser The
* @param mediaId The mediaId to add
* @param mediaLength The length of the media before encryption (the length of the source media)
* @return A stage that completes successfully once the tables are updated. If the media with the provided id has
* previously been tracked with a different length, the stage will complete exceptionally with an
* {@link InvalidLengthException}
*/
CompletableFuture<Void> trackMedia(
final AuthenticatedBackupUser backupUser,
final byte[] mediaId,
final int mediaLength) {
final byte[] hashedBackupId = hashedBackupId(backupUser);
return dynamoClient
.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(
// Add the media to the media table
TransactWriteItem.builder().put(Put.builder()
.tableName(backupMediaTableName)
.returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
.item(Map.of(
KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId),
KEY_MEDIA_ID, AttributeValues.b(mediaId),
ATTR_CDN, AttributeValues.n(BACKUP_CDN),
ATTR_LENGTH, AttributeValues.n(mediaLength)))
.conditionExpression("attribute_not_exists(#mediaId)")
.expressionAttributeNames(Map.of("#mediaId", KEY_MEDIA_ID))
.build()).build(),
// Update the media quota and TTL
TransactWriteItem.builder().update(
UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(clock)
.incrementMediaBytes(mediaLength)
.incrementMediaCount(1)
.transactItemBuilder()
.build()).build()).build())
.exceptionally(throwable -> {
if (ExceptionUtils.unwrap(throwable) instanceof TransactionCanceledException txCancelled) {
final long oldItemLength = conditionCheckFailed(txCancelled, 0)
.flatMap(item -> Optional.ofNullable(item.get(ATTR_LENGTH)))
.map(attr -> Long.parseLong(attr.n()))
.orElseThrow(() -> ExceptionUtils.wrap(throwable));
if (oldItemLength != mediaLength) {
throw new CompletionException(
new InvalidLengthException("Previously tried to copy media with a different length. "
+ "Provided " + mediaLength + " was " + oldItemLength));
}
// The client already "paid" for this media, can let them through
return null;
} else {
// rethrow original exception
throw ExceptionUtils.wrap(throwable);
}
})
.thenRun(Util.NOOP);
}
/**
* Remove media from backup media table and update the quota in the backup table
*
* @param backupUser The backup user
* @param mediaId The mediaId to add
* @param mediaLength The length of the media before encryption (the length of the source media)
* @return A stage that completes successfully once the tables are updated
*/
CompletableFuture<Void> untrackMedia(
final AuthenticatedBackupUser backupUser,
final byte[] mediaId,
final int mediaLength) {
final byte[] hashedBackupId = hashedBackupId(backupUser);
return dynamoClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(
TransactWriteItem.builder().delete(Delete.builder()
.tableName(backupMediaTableName)
.returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
.key(Map.of(
KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId),
KEY_MEDIA_ID, AttributeValues.b(mediaId)
))
.conditionExpression("#length = :length")
.expressionAttributeNames(Map.of("#length", ATTR_LENGTH))
.expressionAttributeValues(Map.of(":length", AttributeValues.n(mediaLength)))
.build()).build(),
// Don't update TTLs, since we're just cleaning up media
TransactWriteItem.builder().update(UpdateBuilder.forUser(backupTableName, backupUser)
.incrementMediaBytes(-mediaLength)
.incrementMediaCount(-1)
.transactItemBuilder().build()).build()).build())
.exceptionally(error -> {
logger.warn("failed cleanup after failed copy operation", error);
return null;
})
.thenRun(Util.NOOP);
}
/**
* Update the last update timestamps for the backupId in the presentation
*
* @param backupUser an already authorized backup user
*/
CompletableFuture<Void> ttlRefresh(final AuthenticatedBackupUser backupUser) {
// update message backup TTL
return dynamoClient.updateItem(UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(clock)
.updateItemBuilder()
.build())
.thenRun(Util.NOOP);
}
/**
* Track that a backup will be stored for the user
* @param backupUser an already authorized backup user
*/
CompletableFuture<Void> addMessageBackup(final AuthenticatedBackupUser backupUser) {
// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
return dynamoClient.updateItem(
UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(clock)
.setCdn(BACKUP_CDN)
.updateItemBuilder()
.build())
.thenRun(Util.NOOP);
}
record BackupDescription(int cdn, Optional<Long> mediaUsedSpace) {}
/**
* Retrieve information about the backup
*
* @param backupUser an already authorized backup user
* @return A {@link BackupDescription} containing the cdn of the message backup and the total number of media space
* bytes used by the backup user.
*/
CompletableFuture<BackupDescription> describeBackup(final AuthenticatedBackupUser backupUser) {
return dynamoClient.getItem(GetItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser))))
.projectionExpression("#cdn,#bytesUsed")
.expressionAttributeNames(Map.of("#cdn", ATTR_CDN, "#bytesUsed", ATTR_MEDIA_BYTES_USED))
.consistentRead(true)
.build())
.thenApply(response -> {
if (!response.hasItem()) {
throw Status.NOT_FOUND.withDescription("Backup not found").asRuntimeException();
}
final int cdn = AttributeValues.get(response.item(), ATTR_CDN)
.map(AttributeValue::n)
.map(Integer::parseInt)
.orElseThrow(() -> Status.NOT_FOUND.withDescription("Stored backup not found").asRuntimeException());
final Optional<Long> mediaUsed = AttributeValues.get(response.item(), ATTR_MEDIA_BYTES_USED)
.map(AttributeValue::n)
.map(Long::parseLong);
return new BackupDescription(cdn, mediaUsed);
});
}
/**
* Build ddb update statements for the backups table
*/
private static class UpdateBuilder {
private final List<String> setStatements = new ArrayList<>();
private final Map<String, AttributeValue> attrValues = new HashMap<>();
private final Map<String, String> attrNames = new HashMap<>();
private final String tableName;
private final BackupTier backupTier;
private final byte[] hashedBackupId;
private String conditionExpression = null;
static UpdateBuilder forUser(String tableName, AuthenticatedBackupUser backupUser) {
return new UpdateBuilder(tableName, backupUser.backupTier(), hashedBackupId(backupUser));
}
UpdateBuilder(String tableName, BackupTier backupTier, byte[] hashedBackupId) {
this.tableName = tableName;
this.backupTier = backupTier;
this.hashedBackupId = hashedBackupId;
}
private void addAttrValue(Map.Entry<String, AttributeValue> attrValue) {
final AttributeValue old = attrValues.put(attrValue.getKey(), attrValue.getValue());
if (old != null && !old.equals(attrValue.getValue())) {
throw new IllegalArgumentException("duplicate attrValue key used for different values");
}
}
private void addAttrName(Map.Entry<String, String> attrName) {
final String oldName = attrNames.put(attrName.getKey(), attrName.getValue());
if (oldName != null && !oldName.equals(attrName.getValue())) {
throw new IllegalArgumentException("duplicate attrName key used for different attribute names");
}
}
private void addAttrs(final Map.Entry<String, String> attrName, final Map.Entry<String, AttributeValue> attrValue) {
addAttrName(attrName);
addAttrValue(attrValue);
}
UpdateBuilder addSetExpression(
final String update,
final Map.Entry<String, String> attrName,
final Map.Entry<String, AttributeValue> attrValue) {
setStatements.add(update);
addAttrs(attrName, attrValue);
return this;
}
UpdateBuilder addSetExpression(final String update) {
setStatements.add(update);
return this;
}
UpdateBuilder withConditionExpression(final String conditionExpression) {
this.conditionExpression = conditionExpression;
return this;
}
UpdateBuilder withConditionExpression(
final String conditionExpression,
final Map.Entry<String, String> attrName,
final Map.Entry<String, AttributeValue> attrValue) {
this.addAttrs(attrName, attrValue);
this.conditionExpression = conditionExpression;
return this;
}
UpdateBuilder setCdn(final int cdn) {
return addSetExpression(
"#cdn = :cdn",
Map.entry("#cdn", ATTR_CDN),
Map.entry(":cdn", AttributeValues.n(cdn)));
}
UpdateBuilder incrementMediaCount(long delta) {
addAttrName(Map.entry("#mediaCount", ATTR_MEDIA_COUNT));
addAttrValue(Map.entry(":zero", AttributeValues.n(0)));
addAttrValue(Map.entry(":mediaCountDelta", AttributeValues.n(delta)));
addSetExpression("#mediaCount = if_not_exists(#mediaCount, :zero) + :mediaCountDelta");
return this;
}
UpdateBuilder incrementMediaBytes(long delta) {
addAttrName(Map.entry("#mediaBytes", ATTR_MEDIA_BYTES_USED));
addAttrValue(Map.entry(":zero", AttributeValues.n(0)));
addAttrValue(Map.entry(":mediaBytesDelta", AttributeValues.n(delta)));
addSetExpression("#mediaBytes = if_not_exists(#mediaBytes, :zero) + :mediaBytesDelta");
return this;
}
/**
* Set the lastRefresh time as part of the update
* <p>
* This always updates lastRefreshTime, and updates lastMediaRefreshTime if the backup user has the appropriate
* tier
*/
UpdateBuilder setRefreshTimes(final Clock clock) {
final long refreshTimeSecs = clock.instant().getEpochSecond();
addSetExpression("#lastRefreshTime = :lastRefreshTime",
Map.entry("#lastRefreshTime", ATTR_LAST_REFRESH),
Map.entry(":lastRefreshTime", AttributeValues.n(refreshTimeSecs)));
if (backupTier.compareTo(BackupTier.MEDIA) >= 0) {
// update the media time if we have the appropriate tier
addSetExpression("#lastMediaRefreshTime = :lastMediaRefreshTime",
Map.entry("#lastMediaRefreshTime", ATTR_LAST_MEDIA_REFRESH),
Map.entry(":lastMediaRefreshTime", AttributeValues.n(refreshTimeSecs)));
}
return this;
}
/**
* Prepare a non-transactional update
*
* @return An {@link UpdateItemRequest#builder()} that can be used with updateItem
*/
UpdateItemRequest.Builder updateItemBuilder() {
final UpdateItemRequest.Builder bldr = UpdateItemRequest.builder()
.tableName(tableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.updateExpression("SET %s".formatted(String.join(",", setStatements)))
.expressionAttributeNames(attrNames)
.expressionAttributeValues(attrValues);
if (this.conditionExpression != null) {
bldr.conditionExpression(conditionExpression);
}
return bldr;
}
/**
* Prepare a transactional update
*
* @return An {@link Update#builder()} that can be used with transactItem
*/
Update.Builder transactItemBuilder() {
final Update.Builder bldr = Update.builder()
.tableName(tableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.updateExpression("SET %s".formatted(String.join(",", setStatements)))
.expressionAttributeNames(attrNames)
.expressionAttributeValues(attrValues);
if (this.conditionExpression != null) {
bldr.conditionExpression(conditionExpression);
}
return bldr;
}
}
private static byte[] hashedBackupId(final AuthenticatedBackupUser backupId) {
return hashedBackupId(backupId.backupId());
}
static byte[] hashedBackupId(final byte[] backupId) {
try {
return Arrays.copyOf(MessageDigest.getInstance("SHA-256").digest(backupId), 16);
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
}
}
/**
* Check if a DynamoDb error indicates a condition check failed error, and return the value of the item failed to
* update.
*
* @param e The error returned by {@link DynamoDbAsyncClient#transactWriteItems} attempt
* @param itemIndex The index of the item in the transaction that had a condition expression
* @return The remote value of the item that failed to update, or empty if the error was not a condition check failure
*/
private static Optional<Map<String, AttributeValue>> conditionCheckFailed(TransactionCanceledException e,
int itemIndex) {
if (!e.hasCancellationReasons()) {
return Optional.empty();
}
if (e.cancellationReasons().size() < itemIndex + 1) {
return Optional.empty();
}
final CancellationReason reason = e.cancellationReasons().get(itemIndex);
if (!"ConditionalCheckFailed".equals(reason.code()) || !reason.hasItem()) {
return Optional.empty();
}
return Optional.of(reason.item());
}
}

View File

@ -16,13 +16,13 @@ import java.time.Clock;
import java.util.Base64;
import java.util.Map;
public class TusBackupCredentialGenerator {
public class Cdn3BackupCredentialGenerator {
private static final int BACKUP_CDN = 3;
public static final String CDN_PATH = "backups";
public static final int BACKUP_CDN = 3;
private static String READ_PERMISSION = "read";
private static String WRITE_PERMISSION = "write";
private static String CDN_PATH = "backups";
private static String PERMISSION_SEPARATOR = "$";
// Write entities will be of the form 'write$backups/<string>
@ -35,7 +35,7 @@ public class TusBackupCredentialGenerator {
private final ExternalServiceCredentialsGenerator credentialsGenerator;
private final String tusUri;
public TusBackupCredentialGenerator(final TusConfiguration cfg) {
public Cdn3BackupCredentialGenerator(final TusConfiguration cfg) {
this.tusUri = cfg.uploadUri();
this.credentialsGenerator = credentialsGenerator(Clock.systemUTC(), cfg);
}

View File

@ -0,0 +1,102 @@
package org.whispersystems.textsecuregcm.backup;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;
import javax.ws.rs.core.Response;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient;
public class Cdn3RemoteStorageManager implements RemoteStorageManager {
private final FaultTolerantHttpClient httpClient;
public Cdn3RemoteStorageManager(
final ScheduledExecutorService retryExecutor,
final CircuitBreakerConfiguration circuitBreakerConfiguration,
final RetryConfiguration retryConfiguration,
final List<String> caCertificates) throws CertificateException {
this.httpClient = FaultTolerantHttpClient.newBuilder()
.withName("cdn3-remote-storage")
.withCircuitBreaker(circuitBreakerConfiguration)
.withExecutor(Executors.newCachedThreadPool())
.withRetryExecutor(retryExecutor)
.withRetry(retryConfiguration)
.withConnectTimeout(Duration.ofSeconds(10))
.withVersion(HttpClient.Version.HTTP_2)
.withTrustedServerCertificates(caCertificates.toArray(new String[0]))
.build();
}
@Override
public int cdnNumber() {
return 3;
}
@Override
public CompletionStage<Void> copy(
final URI sourceUri,
final int expectedSourceLength,
final MediaEncryptionParameters encryptionParameters,
final MessageBackupUploadDescriptor uploadDescriptor) {
if (uploadDescriptor.cdn() != cdnNumber()) {
throw new IllegalArgumentException("Cdn3RemoteStorageManager can only copy to cdn3");
}
final BackupMediaEncrypter encrypter = new BackupMediaEncrypter(encryptionParameters);
final HttpRequest request = HttpRequest.newBuilder().GET().uri(sourceUri).build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
throw new CompletionException(new SourceObjectNotFoundException());
} else if (response.statusCode() != Response.Status.OK.getStatusCode()) {
throw new CompletionException(new IOException("error reading from source: " + response.statusCode()));
}
final int actualSourceLength = Math.toIntExact(response.headers().firstValueAsLong("Content-Length")
.orElseThrow(() -> new CompletionException(new IOException("upstream missing Content-Length"))));
if (actualSourceLength != expectedSourceLength) {
throw new CompletionException(
new InvalidLengthException("Provided sourceLength " + expectedSourceLength + " was " + actualSourceLength));
}
final int expectedEncryptedLength = encrypter.outputSize(actualSourceLength);
final HttpRequest.BodyPublisher encryptedBody = HttpRequest.BodyPublishers.fromPublisher(
encrypter.encryptBody(response.body()), expectedEncryptedLength);
final String[] headers = Stream.concat(
uploadDescriptor.headers().entrySet()
.stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue())),
Stream.of("Upload-Length", Integer.toString(expectedEncryptedLength), "Tus-Resumable", "1.0.0"))
.toArray(String[]::new);
final HttpRequest put = HttpRequest.newBuilder()
.uri(URI.create(uploadDescriptor.signedUploadLocation()))
.headers(headers)
.POST(encryptedBody)
.build();
return httpClient.sendAsync(put, HttpResponse.BodyHandlers.discarding());
})
.thenAccept(response -> {
if (response.statusCode() != Response.Status.CREATED.getStatusCode() &&
response.statusCode() != Response.Status.OK.getStatusCode()) {
throw new CompletionException(new IOException("Failed to copy object: " + response.statusCode()));
}
});
}
}

View File

@ -0,0 +1,10 @@
package org.whispersystems.textsecuregcm.backup;
import java.io.IOException;
public class InvalidLengthException extends IOException {
public InvalidLengthException(String s) {
super(s);
}
}

View File

@ -0,0 +1,17 @@
package org.whispersystems.textsecuregcm.backup;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
public record MediaEncryptionParameters(
SecretKeySpec aesEncryptionKey,
SecretKeySpec hmacSHA256Key,
IvParameterSpec iv) {
public MediaEncryptionParameters(byte[] encryptionKey, byte[] macKey, byte[] iv) {
this(
new SecretKeySpec(encryptionKey, "AES"),
new SecretKeySpec(macKey, "HmacSHA256"),
new IvParameterSpec(iv));
}
}

View File

@ -0,0 +1,6 @@
package org.whispersystems.textsecuregcm.backup;
import java.io.IOException;
public class PublicKeyConflictException extends IOException {
}

View File

@ -0,0 +1,38 @@
package org.whispersystems.textsecuregcm.backup;
import java.net.URI;
import java.util.concurrent.CompletionStage;
/**
* Handles management operations over a external cdn storage system.
*/
public interface RemoteStorageManager {
/**
* @return The cdn number that this RemoteStorageManager manages
*/
int cdnNumber();
/**
* Copy and the object from a remote source into the backup, adding an additional layer of encryption
*
* @param sourceUri The location of the object to copy
* @param expectedSourceLength The length of the source object, should match the content-length of the object returned
* from the sourceUri.
* @param encryptionParameters The encryption keys that should be used to apply an additional layer of encryption to
* the object
* @param uploadDescriptor The destination, which must be in the cdn returned by {@link #cdnNumber()}
* @return A stage that completes successfully when the source has been successfully re-encrypted and copied into
* uploadDescriptor. The returned CompletionStage can be completed exceptionally with the following exceptions.
* <ul>
* <li> {@link InvalidLengthException} If the expectedSourceLength does not match the length of the sourceUri </li>
* <li> {@link SourceObjectNotFoundException} If the no object at sourceUri is found </li>
* <li> {@link java.io.IOException} If there was a generic IO issue </li>
* </ul>
*/
CompletionStage<Void> copy(
URI sourceUri,
int expectedSourceLength,
MediaEncryptionParameters encryptionParameters,
MessageBackupUploadDescriptor uploadDescriptor);
}

View File

@ -0,0 +1,11 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.backup;
import java.io.IOException;
public class SourceObjectNotFoundException extends IOException {
}

View File

@ -0,0 +1,53 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Configuration used to interact with a cdn via HTTP
*/
public class ClientCdnConfiguration {
/**
* Map from cdn number to the base url for attachments.
* <p>
* For example, if an attachment with the id 'abc' can be retrieved from cdn 2 at https://example.org/attachments/abc,
* the attachment url for 2 should https://example.org/attachments
*/
@JsonProperty
@NotNull
Map<Integer, @NotBlank String> attachmentUrls;
@JsonProperty
@NotNull
@NotEmpty List<@NotBlank String> caCertificates = new ArrayList<>();
@JsonProperty
@NotNull
CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
@JsonProperty
@NotNull
RetryConfiguration retry = new RetryConfiguration();
public List<String> getCaCertificates() {
return caCertificates;
}
public CircuitBreakerConfiguration getCircuitBreaker() {
return circuitBreaker;
}
public RetryConfiguration getRetry() {
return retry;
}
public Map<Integer, String> getAttachmentUrls() {
return attachmentUrls;
}
}

View File

@ -49,6 +49,7 @@ public class DynamoDbTables {
private final AccountsTableConfiguration accounts;
private final Table backups;
private final Table backupMedia;
private final Table clientReleases;
private final Table deletedAccounts;
private final Table deletedAccountsLock;
@ -71,6 +72,7 @@ public class DynamoDbTables {
public DynamoDbTables(
@JsonProperty("accounts") final AccountsTableConfiguration accounts,
@JsonProperty("backups") final Table backups,
@JsonProperty("backupMedia") final Table backupMedia,
@JsonProperty("clientReleases") final Table clientReleases,
@JsonProperty("deletedAccounts") final Table deletedAccounts,
@JsonProperty("deletedAccountsLock") final Table deletedAccountsLock,
@ -92,6 +94,7 @@ public class DynamoDbTables {
this.accounts = accounts;
this.backups = backups;
this.backupMedia = backupMedia;
this.clientReleases = clientReleases;
this.deletedAccounts = deletedAccounts;
this.deletedAccountsLock = deletedAccountsLock;
@ -124,6 +127,12 @@ public class DynamoDbTables {
return backups;
}
@NotNull
@Valid
public Table getBackupMedia() {
return backupMedia;
}
@NotNull
@Valid
public Table getClientReleases() {

View File

@ -14,6 +14,7 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@ -26,8 +27,11 @@ import java.util.Optional;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
@ -37,15 +41,27 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation;
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.InvalidLengthException;
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
import org.whispersystems.textsecuregcm.backup.SourceObjectNotFoundException;
import org.whispersystems.textsecuregcm.util.BackupAuthCredentialAdapter;
import org.whispersystems.textsecuregcm.util.ByteArrayAdapter;
import org.whispersystems.textsecuregcm.util.ByteArrayBase64UrlAdapter;
import org.whispersystems.textsecuregcm.util.ECPublicKeyAdapter;
import org.whispersystems.textsecuregcm.util.ExactlySize;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Path("/v1/archives")
@Tag(name = "Archive")
@ -72,6 +88,7 @@ public class ArchiveController {
@JsonSerialize(using = BackupAuthCredentialAdapter.CredentialRequestSerializer.class)
@NotNull BackupAuthCredentialRequest backupAuthCredentialRequest) {}
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ -88,10 +105,12 @@ public class ArchiveController {
@ApiResponse(responseCode = "204", description = "The backup-id was set")
@ApiResponse(responseCode = "400", description = "The provided backup auth credential request was invalid")
@ApiResponse(responseCode = "429", description = "Rate limited. Too many attempts to change the backup-id have been made")
public CompletionStage<Void> setBackupId(
public CompletionStage<Response> setBackupId(
@Auth final AuthenticatedAccount account,
@Valid @NotNull final SetBackupIdRequest setBackupIdRequest) throws RateLimitExceededException {
return this.backupAuthManager.commitBackupId(account.getAccount(), setBackupIdRequest.backupAuthCredentialRequest);
return this.backupAuthManager
.commitBackupId(account.getAccount(), setBackupIdRequest.backupAuthCredentialRequest)
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}
public record BackupAuthCredentialsResponse(
@ -274,7 +293,7 @@ public class ArchiveController {
@ApiResponseZkAuth
@ApiResponse(responseCode = "204", description = "The public key was set")
@ApiResponse(responseCode = "429", description = "Rate limited.")
public CompletionStage<Void> setPublicKey(
public CompletionStage<Response> setPublicKey(
@Auth final Optional<AuthenticatedAccount> account,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@ -286,9 +305,9 @@ public class ArchiveController {
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature,
@NotNull SetPublicKeyRequest setPublicKeyRequest) {
return backupManager.setPublicKey(
presentation.presentation, signature.signature,
setPublicKeyRequest.backupIdPublicKey);
return backupManager
.setPublicKey(presentation.presentation, signature.signature, setPublicKeyRequest.backupIdPublicKey)
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}
@ -333,6 +352,233 @@ public class ArchiveController {
result.signedUploadLocation()));
}
public record RemoteAttachment(
@Schema(description = "The attachment cdn")
@NotNull
Integer cdn,
@NotBlank
@Schema(description = "The attachment key")
String key) {}
public record CopyMediaRequest(
@Schema(description = "The object on the attachment CDN to copy")
@NotNull
RemoteAttachment sourceAttachment,
@Schema(description = "The length of the source attachment before the encryption applied by the copy operation")
@NotNull
int objectLength,
@Schema(description = "mediaId to copy on to the backup CDN in URL-safe base64", implementation = String.class)
@JsonSerialize(using = ByteArrayBase64UrlAdapter.Serializing.class)
@JsonDeserialize(using = ByteArrayBase64UrlAdapter.Deserializing.class)
@NotNull
@ExactlySize(15)
byte[] mediaId,
@Schema(description = "A 32-byte key for the MAC, base64 encoded", implementation = String.class)
@JsonDeserialize(using = ByteArrayAdapter.Deserializing.class)
@NotNull
@ExactlySize(32)
byte[] hmacKey,
@Schema(description = "A 32-byte encryption key for AES, base64 encoded", implementation = String.class)
@JsonDeserialize(using = ByteArrayAdapter.Deserializing.class)
@NotNull
@ExactlySize(32)
byte[] encryptionKey,
@Schema(description = "A 16-byte IV for AES, base64 encoded", implementation = String.class)
@JsonDeserialize(using = ByteArrayAdapter.Deserializing.class)
@NotNull
@ExactlySize(16)
byte[] iv) {}
public record CopyMediaResponse(
@Schema(description = "The backup cdn where this media object is stored")
@NotNull
Integer cdn) {}
@PUT
@Path("/media/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Operation(
summary = "Backup media",
description = """
Copy and re-encrypt media from the attachments cdn into the backup cdn.
The original, already encrypted, attachment will be encrypted with the provided key material before being copied
If the destination media already exists, the copy will be skipped and a 200 will be returned.
""")
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = CopyMediaResponse.class)))
@ApiResponse(responseCode = "400", description = "The provided object length was incorrect")
@ApiResponse(responseCode = "413", description = "All media capacity has been consumed. Free some space to continue.")
@ApiResponse(responseCode = "410", description = "The source object was not found.")
@ApiResponse(responseCode = "429", description = "Rate limited.")
@ApiResponseZkAuth
public CompletionStage<CopyMediaResponse> copyMedia(@Auth final Optional<AuthenticatedAccount> account,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH) final ArchiveController.BackupAuthCredentialPresentationHeader presentation,
@Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature,
@NotNull
@Valid final ArchiveController.CopyMediaRequest copyMediaRequest) {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
final AuthenticatedBackupUser backupUser = backupManager.authenticateBackupUser(
presentation.presentation, signature.signature).join();
final boolean fits = backupManager.canStoreMedia(backupUser, copyMediaRequest.objectLength()).join();
if (!fits) {
throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE);
}
return copyMediaImpl(backupUser, copyMediaRequest)
.thenApply(result -> new CopyMediaResponse(result.cdn()))
.exceptionally(e -> {
final Throwable unwrapped = ExceptionUtils.unwrap(e);
if (unwrapped instanceof SourceObjectNotFoundException) {
throw new ClientErrorException("Source object not found " + unwrapped.getMessage(), Response.Status.GONE);
} else if (unwrapped instanceof InvalidLengthException) {
throw new BadRequestException("Invalid length " + unwrapped.getMessage());
} else {
throw ExceptionUtils.wrap(e);
}
});
}
private CompletionStage<BackupManager.StorageDescriptor> copyMediaImpl(final AuthenticatedBackupUser backupUser,
final CopyMediaRequest copyMediaRequest) {
return this.backupManager.copyToBackup(
backupUser,
copyMediaRequest.sourceAttachment.cdn,
copyMediaRequest.sourceAttachment.key,
copyMediaRequest.objectLength,
new MediaEncryptionParameters(
copyMediaRequest.encryptionKey,
copyMediaRequest.hmacKey,
copyMediaRequest.iv),
copyMediaRequest.mediaId);
}
public record CopyMediaBatchRequest(
@Schema(description = "A list of media objects to copy from the attachments CDN to the backup CDN")
@NotNull
@Size(min = 1, max = 1000)
List<CopyMediaRequest> items) {}
public record CopyMediaBatchResponse(
@Schema(description = "Detailed outcome information for each copy request in the batch")
List<Entry> responses) {
public record Entry(
@Schema(description = """
The outcome of the copy attempt.
A 200 indicates the object was successfully copied.
A 400 indicates an invalid argument in the request
A 410 indicates that the source object was not found
""")
int status,
@Schema(description = "On a copy failure, a detailed failure reason")
String failureReason,
@Schema(description = "The backup cdn where this media object is stored")
Integer cdn,
@Schema(description = "The mediaId of the object in URL-safe base64", implementation = String.class)
@JsonSerialize(using = ByteArrayBase64UrlAdapter.Serializing.class)
@JsonDeserialize(using = ByteArrayBase64UrlAdapter.Deserializing.class)
@NotNull
@ExactlySize(15)
byte[] mediaId) {}
}
@PUT
@Path("/media/batch")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Operation(
summary = "Batched backup media",
description = """
Copy and re-encrypt media from the attachments cdn into the backup cdn.
The original already encrypted attachment will be encrypted with the provided key material before being copied
If the batch request is processed at all, a 207 will be returned and the outcome of each constituent copy will
be provided as a separate entry in the response.
""")
@ApiResponse(responseCode = "207", description = """
The request was processed and each operation's outcome must be inspected individually. This does NOT necessarily
indicate the operation was a success.
""", content = @Content(schema = @Schema(implementation = CopyMediaBatchResponse.class)))
@ApiResponse(responseCode = "413", description = "All media capacity has been consumed. Free some space to continue.")
@ApiResponse(responseCode = "429", description = "Rate limited.")
@ApiResponseZkAuth
public CompletionStage<Response> copyMedia(
@Auth final Optional<AuthenticatedAccount> account,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH) final ArchiveController.BackupAuthCredentialPresentationHeader presentation,
@Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature,
@NotNull
@Valid final ArchiveController.CopyMediaBatchRequest copyMediaRequest) {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
final AuthenticatedBackupUser backupUser = backupManager.authenticateBackupUser(
presentation.presentation, signature.signature).join();
// If the entire batch won't fit in the user's remaining quota, reject the whole request.
final long expectedStorage = copyMediaRequest.items().stream().mapToLong(CopyMediaRequest::objectLength).sum();
final boolean fits = backupManager.canStoreMedia(backupUser, expectedStorage).join();
if (!fits) {
throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE);
}
return Flux.fromIterable(copyMediaRequest.items)
// Operate sequentially, waiting for one copy to finish before starting the next one. At least right now,
// copying concurrently will introduce contention over the metadata.
.concatMap(request -> Mono
.fromCompletionStage(copyMediaImpl(backupUser, request))
.map(result -> new CopyMediaBatchResponse.Entry(200, null, result.cdn(), result.key()))
.onErrorResume(throwable -> ExceptionUtils.unwrap(throwable) instanceof IOException, throwable -> {
final Throwable unwrapped = ExceptionUtils.unwrap(throwable);
int status;
String error;
if (unwrapped instanceof SourceObjectNotFoundException) {
status = 410;
error = "Source object not found " + unwrapped.getMessage();
} else if (unwrapped instanceof InvalidLengthException) {
status = 400;
error = "Invalid length " + unwrapped.getMessage();
} else {
throw ExceptionUtils.wrap(throwable);
}
return Mono.just(new CopyMediaBatchResponse.Entry(status, error, null, request.mediaId));
}))
.collectList()
.map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build())
.toFuture();
}
@POST
@Produces(MediaType.APPLICATION_JSON)
@ -345,7 +591,7 @@ public class ArchiveController {
@ApiResponse(responseCode = "204", description = "The backup was successfully refreshed")
@ApiResponse(responseCode = "429", description = "Rate limited.")
@ApiResponseZkAuth
public CompletionStage<Void> refresh(
public CompletionStage<Response> refresh(
@Auth final Optional<AuthenticatedAccount> account,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@ -360,6 +606,7 @@ public class ArchiveController {
}
return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.thenCompose(backupManager::ttlRefresh);
.thenCompose(backupManager::ttlRefresh)
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}
}

View File

@ -1,13 +1,14 @@
package org.whispersystems.textsecuregcm.util;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
public final class ExceptionUtils {
private ExceptionUtils() {
// utility class
}
/**
* Extracts the cause of a {@link CompletionException}. If the given {@code throwable} is a
* {@code CompletionException}, this method will recursively iterate through its causal chain until it finds the first
@ -16,7 +17,6 @@ public final class ExceptionUtils {
* {@code throwable} is not a {@code CompletionException}, then this method returns the original {@code throwable}.
*
* @param throwable the throwable to "unwrap"
*
* @return the first entity in the given {@code throwable}'s causal chain that is not a {@code CompletionException}
*/
public static Throwable unwrap(Throwable throwable) {
@ -27,8 +27,8 @@ public final class ExceptionUtils {
}
/**
* Wraps the given {@code throwable} in a {@link CompletionException} unless the given {@code throwable} is already
* a {@code CompletionException}, in which case this method returns the original throwable.
* Wraps the given {@code throwable} in a {@link CompletionException} unless the given {@code throwable} is already a
* {@code CompletionException}, in which case this method returns the original throwable.
*
* @param throwable the throwable to wrap in a {@code CompletionException}
*/
@ -37,4 +37,29 @@ public final class ExceptionUtils {
? completionException
: new CompletionException(throwable);
}
/**
* Create a handler suitable for use with {@link java.util.concurrent.CompletionStage#exceptionally} that only handles
* a specific exception subclass.
*
* @param exceptionType The class of exception that will be handled
* @param fn A function that handles exceptions of type exceptionType
* @param <T> The type of the stage that will be mapped
* @param <E> The type of the exception that will be handled
* @return A function suitable for use with {@link java.util.concurrent.CompletionStage#exceptionally}
*/
public static <T, E extends Throwable> Function<Throwable, ? extends T> exceptionallyHandler(
final Class<E> exceptionType,
final Function<E, ? extends T> fn) {
return anyException -> {
if (exceptionType.isInstance(anyException)) {
return fn.apply(exceptionType.cast(anyException));
}
final Throwable unwrap = unwrap(anyException);
if (exceptionType.isInstance(unwrap)) {
return fn.apply(exceptionType.cast(unwrap));
}
throw wrap(anyException);
};
}
}

View File

@ -8,27 +8,31 @@ package org.whispersystems.textsecuregcm.backup;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.RandomUtils;
import org.assertj.core.api.ThrowableAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -36,28 +40,28 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.signal.libsignal.protocol.ecc.Curve;
import org.signal.libsignal.protocol.ecc.ECKeyPair;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.signal.libsignal.zkgroup.VerificationFailedException;
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.backup.BackupManager.BackupInfo;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil;
import org.whispersystems.textsecuregcm.util.TestClock;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
public class BackupManagerTest {
@RegisterExtension
private static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension(
DynamoDbExtensionSchema.Tables.BACKUPS);
public static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension(
DynamoDbExtensionSchema.Tables.BACKUPS,
DynamoDbExtensionSchema.Tables.BACKUP_MEDIA);
private final TestClock testClock = TestClock.now();
private final BackupAuthTestUtil backupAuthTestUtil = new BackupAuthTestUtil(testClock);
private final TusBackupCredentialGenerator tusCredentialGenerator = mock(TusBackupCredentialGenerator.class);
private final Cdn3BackupCredentialGenerator tusCredentialGenerator = mock(Cdn3BackupCredentialGenerator.class);
private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
private final byte[] backupKey = RandomUtils.nextBytes(32);
private final UUID aci = UUID.randomUUID();
@ -68,16 +72,19 @@ public class BackupManagerTest {
reset(tusCredentialGenerator);
testClock.unpin();
this.backupManager = new BackupManager(
new BackupsDb(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(),
DynamoDbExtensionSchema.Tables.BACKUPS.tableName(), DynamoDbExtensionSchema.Tables.BACKUP_MEDIA.tableName(),
testClock),
backupAuthTestUtil.params,
tusCredentialGenerator,
DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(),
DynamoDbExtensionSchema.Tables.BACKUPS.tableName(),
remoteStorageManager,
Map.of(3, "cdn3.example.org/attachments"),
testClock);
}
@ParameterizedTest
@EnumSource(mode = EnumSource.Mode.EXCLUDE, names = {"NONE"})
public void createBackup(final BackupTier backupTier) throws InvalidInputException, VerificationFailedException {
public void createBackup(final BackupTier backupTier) {
final Instant now = Instant.ofEpochSecond(Duration.ofDays(1).getSeconds());
testClock.pin(now);
@ -89,18 +96,18 @@ public class BackupManagerTest {
verify(tusCredentialGenerator, times(1))
.generateUpload(encodedBackupId, BackupManager.MESSAGE_BACKUP_NAME);
final BackupInfo info = backupManager.backupInfo(backupUser).join();
final BackupManager.BackupInfo info = backupManager.backupInfo(backupUser).join();
assertThat(info.backupSubdir()).isEqualTo(encodedBackupId);
assertThat(info.messageBackupKey()).isEqualTo(BackupManager.MESSAGE_BACKUP_NAME);
assertThat(info.mediaUsedSpace()).isEqualTo(Optional.empty());
// Check that the initial expiration times are the initial write times
checkExpectedExpirations(now, backupTier == BackupTier.MEDIA ? now : null, backupUser.backupId());
checkExpectedExpirations(now, backupTier == BackupTier.MEDIA ? now : null, backupUser);
}
@ParameterizedTest
@EnumSource(mode = EnumSource.Mode.EXCLUDE, names = {"NONE"})
public void ttlRefresh(final BackupTier backupTier) throws InvalidInputException, VerificationFailedException {
public void ttlRefresh(final BackupTier backupTier) {
final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), backupTier);
final Instant tstart = Instant.ofEpochSecond(1).plus(Duration.ofDays(1));
@ -117,12 +124,12 @@ public class BackupManagerTest {
checkExpectedExpirations(
tnext,
backupTier == BackupTier.MEDIA ? tnext : null,
backupUser.backupId());
backupUser);
}
@ParameterizedTest
@EnumSource(mode = EnumSource.Mode.EXCLUDE, names = {"NONE"})
public void createBackupRefreshesTtl(final BackupTier backupTier) throws VerificationFailedException {
public void createBackupRefreshesTtl(final BackupTier backupTier) {
final Instant tstart = Instant.ofEpochSecond(1).plus(Duration.ofDays(1));
final Instant tnext = tstart.plus(Duration.ofSeconds(1));
@ -139,7 +146,7 @@ public class BackupManagerTest {
checkExpectedExpirations(
tnext,
backupTier == BackupTier.MEDIA ? tnext : null,
backupUser.backupId());
backupUser);
}
@Test
@ -151,9 +158,10 @@ public class BackupManagerTest {
final byte[] signature = keyPair.getPrivateKey().calculateSignature(presentation.serialize());
// haven't set a public key yet
assertThatExceptionOfType(StatusRuntimeException.class)
.isThrownBy(unwrapExceptions(() -> backupManager.authenticateBackupUser(presentation, signature)))
.extracting(ex -> ex.getStatus().getCode())
assertThat(CompletableFutureTestUtil.assertFailsWithCause(
StatusRuntimeException.class,
backupManager.authenticateBackupUser(presentation, signature))
.getStatus().getCode())
.isEqualTo(Status.NOT_FOUND.getCode());
}
@ -170,9 +178,10 @@ public class BackupManagerTest {
backupManager.setPublicKey(presentation, signature1, keyPair1.getPublicKey()).join();
// shouldn't be able to set a different public key
assertThatExceptionOfType(StatusRuntimeException.class)
.isThrownBy(unwrapExceptions(() -> backupManager.setPublicKey(presentation, signature2, keyPair2.getPublicKey())))
.extracting(ex -> ex.getStatus().getCode())
assertThat(CompletableFutureTestUtil.assertFailsWithCause(
StatusRuntimeException.class,
backupManager.setPublicKey(presentation, signature2, keyPair2.getPublicKey()))
.getStatus().getCode())
.isEqualTo(Status.UNAUTHENTICATED.getCode());
// should be able to set the same public key again (noop)
@ -193,16 +202,17 @@ public class BackupManagerTest {
// shouldn't be able to set a public key with an invalid signature
assertThatExceptionOfType(StatusRuntimeException.class)
.isThrownBy(unwrapExceptions(() -> backupManager.setPublicKey(presentation, wrongSignature, keyPair.getPublicKey())))
.isThrownBy(() -> backupManager.setPublicKey(presentation, wrongSignature, keyPair.getPublicKey()))
.extracting(ex -> ex.getStatus().getCode())
.isEqualTo(Status.UNAUTHENTICATED.getCode());
backupManager.setPublicKey(presentation, signature, keyPair.getPublicKey()).join();
// shouldn't be able to authenticate with an invalid signature
assertThatExceptionOfType(StatusRuntimeException.class)
.isThrownBy(unwrapExceptions(() -> backupManager.authenticateBackupUser(presentation, wrongSignature)))
.extracting(ex -> ex.getStatus().getCode())
assertThat(CompletableFutureTestUtil.assertFailsWithCause(
StatusRuntimeException.class,
backupManager.authenticateBackupUser(presentation, wrongSignature))
.getStatus().getCode())
.isEqualTo(Status.UNAUTHENTICATED.getCode());
// correct signature
@ -212,11 +222,12 @@ public class BackupManagerTest {
}
@Test
public void credentialExpiration() throws InvalidInputException, VerificationFailedException {
public void credentialExpiration() throws VerificationFailedException {
// credential for 1 day after epoch
testClock.pin(Instant.ofEpochSecond(1).plus(Duration.ofDays(1)));
final BackupAuthCredentialPresentation oldCredential = backupAuthTestUtil.getPresentation(BackupTier.MESSAGES, backupKey, aci);
final BackupAuthCredentialPresentation oldCredential = backupAuthTestUtil.getPresentation(BackupTier.MESSAGES,
backupKey, aci);
final ECKeyPair keyPair = Curve.generateKeyPair();
final byte[] signature = keyPair.getPrivateKey().calculateSignature(oldCredential.serialize());
backupManager.setPublicKey(oldCredential, signature, keyPair.getPublicKey()).join();
@ -231,28 +242,95 @@ public class BackupManagerTest {
// should be rejected the day after that
testClock.pin(Instant.ofEpochSecond(1).plus(Duration.ofDays(3)));
assertThatExceptionOfType(StatusRuntimeException.class)
.isThrownBy(unwrapExceptions(() -> backupManager.authenticateBackupUser(oldCredential, signature)))
.extracting(ex -> ex.getStatus().getCode())
assertThat(CompletableFutureTestUtil.assertFailsWithCause(
StatusRuntimeException.class,
backupManager.authenticateBackupUser(oldCredential, signature))
.getStatus().getCode())
.isEqualTo(Status.UNAUTHENTICATED.getCode());
}
@Test
public void copySuccess() {
final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA);
when(tusCredentialGenerator.generateUpload(any(), any()))
.thenReturn(new MessageBackupUploadDescriptor(3, "def", Collections.emptyMap(), ""));
when(remoteStorageManager.copy(eq(URI.create("cdn3.example.org/attachments/abc")), eq(100), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
final BackupManager.StorageDescriptor copied = backupManager.copyToBackup(
backupUser, 3, "abc", 100, mock(MediaEncryptionParameters.class),
"def".getBytes(StandardCharsets.UTF_8)).join();
assertThat(copied.cdn()).isEqualTo(3);
assertThat(copied.key()).isEqualTo("def".getBytes(StandardCharsets.UTF_8));
final Map<String, AttributeValue> backup = getBackupItem(backupUser);
final long bytesUsed = AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_BYTES_USED, 0L);
assertThat(bytesUsed).isEqualTo(100);
final long mediaCount = AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_COUNT, 0L);
assertThat(mediaCount).isEqualTo(1);
final Map<String, AttributeValue> mediaItem = getBackupMediaItem(backupUser,
"def".getBytes(StandardCharsets.UTF_8));
final long mediaLength = AttributeValues.getLong(mediaItem, BackupsDb.ATTR_LENGTH, 0L);
assertThat(mediaLength).isEqualTo(100L);
}
@Test
public void copyFailure() {
final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA);
when(tusCredentialGenerator.generateUpload(any(), any()))
.thenReturn(new MessageBackupUploadDescriptor(3, "def", Collections.emptyMap(), ""));
when(remoteStorageManager.copy(eq(URI.create("cdn3.example.org/attachments/abc")), eq(100), any(), any()))
.thenReturn(CompletableFuture.failedFuture(new SourceObjectNotFoundException()));
CompletableFutureTestUtil.assertFailsWithCause(SourceObjectNotFoundException.class,
backupManager.copyToBackup(
backupUser,
3, "abc", 100,
mock(MediaEncryptionParameters.class),
"def".getBytes(StandardCharsets.UTF_8)));
final Map<String, AttributeValue> backup = getBackupItem(backupUser);
assertThat(AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_BYTES_USED, -1L)).isEqualTo(0L);
assertThat(AttributeValues.getLong(backup, BackupsDb.ATTR_MEDIA_COUNT, -1L)).isEqualTo(0L);
final Map<String, AttributeValue> media = getBackupMediaItem(backupUser, "def".getBytes(StandardCharsets.UTF_8));
assertThat(media).isEmpty();
}
private Map<String, AttributeValue> getBackupItem(final AuthenticatedBackupUser backupUser) {
return DYNAMO_DB_EXTENSION.getDynamoDbClient().getItem(GetItemRequest.builder()
.tableName(DynamoDbExtensionSchema.Tables.BACKUPS.tableName())
.key(Map.of(BackupsDb.KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser.backupId()))))
.build())
.item();
}
private Map<String, AttributeValue> getBackupMediaItem(final AuthenticatedBackupUser backupUser,
final byte[] mediaId) {
return DYNAMO_DB_EXTENSION.getDynamoDbClient().getItem(GetItemRequest.builder()
.tableName(DynamoDbExtensionSchema.Tables.BACKUP_MEDIA.tableName())
.key(Map.of(
BackupsDb.KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser.backupId())),
BackupsDb.KEY_MEDIA_ID, AttributeValues.b(mediaId)))
.build())
.item();
}
private void checkExpectedExpirations(
final Instant expectedExpiration,
final @Nullable Instant expectedMediaExpiration,
final byte[] backupId) {
final GetItemResponse item = DYNAMO_DB_EXTENSION.getDynamoDbClient().getItem(GetItemRequest.builder()
.tableName(DynamoDbExtensionSchema.Tables.BACKUPS.tableName())
.key(Map.of(BackupManager.KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupId))))
.build());
assertThat(item.hasItem()).isTrue();
final Instant refresh = Instant.ofEpochSecond(Long.parseLong(item.item().get(BackupManager.ATTR_LAST_REFRESH).n()));
final AuthenticatedBackupUser backupUser) {
final Map<String, AttributeValue> item = getBackupItem(backupUser);
final Instant refresh = Instant.ofEpochSecond(Long.parseLong(item.get(BackupsDb.ATTR_LAST_REFRESH).n()));
assertThat(refresh).isEqualTo(expectedExpiration);
if (expectedMediaExpiration == null) {
assertThat(item.item()).doesNotContainKey(BackupManager.ATTR_LAST_MEDIA_REFRESH);
assertThat(item).doesNotContainKey(BackupsDb.ATTR_LAST_MEDIA_REFRESH);
} else {
assertThat(Instant.ofEpochSecond(Long.parseLong(item.item().get(BackupManager.ATTR_LAST_MEDIA_REFRESH).n())))
assertThat(Instant.ofEpochSecond(Long.parseLong(item.get(BackupsDb.ATTR_LAST_MEDIA_REFRESH).n())))
.isEqualTo(expectedMediaExpiration);
}
}
@ -268,17 +346,4 @@ public class BackupManagerTest {
private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupTier backupTier) {
return new AuthenticatedBackupUser(backupId, backupTier);
}
private <T> ThrowableAssert.ThrowingCallable unwrapExceptions(final Supplier<CompletableFuture<T>> f) {
return () -> {
try {
f.get().join();
} catch (Exception e) {
if (ExceptionUtils.unwrap(e) instanceof StatusRuntimeException ex) {
throw ex;
}
throw e;
}
};
}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.backup;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema;
import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil;
import org.whispersystems.textsecuregcm.util.TestClock;
public class BackupsDbTest {
@RegisterExtension
public static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension(
DynamoDbExtensionSchema.Tables.BACKUPS,
DynamoDbExtensionSchema.Tables.BACKUP_MEDIA);
private final TestClock testClock = TestClock.now();
private BackupsDb backupsDb;
@BeforeEach
public void setup() {
testClock.unpin();
backupsDb = new BackupsDb(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(),
DynamoDbExtensionSchema.Tables.BACKUPS.tableName(), DynamoDbExtensionSchema.Tables.BACKUP_MEDIA.tableName(),
testClock);
}
@Test
public void trackMediaIdempotent() {
final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA);
this.backupsDb.trackMedia(backupUser, "abc".getBytes(StandardCharsets.UTF_8), 100).join();
assertDoesNotThrow(() ->
this.backupsDb.trackMedia(backupUser, "abc".getBytes(StandardCharsets.UTF_8), 100).join());
}
@Test
public void trackMediaLengthChange() {
final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA);
this.backupsDb.trackMedia(backupUser, "abc".getBytes(StandardCharsets.UTF_8), 100).join();
CompletableFutureTestUtil.assertFailsWithCause(InvalidLengthException.class,
this.backupsDb.trackMedia(backupUser, "abc".getBytes(StandardCharsets.UTF_8), 99));
}
@Test
public void trackMediaStats() {
final AuthenticatedBackupUser backupUser = backupUser(RandomUtils.nextBytes(16), BackupTier.MEDIA);
// add at least one message backup so we can describe it
backupsDb.addMessageBackup(backupUser).join();
int total = 0;
for (int i = 0; i < 5; i++) {
this.backupsDb.trackMedia(backupUser, Integer.toString(i).getBytes(StandardCharsets.UTF_8), i).join();
total += i;
final BackupsDb.BackupDescription description = this.backupsDb.describeBackup(backupUser).join();
assertThat(description.mediaUsedSpace().get()).isEqualTo(total);
}
for (int i = 0; i < 5; i++) {
this.backupsDb.untrackMedia(backupUser, Integer.toString(i).getBytes(StandardCharsets.UTF_8), i).join();
total -= i;
final BackupsDb.BackupDescription description = this.backupsDb.describeBackup(backupUser).join();
assertThat(description.mediaUsedSpace().get()).isEqualTo(total);
}
}
private static byte[] hashedBackupId(final byte[] backupId) {
try {
return Arrays.copyOf(MessageDigest.getInstance("SHA-256").digest(backupId), 16);
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
}
}
private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupTier backupTier) {
return new AuthenticatedBackupUser(backupId, backupTier);
}
}

View File

@ -16,10 +16,10 @@ import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
public class TusBackupCredentialGeneratorTest {
public class Cdn3BackupCredentialGeneratorTest {
@Test
public void uploadGenerator() {
TusBackupCredentialGenerator generator = new TusBackupCredentialGenerator(new TusConfiguration(
Cdn3BackupCredentialGenerator generator = new Cdn3BackupCredentialGenerator(new TusConfiguration(
new SecretBytes(RandomUtils.nextBytes(32)),
"https://example.org/upload"));
@ -33,7 +33,7 @@ public class TusBackupCredentialGeneratorTest {
@Test
public void readCredential() {
TusBackupCredentialGenerator generator = new TusBackupCredentialGenerator(new TusConfiguration(
Cdn3BackupCredentialGenerator generator = new Cdn3BackupCredentialGenerator(new TusConfiguration(
new SecretBytes(RandomUtils.nextBytes(32)),
"https://example.org/upload"));

View File

@ -0,0 +1,185 @@
package org.whispersystems.textsecuregcm.backup;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
import io.dropwizard.testing.junit5.DropwizardExtensionsSupport;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.Mac;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil;
@ExtendWith(DropwizardExtensionsSupport.class)
public class Cdn3RemoteStorageManagerTest {
private static byte[] HMAC_KEY = getRandomBytes(32);
private static byte[] AES_KEY = getRandomBytes(32);
private static byte[] IV = getRandomBytes(16);
@RegisterExtension
private final WireMockExtension wireMock = WireMockExtension.newInstance()
.options(wireMockConfig().dynamicPort())
.build();
private static String SMALL_CDN2 = "a small object from cdn2";
private static String SMALL_CDN3 = "a small object from cdn3";
private static String LARGE = "a".repeat(1024 * 1024 * 5);
private RemoteStorageManager remoteStorageManager;
@BeforeEach
public void init() throws CertificateException {
remoteStorageManager = new Cdn3RemoteStorageManager(
Executors.newSingleThreadScheduledExecutor(),
new CircuitBreakerConfiguration(),
new RetryConfiguration(),
Collections.emptyList());
wireMock.stubFor(get(urlEqualTo("/cdn2/source/small"))
.willReturn(aResponse()
.withHeader("Content-Length", Integer.toString(SMALL_CDN2.length()))
.withBody(SMALL_CDN2)));
wireMock.stubFor(get(urlEqualTo("/cdn3/source/small"))
.willReturn(aResponse()
.withHeader("Content-Length", Integer.toString(SMALL_CDN3.length()))
.withBody(SMALL_CDN3)));
wireMock.stubFor(get(urlEqualTo("/cdn3/source/large"))
.willReturn(aResponse()
.withHeader("Content-Length", Integer.toString(LARGE.length()))
.withBody(LARGE)));
wireMock.stubFor(get(urlEqualTo("/cdn3/source/missing"))
.willReturn(aResponse().withStatus(404)));
}
@ParameterizedTest
@ValueSource(ints = {2, 3})
public void copySmall(final int sourceCdn)
throws InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException {
final String expectedSource = switch (sourceCdn) {
case 2 -> SMALL_CDN2;
case 3 -> SMALL_CDN3;
default -> throw new AssertionError();
};
wireMock.stubFor(post(urlEqualTo("/cdn3/dest"))
.willReturn(aResponse()
.withStatus(201)));
remoteStorageManager.copy(
URI.create(wireMock.url("/cdn" + sourceCdn + "/source/small")),
expectedSource.length(),
new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV),
new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest")))
.toCompletableFuture().join();
final byte[] destBody = wireMock.findAll(postRequestedFor(urlEqualTo("/cdn3/dest"))).get(0).getBody();
assertThat(new String(decrypt(destBody), StandardCharsets.UTF_8))
.isEqualTo(expectedSource);
}
@Test
public void copyLarge()
throws InvalidAlgorithmParameterException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException {
wireMock.stubFor(post(urlEqualTo("/cdn3/dest"))
.willReturn(aResponse()
.withStatus(201)));
final MediaEncryptionParameters params = new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV);
remoteStorageManager.copy(
URI.create(wireMock.url("/cdn3/source/large")),
LARGE.length(),
params,
new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest")))
.toCompletableFuture().join();
final byte[] destBody = wireMock.findAll(postRequestedFor(urlEqualTo("/cdn3/dest"))).get(0).getBody();
assertThat(destBody.length).isEqualTo(new BackupMediaEncrypter(params).outputSize(LARGE.length()));
assertThat(new String(decrypt(destBody), StandardCharsets.UTF_8)).isEqualTo(LARGE);
}
@Test
public void incorrectLength() {
CompletableFutureTestUtil.assertFailsWithCause(InvalidLengthException.class,
remoteStorageManager.copy(
URI.create(wireMock.url("/cdn3/source/small")),
SMALL_CDN3.length() - 1,
new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV),
new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest")))
.toCompletableFuture());
}
@Test
public void sourceMissing() {
CompletableFutureTestUtil.assertFailsWithCause(SourceObjectNotFoundException.class,
remoteStorageManager.copy(
URI.create(wireMock.url("/cdn3/source/missing")),
1,
new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV),
new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest")))
.toCompletableFuture());
}
private byte[] decrypt(final byte[] encrypted)
throws InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException {
final Mac mac;
try {
mac = Mac.getInstance("HmacSHA256");
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
}
mac.init(new SecretKeySpec(HMAC_KEY, "HmacSHA256"));
mac.update(encrypted, 0, encrypted.length - mac.getMacLength());
assertArrayEquals(mac.doFinal(),
Arrays.copyOfRange(encrypted, encrypted.length - mac.getMacLength(), encrypted.length));
assertArrayEquals(IV, Arrays.copyOf(encrypted, 16));
final Cipher cipher;
try {
cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
} catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
throw new AssertionError(e);
}
cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(AES_KEY, "AES"), new IvParameterSpec(IV));
return cipher.doFinal(encrypted, IV.length, encrypted.length - IV.length - mac.getMacLength());
}
private static byte[] getRandomBytes(int length) {
byte[] result = new byte[length];
ThreadLocalRandom.current().nextBytes(result);
return result;
}
}

View File

@ -7,6 +7,8 @@ package org.whispersystems.textsecuregcm.controllers;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@ -22,11 +24,13 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
@ -54,6 +58,8 @@ import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
import org.whispersystems.textsecuregcm.backup.BackupAuthTestUtil;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.BackupTier;
import org.whispersystems.textsecuregcm.backup.InvalidLengthException;
import org.whispersystems.textsecuregcm.backup.SourceObjectNotFoundException;
import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper;
import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper;
import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper;
@ -96,6 +102,22 @@ public class ArchiveControllerTest {
GET, v1/archives/upload/form,
POST, v1/archives/,
PUT, v1/archives/keys, '{"backupIdPublicKey": "aaaaa"}'
PUT, v1/archives/media, '{
"sourceAttachment": {"cdn": 3, "key": "abc"},
"objectLength": 10,
"mediaId": "aaaaaaaaaaaaaaaaaaaa",
"hmacKey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"encryptionKey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"iv": "aaaaaaaaaaaaaaaaaaaaaa"
}'
PUT, v1/archives/media/batch, '{"items": [{
"sourceAttachment": {"cdn": 3, "key": "abc"},
"objectLength": 10,
"mediaId": "aaaaaaaaaaaaaaaaaaaa",
"hmacKey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"encryptionKey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"iv": "aaaaaaaaaaaaaaaaaaaaaa"
}]}'
""")
public void anonymousAuthOnly(final String method, final String path, final String body)
throws VerificationFailedException {
@ -269,4 +291,139 @@ public class ArchiveControllerTest {
assertThat(response.cdn()).isEqualTo(1);
assertThat(response.usedSpace()).isNull();
}
@Test
public void putMediaBatchSuccess() throws VerificationFailedException {
final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(
BackupTier.MEDIA, backupKey, aci);
when(backupManager.authenticateBackupUser(any(), any()))
.thenReturn(CompletableFuture.completedFuture(
new AuthenticatedBackupUser(presentation.getBackupId(), BackupTier.MEDIA)));
when(backupManager.canStoreMedia(any(), anyLong())).thenReturn(CompletableFuture.completedFuture(true));
when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), any()))
.thenAnswer(invocation -> {
byte[] mediaId = invocation.getArgument(5, byte[].class);
return CompletableFuture.completedFuture(new BackupManager.StorageDescriptor(1, mediaId));
});
final byte[][] mediaIds = new byte[][]{RandomUtils.nextBytes(15), RandomUtils.nextBytes(15)};
final Response r = resources.getJerseyTest()
.target("v1/archives/media/batch")
.request()
.header("X-Signal-ZK-Auth", Base64.getEncoder().encodeToString(presentation.serialize()))
.header("X-Signal-ZK-Auth-Signature", "aaa")
.put(Entity.json(new ArchiveController.CopyMediaBatchRequest(List.of(
new ArchiveController.CopyMediaRequest(
new ArchiveController.RemoteAttachment(3, "abc"),
100,
mediaIds[0],
RandomUtils.nextBytes(32),
RandomUtils.nextBytes(32),
RandomUtils.nextBytes(16)),
new ArchiveController.CopyMediaRequest(
new ArchiveController.RemoteAttachment(3, "def"),
200,
mediaIds[1],
RandomUtils.nextBytes(32),
RandomUtils.nextBytes(32),
RandomUtils.nextBytes(16))
))));
assertThat(r.getStatus()).isEqualTo(207);
final ArchiveController.CopyMediaBatchResponse copyResponse = r.readEntity(
ArchiveController.CopyMediaBatchResponse.class);
assertThat(copyResponse.responses()).hasSize(2);
for (int i = 0; i < 2; i++) {
final ArchiveController.CopyMediaBatchResponse.Entry response = copyResponse.responses().get(i);
assertThat(response.cdn()).isEqualTo(1);
assertThat(response.mediaId()).isEqualTo(mediaIds[i]);
assertThat(response.status()).isEqualTo(200);
}
}
@Test
public void putMediaBatchPartialFailure() throws VerificationFailedException {
final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(
BackupTier.MEDIA, backupKey, aci);
when(backupManager.authenticateBackupUser(any(), any()))
.thenReturn(CompletableFuture.completedFuture(
new AuthenticatedBackupUser(presentation.getBackupId(), BackupTier.MEDIA)));
final byte[][] mediaIds = IntStream.range(0, 3).mapToObj(i -> RandomUtils.nextBytes(15)).toArray(byte[][]::new);
when(backupManager.canStoreMedia(any(), anyLong())).thenReturn(CompletableFuture.completedFuture(true));
when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[0])))
.thenReturn(CompletableFuture.completedFuture(new BackupManager.StorageDescriptor(1, mediaIds[0])));
when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[1])))
.thenReturn(CompletableFuture.failedFuture(new SourceObjectNotFoundException()));
when(backupManager.copyToBackup(any(), anyInt(), any(), anyInt(), any(), eq(mediaIds[2])))
.thenReturn(CompletableFuture.failedFuture(new InvalidLengthException("bad length")));
final List<ArchiveController.CopyMediaRequest> copyRequests = Arrays.stream(mediaIds)
.map(mediaId -> new ArchiveController.CopyMediaRequest(
new ArchiveController.RemoteAttachment(3, "abc"),
100,
mediaId,
RandomUtils.nextBytes(32),
RandomUtils.nextBytes(32),
RandomUtils.nextBytes(16))
).toList();
Response r = resources.getJerseyTest()
.target("v1/archives/media/batch")
.request()
.header("X-Signal-ZK-Auth", Base64.getEncoder().encodeToString(presentation.serialize()))
.header("X-Signal-ZK-Auth-Signature", "aaa")
.put(Entity.json(new ArchiveController.CopyMediaBatchRequest(copyRequests)));
assertThat(r.getStatus()).isEqualTo(207);
final ArchiveController.CopyMediaBatchResponse copyResponse = r.readEntity(
ArchiveController.CopyMediaBatchResponse.class);
assertThat(copyResponse.responses()).hasSize(3);
final ArchiveController.CopyMediaBatchResponse.Entry r1 = copyResponse.responses().get(0);
assertThat(r1.cdn()).isEqualTo(1);
assertThat(r1.mediaId()).isEqualTo(mediaIds[0]);
assertThat(r1.status()).isEqualTo(200);
final ArchiveController.CopyMediaBatchResponse.Entry r2 = copyResponse.responses().get(1);
assertThat(r2.mediaId()).isEqualTo(mediaIds[1]);
assertThat(r2.status()).isEqualTo(410);
assertThat(r2.failureReason()).isNotBlank();
final ArchiveController.CopyMediaBatchResponse.Entry r3 = copyResponse.responses().get(2);
assertThat(r3.mediaId()).isEqualTo(mediaIds[2]);
assertThat(r3.status()).isEqualTo(400);
assertThat(r3.failureReason()).isNotBlank();
}
@Test
public void putMediaBatchOutOfSpace() throws VerificationFailedException {
final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(
BackupTier.MEDIA, backupKey, aci);
when(backupManager.authenticateBackupUser(any(), any()))
.thenReturn(CompletableFuture.completedFuture(
new AuthenticatedBackupUser(presentation.getBackupId(), BackupTier.MEDIA)));
when(backupManager.canStoreMedia(any(), eq(1L + 2L + 3L)))
.thenReturn(CompletableFuture.completedFuture(false));
final Response response = resources.getJerseyTest()
.target("v1/archives/media/batch")
.request()
.header("X-Signal-ZK-Auth", Base64.getEncoder().encodeToString(presentation.serialize()))
.header("X-Signal-ZK-Auth-Signature", "aaa")
.put(Entity.json(new ArchiveController.CopyMediaBatchRequest(IntStream.range(0, 3)
.mapToObj(i -> new ArchiveController.CopyMediaRequest(
new ArchiveController.RemoteAttachment(3, "abc"),
i + 1,
RandomUtils.nextBytes(15),
RandomUtils.nextBytes(32),
RandomUtils.nextBytes(32),
RandomUtils.nextBytes(16))
).toList())));
assertThat(response.getStatus()).isEqualTo(413);
}
}

View File

@ -7,7 +7,7 @@ package org.whispersystems.textsecuregcm.storage;
import java.util.Collections;
import java.util.List;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.BackupsDb;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
@ -50,13 +50,25 @@ public final class DynamoDbExtensionSchema {
List.of()),
BACKUPS("backups_test",
BackupManager.KEY_BACKUP_ID_HASH,
BackupsDb.KEY_BACKUP_ID_HASH,
null,
List.of(AttributeDefinition.builder()
.attributeName(BackupManager.KEY_BACKUP_ID_HASH)
.attributeName(BackupsDb.KEY_BACKUP_ID_HASH)
.attributeType(ScalarAttributeType.B).build()),
Collections.emptyList(), Collections.emptyList()),
BACKUP_MEDIA("backups_media_test",
BackupsDb.KEY_BACKUP_ID_HASH,
BackupsDb.KEY_MEDIA_ID,
List.of(
AttributeDefinition.builder()
.attributeName(BackupsDb.KEY_BACKUP_ID_HASH)
.attributeType(ScalarAttributeType.B).build(),
AttributeDefinition.builder()
.attributeName(BackupsDb.KEY_MEDIA_ID)
.attributeType(ScalarAttributeType.B).build()),
Collections.emptyList(), Collections.emptyList()),
CLIENT_RELEASES("client_releases_test",
ClientReleases.ATTR_PLATFORM,
ClientReleases.ATTR_VERSION,

View File

@ -17,13 +17,16 @@ public class CompletableFutureTestUtil {
private CompletableFutureTestUtil() {
}
public static <T extends Throwable> void assertFailsWithCause(final Class<T> expectedCause, final CompletableFuture<?> completableFuture) {
assertFailsWithCause(expectedCause, completableFuture, null);
public static <T extends Throwable> T assertFailsWithCause(final Class<T> expectedCause, final CompletableFuture<?> completableFuture) {
return assertFailsWithCause(expectedCause, completableFuture, null);
}
public static <T extends Throwable> void assertFailsWithCause(final Class<T> expectedCause, final CompletableFuture<?> completableFuture, final String message) {
public static <T extends Throwable> T assertFailsWithCause(final Class<T> expectedCause, final CompletableFuture<?> completableFuture, final String message) {
final CompletionException completionException = assertThrows(CompletionException.class, completableFuture::join, message);
assertTrue(ExceptionUtils.unwrap(completionException).getClass().isAssignableFrom(expectedCause), message);
final Throwable unwrapped = ExceptionUtils.unwrap(completionException);
final String compError = "Expected failure " + expectedCause + " was " + unwrapped.getClass();
assertTrue(unwrapped.getClass().isAssignableFrom(expectedCause), message == null ? compError : message + " : " + compError);
return expectedCause.cast(unwrapped);
}
public static <T> CompletableFuture<T> almostCompletedFuture(T result) {