From 2bb14892af4142fb6fee1ca16c693bbc640ea57e Mon Sep 17 00:00:00 2001 From: Ravi Khadiwala Date: Tue, 20 May 2025 10:47:45 -0500 Subject: [PATCH] Add paged prekey store --- pom.xml | 11 + service/config/sample.yml | 6 + service/pom.xml | 15 + .../WhisperServerConfiguration.java | 10 + .../textsecuregcm/WhisperServerService.java | 25 +- .../configuration/DynamoDbTables.java | 9 + ...dSingleUseKEMPreKeyStoreConfiguration.java | 15 + .../textsecuregcm/storage/KEMPreKeyPage.java | 136 +++++++ .../textsecuregcm/storage/KeysManager.java | 21 +- .../storage/PagedSingleUseKEMPreKeyStore.java | 367 ++++++++++++++++++ .../storage/SingleUseECPreKeyStore.java | 2 +- .../storage/SingleUseKEMPreKeyStore.java | 2 +- .../workers/CommandDependencies.java | 24 +- ...ccountCreationDeletionIntegrationTest.java | 21 +- ...ntsManagerChangeNumberIntegrationTest.java | 21 +- ...ccountsManagerUsernameIntegrationTest.java | 21 +- .../AddRemoveDeviceIntegrationTest.java | 21 +- .../storage/DynamoDbExtensionSchema.java | 14 + .../storage/KEMPreKeyPageTest.java | 102 +++++ .../storage/KeysManagerTest.java | 19 +- .../PagedSingleUseKEMPreKeyStoreTest.java | 218 +++++++++++ .../storage/S3LocalStackExtension.java | 93 +++++ service/src/test/resources/config/test.yml | 6 + 23 files changed, 1125 insertions(+), 54 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/PagedSingleUseKEMPreKeyStoreConfiguration.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/KEMPreKeyPage.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStore.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/KEMPreKeyPageTest.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStoreTest.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/S3LocalStackExtension.java diff --git a/pom.xml b/pom.xml index 3a3b6b0d2..c8684f105 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,10 @@ 2.0.17 23.10.0 2.2.27 + 1.21.1 + + + localstack/localstack:3.5.0 02fc89fa8766a9ba221e69225f8d1c10bb91885ddbd3c112448e23488ba40ab6 @@ -311,6 +315,13 @@ logback-access-common ${logback-access.version} + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import + diff --git a/service/config/sample.yml b/service/config/sample.yml index 04f8fc15d..3a461cb83 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -138,6 +138,8 @@ dynamoDbTables: tableName: Example_EC_Signed_Pre_Keys pqKeys: tableName: Example_PQ_Keys + pagedPqKeys: + tableName: Example_PQ_Paged_Keys pqLastResortKeys: tableName: Example_PQ_Last_Resort_Keys messages: @@ -174,6 +176,10 @@ dynamoDbTables: verificationSessions: tableName: Example_VerificationSessions +pagedSingleUseKEMPreKeyStore: + bucket: preKeyBucket # S3 Bucket name + region: us-west-2 # AWS region + cacheCluster: # Redis server configuration for cache cluster configurationUri: redis://redis.example.com:6379/ diff --git a/service/pom.xml b/service/pom.xml index 64fe98deb..a199fa889 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -485,6 +485,18 @@ test + + org.testcontainers + localstack + test + + + + org.testcontainers + junit-jupiter + test + + com.google.auth google-auth-library-oauth2-http @@ -712,6 +724,9 @@ -javaagent:${org.mockito:mockito-core:jar} --add-opens=java.base/java.net=ALL-UNNAMED + + ${localstack.image} + diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index ec70bc1c7..b7cc1f66b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -45,6 +45,7 @@ import org.whispersystems.textsecuregcm.configuration.MessageByteLimitCardinalit import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration; import org.whispersystems.textsecuregcm.configuration.NoiseTunnelConfiguration; import org.whispersystems.textsecuregcm.configuration.OneTimeDonationConfiguration; +import org.whispersystems.textsecuregcm.configuration.PagedSingleUseKEMPreKeyStoreConfiguration; import org.whispersystems.textsecuregcm.configuration.PaymentsServiceConfiguration; import org.whispersystems.textsecuregcm.configuration.RegistrationServiceClientFactory; import org.whispersystems.textsecuregcm.configuration.RemoteConfigConfiguration; @@ -257,6 +258,11 @@ public class WhisperServerConfiguration extends Configuration { @NotNull private OneTimeDonationConfiguration oneTimeDonations; + @Valid + @JsonProperty + @NotNull + private PagedSingleUseKEMPreKeyStoreConfiguration pagedSingleUseKEMPreKeyStore; + @Valid @NotNull @JsonProperty @@ -478,6 +484,10 @@ public class WhisperServerConfiguration extends Configuration { return oneTimeDonations; } + public PagedSingleUseKEMPreKeyStoreConfiguration getPagedSingleUseKEMPreKeyStore() { + return pagedSingleUseKEMPreKeyStore; + } + public ReportMessageConfiguration getReportMessageConfiguration() { return reportMessage; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 64a2a6560..a887fff57 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -225,6 +225,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.OneTimeDonationsManager; +import org.whispersystems.textsecuregcm.storage.PagedSingleUseKEMPreKeyStore; import org.whispersystems.textsecuregcm.storage.PersistentTimer; import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers; import org.whispersystems.textsecuregcm.storage.Profiles; @@ -235,8 +236,12 @@ import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswords; import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager; import org.whispersystems.textsecuregcm.storage.RemoteConfigs; import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager; +import org.whispersystems.textsecuregcm.storage.RepeatedUseECSignedPreKeyStore; +import org.whispersystems.textsecuregcm.storage.RepeatedUseKEMSignedPreKeyStore; import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; +import org.whispersystems.textsecuregcm.storage.SingleUseECPreKeyStore; +import org.whispersystems.textsecuregcm.storage.SingleUseKEMPreKeyStore; import org.whispersystems.textsecuregcm.storage.SubscriptionManager; import org.whispersystems.textsecuregcm.storage.Subscriptions; import org.whispersystems.textsecuregcm.storage.VerificationSessionManager; @@ -425,13 +430,21 @@ public class WhisperServerService extends Application preKeys) { + if (format != FORMAT) { + throw new IllegalArgumentException("Unknown format: " + format + ", must be " + FORMAT); + } + + if (preKeys.isEmpty()) { + throw new IllegalArgumentException("PreKeys cannot be empty"); + } + final ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + SERIALIZED_PREKEY_LENGTH * preKeys.size()); + buffer.putLong(HEADER); + for (KEMSignedPreKey preKey : preKeys) { + + buffer.putLong(preKey.keyId()); + + final byte[] publicKeyBytes = preKey.serializedPublicKey(); + if (publicKeyBytes[0] != KEM_KEY_TYPE_KYBER_1024) { + // 0x08 is libsignal's current KEM key format. If some future version of libsignal supports additional KEM + // keys, we'll have to roll out read support before rolling out write support. Otherwise, we may write keys + // to storage that are not readable by other chat instances. + throw new IllegalArgumentException("Format 1 only supports " + KEM_KEY_TYPE_KYBER_1024 + " public keys"); + } + if (publicKeyBytes.length != SERIALIZED_PUBKEY_LENGTH) { + throw new IllegalArgumentException("Unexpected public key length " + publicKeyBytes.length); + } + buffer.put(publicKeyBytes); + + if (preKey.signature().length != SERIALIZED_SIGNATURE_LENGTH) { + throw new IllegalArgumentException("prekey signature length must be " + SERIALIZED_SIGNATURE_LENGTH); + } + buffer.put(preKey.signature()); + } + buffer.flip(); + return buffer; + } + + /** + * Deserialize a single {@link KEMSignedPreKey} + * + * @param format The format of the page this buffer is from + * @param buffer The key to deserialize. The position of the buffer should be the start of the key, and the limit of + * the buffer should be the end of the key. After a successful deserialization the position of the + * buffer will be the limit + * @return The deserialized key + * @throws InvalidKeyException + */ + static KEMSignedPreKey deserializeKey(int format, ByteBuffer buffer) throws InvalidKeyException { + if (format != FORMAT) { + throw new IllegalArgumentException("Unknown prekey page format " + format); + } + if (buffer.remaining() != SERIALIZED_PREKEY_LENGTH) { + throw new IllegalArgumentException("PreKeys must be length " + SERIALIZED_PREKEY_LENGTH); + } + final long keyId = buffer.getLong(); + + final byte[] publicKeyBytes = new byte[SERIALIZED_PUBKEY_LENGTH]; + buffer.get(publicKeyBytes); + final KEMPublicKey kemPublicKey = new KEMPublicKey(publicKeyBytes); + + final byte[] signature = new byte[SERIALIZED_SIGNATURE_LENGTH]; + buffer.get(signature); + return new KEMSignedPreKey(keyId, kemPublicKey, signature); + } + + /** + * The location of a specific key within a serialized page + */ + record KeyLocation(int start, int length) { + + int getStartInclusive() { + return start; + } + + int getEndInclusive() { + return start + length - 1; + } + } + + /** + * Get the location of the key at the provided index within a page + * + * @param format The format of the page + * @param index The index of the key to retrieve + * @return An {@link KeyLocation} indicating where within the page the key is + */ + static KeyLocation keyLocation(final int format, final int index) { + if (format != FORMAT) { + throw new IllegalArgumentException("unknown format " + format); + } + final int startOffset = HEADER_SIZE + (index * SERIALIZED_PREKEY_LENGTH); + return new KeyLocation(startOffset, SERIALIZED_PREKEY_LENGTH); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysManager.java index 40ecc6fab..602ae61d0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysManager.java @@ -12,26 +12,27 @@ import java.util.concurrent.CompletableFuture; import org.whispersystems.textsecuregcm.entities.ECPreKey; import org.whispersystems.textsecuregcm.entities.ECSignedPreKey; import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; public class KeysManager { private final SingleUseECPreKeyStore ecPreKeys; private final SingleUseKEMPreKeyStore pqPreKeys; + private final PagedSingleUseKEMPreKeyStore pagedPqPreKeys; private final RepeatedUseECSignedPreKeyStore ecSignedPreKeys; private final RepeatedUseKEMSignedPreKeyStore pqLastResortKeys; public KeysManager( - final DynamoDbAsyncClient dynamoDbAsyncClient, - final String ecTableName, - final String pqTableName, - final String ecSignedPreKeysTableName, - final String pqLastResortTableName) { - this.ecPreKeys = new SingleUseECPreKeyStore(dynamoDbAsyncClient, ecTableName); - this.pqPreKeys = new SingleUseKEMPreKeyStore(dynamoDbAsyncClient, pqTableName); - this.ecSignedPreKeys = new RepeatedUseECSignedPreKeyStore(dynamoDbAsyncClient, ecSignedPreKeysTableName); - this.pqLastResortKeys = new RepeatedUseKEMSignedPreKeyStore(dynamoDbAsyncClient, pqLastResortTableName); + final SingleUseECPreKeyStore ecPreKeys, + final SingleUseKEMPreKeyStore pqPreKeys, + final PagedSingleUseKEMPreKeyStore pagedPqPreKeys, + final RepeatedUseECSignedPreKeyStore ecSignedPreKeys, + final RepeatedUseKEMSignedPreKeyStore pqLastResortKeys) { + this.ecPreKeys = ecPreKeys; + this.pqPreKeys = pqPreKeys; + this.pagedPqPreKeys = pagedPqPreKeys; + this.ecSignedPreKeys = ecSignedPreKeys; + this.pqLastResortKeys = pqLastResortKeys; } public TransactWriteItem buildWriteItemForEcSignedPreKey(final UUID identifier, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStore.java new file mode 100644 index 000000000..3ebd1cecf --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStore.java @@ -0,0 +1,367 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.signal.libsignal.protocol.InvalidKeyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import org.whispersystems.textsecuregcm.util.ExceptionUtils; +import org.whispersystems.textsecuregcm.util.Util; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryRequest; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +/** + * @implNote This version of a {@link SingleUsePreKeyStore} store bundles prekeys into "pages", which are stored in on + * an object store and referenced via dynamodb. Each device may only have a single active page at a time. Crashes or + * errors may leave orphaned pages which are no longer referenced by the database. A background process must + * periodically check for orphaned pages and remove them. + * @see SingleUsePreKeyStore + */ +public class PagedSingleUseKEMPreKeyStore { + + private static final Logger log = LoggerFactory.getLogger(PagedSingleUseKEMPreKeyStore.class); + + private final DynamoDbAsyncClient dynamoDbAsyncClient; + private final S3AsyncClient s3AsyncClient; + private final String tableName; + private final String bucketName; + + private final Timer getKeyCountTimer = Metrics.timer(name(getClass(), "getCount")); + private final Timer storeKeyBatchTimer = Metrics.timer(name(getClass(), "storeKeyBatch")); + private final Timer deleteForDeviceTimer = Metrics.timer(name(getClass(), "deleteForDevice")); + private final Timer deleteForAccountTimer = Metrics.timer(name(getClass(), "deleteForAccount")); + + final DistributionSummary availableKeyCountDistributionSummary = DistributionSummary + .builder(name(getClass(), "availableKeyCount")) + .publishPercentileHistogram() + .register(Metrics.globalRegistry); + + private final String takeKeyTimerName = name(getClass(), "takeKey"); + private static final String KEY_PRESENT_TAG_NAME = "keyPresent"; + + static final String KEY_ACCOUNT_UUID = "U"; + static final String KEY_DEVICE_ID = "D"; + static final String ATTR_PAGE_ID = "ID"; + static final String ATTR_PAGE_IDX = "I"; + static final String ATTR_PAGE_NUM_KEYS = "N"; + static final String ATTR_PAGE_FORMAT_VERSION = "F"; + + public PagedSingleUseKEMPreKeyStore( + final DynamoDbAsyncClient dynamoDbAsyncClient, + final S3AsyncClient s3AsyncClient, + final String tableName, + final String bucketName) { + this.s3AsyncClient = s3AsyncClient; + this.dynamoDbAsyncClient = dynamoDbAsyncClient; + this.tableName = tableName; + this.bucketName = bucketName; + } + + /** + * Stores a batch of single-use pre-keys for a specific device. All previously-stored keys for the device are cleared + * before storing new keys. + * + * @param identifier the identifier for the account/identity with which the target device is associated + * @param deviceId the identifier for the device within the given account/identity + * @param preKeys a collection of single-use pre-keys to store for the target device + * @return a future that completes when all previously-stored keys have been removed and the given collection of + * pre-keys has been stored in its place + */ + public CompletableFuture store( + final UUID identifier, final byte deviceId, final List preKeys) { + final Timer.Sample sample = Timer.start(); + + final List sorted = preKeys.stream().sorted(Comparator.comparing(KEMSignedPreKey::keyId)).toList(); + + final int bundleFormat = KEMPreKeyPage.FORMAT; + final ByteBuffer bundle = KEMPreKeyPage.serialize(KEMPreKeyPage.FORMAT, sorted); + + // Write the bundle to S3, then update the database. Delete the S3 object that was in the database before. This can + // leave orphans in S3 if we fail to update after writing to S3, or fail to delete the old page. However, it can + // never leave a broken pointer in the database. To keep this invariant, we must make sure to generate a new + // name for the page any time we were to retry this entire operation. + return writeBundleToS3(identifier, deviceId, bundle) + .thenCompose(pageId -> dynamoDbAsyncClient.putItem(PutItemRequest.builder() + .tableName(tableName) + .item(Map.of( + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier), + KEY_DEVICE_ID, AttributeValues.fromInt(deviceId), + ATTR_PAGE_ID, AttributeValues.fromUUID(pageId), + ATTR_PAGE_IDX, AttributeValues.fromInt(0), + ATTR_PAGE_NUM_KEYS, AttributeValues.fromInt(sorted.size()), + ATTR_PAGE_FORMAT_VERSION, AttributeValues.fromInt(bundleFormat) + )) + .returnValues(ReturnValue.ALL_OLD) + .build())) + .thenCompose(response -> { + if (response.hasAttributes()) { + final UUID pageId = AttributeValues.getUUID(response.attributes(), ATTR_PAGE_ID, null); + if (pageId == null) { + log.error("Replaced record: {} with no pageId", response.attributes()); + return CompletableFuture.completedFuture(null); + } + return deleteBundleFromS3(identifier, deviceId, pageId); + } else { + return CompletableFuture.completedFuture(null); + } + }) + .whenComplete((result, error) -> sample.stop(storeKeyBatchTimer)); + } + + /** + * Attempts to retrieve a single-use pre-key for a specific device. Keys may only be returned by this method at most + * once; once the key is returned, it is removed from the key store and subsequent calls to this method will never + * return the same key. + * + * @param identifier the identifier for the account/identity with which the target device is associated + * @param deviceId the identifier for the device within the given account/identity + * @return a future that yields a single-use pre-key if one is available or empty if no single-use pre-keys are + * available for the target device + */ + public CompletableFuture> take(final UUID identifier, final byte deviceId) { + final Timer.Sample sample = Timer.start(); + + return dynamoDbAsyncClient.updateItem(UpdateItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier), + KEY_DEVICE_ID, AttributeValues.fromInt(deviceId))) + .updateExpression("SET #index = #index + :one") + .conditionExpression("#id = :id AND #index < #numkeys") + .expressionAttributeNames(Map.of( + "#id", KEY_ACCOUNT_UUID, + "#index", ATTR_PAGE_IDX, + "#numkeys", ATTR_PAGE_NUM_KEYS)) + .expressionAttributeValues(Map.of( + ":one", AttributeValues.n(1), + ":id", AttributeValues.fromUUID(identifier))) + .returnValues(ReturnValue.ALL_OLD) + .build()) + .thenCompose(updateItemResponse -> { + if (!updateItemResponse.hasAttributes()) { + throw new IllegalStateException("update succeeded but did not return an item"); + } + + final int index = AttributeValues.getInt(updateItemResponse.attributes(), ATTR_PAGE_IDX, -1); + final UUID pageId = AttributeValues.getUUID(updateItemResponse.attributes(), ATTR_PAGE_ID, null); + final int format = AttributeValues.getInt(updateItemResponse.attributes(), ATTR_PAGE_FORMAT_VERSION, -1); + if (index < 0 || format < 0 || pageId == null) { + throw new CompletionException( + new IOException("unexpected page descriptor " + updateItemResponse.attributes())); + } + + return readPreKeyAtIndexFromS3(identifier, deviceId, pageId, format, index).thenApply(Optional::of); + }) + // If this check fails, it means that the item did not exist, or its index was already at the last key. Either + // way, there are no keys left so we return empty + .exceptionally(ExceptionUtils.exceptionallyHandler( + ConditionalCheckFailedException.class, + e -> Optional.empty())) + .whenComplete((maybeKey, throwable) -> + sample.stop(Metrics.timer( + takeKeyTimerName, + KEY_PRESENT_TAG_NAME, String.valueOf(maybeKey != null && maybeKey.isPresent())))); + } + + /** + * Returns the number of single-use pre-keys available for a given device. + * + * @param identifier the identifier for the account/identity with which the target device is associated + * @param deviceId the identifier for the device within the given account/identity + * @return a future that yields the approximate number of single-use pre-keys currently available for the target + * device + */ + public CompletableFuture getCount(final UUID identifier, final byte deviceId) { + final Timer.Sample sample = Timer.start(); + + return dynamoDbAsyncClient.getItem(GetItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier), + KEY_DEVICE_ID, AttributeValues.fromInt(deviceId))) + .consistentRead(true) + .projectionExpression("#total, #index") + .expressionAttributeNames(Map.of( + "#total", ATTR_PAGE_NUM_KEYS, + "#index", ATTR_PAGE_IDX)) + .build()) + .thenApply(getResponse -> { + if (!getResponse.hasItem()) { + return 0; + } + final int numKeys = AttributeValues.getInt(getResponse.item(), ATTR_PAGE_NUM_KEYS, -1); + final int index = AttributeValues.getInt(getResponse.item(), ATTR_PAGE_IDX, -1); + if (numKeys < 0 || index < 0 || index > numKeys) { + log.error("unexpected index/length in page descriptor: {}", getResponse.item()); + return 0; + } + + return numKeys - index; + }) + .whenComplete((keyCount, throwable) -> { + sample.stop(getKeyCountTimer); + + if (throwable == null && keyCount != null) { + availableKeyCountDistributionSummary.record(keyCount); + } + }); + } + + /** + * Removes all single-use pre-keys for all devices associated with the given account/identity. + * + * @param identifier the identifier for the account/identity for which to remove single-use pre-keys + * @return a future that completes when all single-use pre-keys have been removed for all devices associated with the + * given account/identity + */ + public CompletableFuture delete(final UUID identifier) { + final Timer.Sample sample = Timer.start(); + + return deleteItems(identifier, Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#uuid = :uuid") + .projectionExpression("#uuid,#deviceid,#pageid") + .expressionAttributeNames(Map.of( + "#uuid", KEY_ACCOUNT_UUID, + "#deviceid", KEY_DEVICE_ID, + "#pageid", ATTR_PAGE_ID)) + .expressionAttributeValues(Map.of(":uuid", AttributeValues.fromUUID(identifier))) + .consistentRead(true) + .build()) + .items())) + .thenRun(() -> sample.stop(deleteForAccountTimer)); + } + + /** + * Removes all single-use pre-keys for a specific device. + * + * @param identifier the identifier for the account/identity with which the target device is associated + * @param deviceId the identifier for the device within the given account/identity + * @return a future that completes when all single-use pre-keys have been removed for the target device + */ + public CompletableFuture delete(final UUID identifier, final byte deviceId) { + final Timer.Sample sample = Timer.start(); + + return dynamoDbAsyncClient.getItem(GetItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier), + KEY_DEVICE_ID, AttributeValues.fromInt(deviceId))) + .consistentRead(true) + .projectionExpression("#uuid,#deviceid,#pageid") + .expressionAttributeNames(Map.of( + "#uuid", KEY_ACCOUNT_UUID, + "#deviceid", KEY_DEVICE_ID, + "#pageid", ATTR_PAGE_ID)) + .build()) + .thenCompose(getItemResponse -> deleteItems(identifier, getItemResponse.hasItem() + ? Flux.just(getItemResponse.item()) + : Flux.empty())) + .thenRun(() -> sample.stop(deleteForDeviceTimer)); + } + + private CompletableFuture deleteItems(final UUID identifier, + final Flux> items) { + return items + .flatMap(item -> { + final UUID aci = AttributeValues.getUUID(item, KEY_ACCOUNT_UUID, null); + final byte deviceId = (byte) AttributeValues.getInt(item, KEY_DEVICE_ID, -1); + final UUID pageId = AttributeValues.getUUID(item, ATTR_PAGE_ID, null); + if (aci == null || deviceId < 0 || pageId == null) { + log.error("can't delete page from unexpected page descriptor {}", item); + } + return Mono.fromFuture(deleteBundleFromS3(aci, deviceId, pageId)) + .thenReturn(Map.of( + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier), + KEY_DEVICE_ID, AttributeValues.fromInt(deviceId))); + }) + .flatMap(itemToDelete -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(itemToDelete) + .build()))) + .then() + .toFuture() + .thenRun(Util.NOOP); + } + + private static String s3Key(final UUID identifier, final byte deviceId, final UUID pageId) { + return String.format("%s/%s/%s", identifier, deviceId, pageId); + } + + private CompletableFuture writeBundleToS3(final UUID identifier, final byte deviceId, + final ByteBuffer bundle) { + final UUID pageId = UUID.randomUUID(); + return s3AsyncClient.putObject(PutObjectRequest.builder() + .bucket(bucketName) + .key(s3Key(identifier, deviceId, pageId)).build(), + AsyncRequestBody.fromByteBuffer(bundle)) + .thenApply(ignoredResponse -> pageId); + } + + private CompletableFuture deleteBundleFromS3(final UUID identifier, final byte deviceId, final UUID pageId) { + return s3AsyncClient.deleteObject(DeleteObjectRequest.builder() + .bucket(bucketName) + .key(s3Key(identifier, deviceId, pageId)) + .build()) + .thenRun(Util.NOOP); + } + + private CompletableFuture readPreKeyAtIndexFromS3( + final UUID identifier, final byte deviceId, final UUID pageId, final int format, final int index) { + final KEMPreKeyPage.KeyLocation keyLocation = KEMPreKeyPage.keyLocation(format, index); + return s3AsyncClient.getObject(GetObjectRequest.builder() + .bucket(bucketName) + .key(s3Key(identifier, deviceId, pageId)) + // An RFC9110 range header, inclusive on both ends + // https://www.rfc-editor.org/rfc/rfc9110.html#section-14.1.2 + .range("bytes=%s-%s".formatted(keyLocation.getStartInclusive(), keyLocation.getEndInclusive())) + .build(), AsyncResponseTransformer.toBytes()) + .thenApply(bytes -> { + final ByteBuffer serialized = bytes.asByteBuffer(); + if (serialized.remaining() != keyLocation.length()) { + log.error("Unexpected ranged read response, requested {} got {} for offset {} in page {}", + keyLocation.length(), serialized.remaining(), keyLocation, s3Key(identifier, deviceId, pageId)); + throw new CompletionException(new IOException("Invalid response to ranged read")); + } + try { + return KEMPreKeyPage.deserializeKey(format, bytes.asByteBuffer()); + } catch (InvalidKeyException e) { + throw new CompletionException(new IOException(e)); + } + }); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUseECPreKeyStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUseECPreKeyStore.java index 6984d94ab..e2d750f5a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUseECPreKeyStore.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUseECPreKeyStore.java @@ -19,7 +19,7 @@ import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; public class SingleUseECPreKeyStore extends SingleUsePreKeyStore { private static final String PARSE_BYTE_ARRAY_COUNTER_NAME = name(SingleUseECPreKeyStore.class, "parseByteArray"); - protected SingleUseECPreKeyStore(final DynamoDbAsyncClient dynamoDbAsyncClient, final String tableName) { + public SingleUseECPreKeyStore(final DynamoDbAsyncClient dynamoDbAsyncClient, final String tableName) { super(dynamoDbAsyncClient, tableName); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUseKEMPreKeyStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUseKEMPreKeyStore.java index de485705c..1aa13f141 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUseKEMPreKeyStore.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUseKEMPreKeyStore.java @@ -16,7 +16,7 @@ import java.util.UUID; public class SingleUseKEMPreKeyStore extends SingleUsePreKeyStore { - protected SingleUseKEMPreKeyStore(final DynamoDbAsyncClient dynamoDbAsyncClient, final String tableName) { + public SingleUseKEMPreKeyStore(final DynamoDbAsyncClient dynamoDbAsyncClient, final String tableName) { super(dynamoDbAsyncClient, tableName); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 9893910f3..a1260faa7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -57,13 +57,18 @@ import org.whispersystems.textsecuregcm.storage.KeysManager; import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; +import org.whispersystems.textsecuregcm.storage.PagedSingleUseKEMPreKeyStore; import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers; import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswords; import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager; +import org.whispersystems.textsecuregcm.storage.RepeatedUseECSignedPreKeyStore; +import org.whispersystems.textsecuregcm.storage.RepeatedUseKEMSignedPreKeyStore; import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; +import org.whispersystems.textsecuregcm.storage.SingleUseECPreKeyStore; +import org.whispersystems.textsecuregcm.storage.SingleUseKEMPreKeyStore; import org.whispersystems.textsecuregcm.util.ManagedAwsCrt; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -204,13 +209,20 @@ record CommandDependencies( configuration.getDynamoDbTables().getPhoneNumberIdentifiers().getTableName()); Profiles profiles = new Profiles(dynamoDbClient, dynamoDbAsyncClient, configuration.getDynamoDbTables().getProfiles().getTableName()); + S3AsyncClient asyncKeysS3Client = S3AsyncClient.builder() + .credentialsProvider(awsCredentialsProvider) + .region(Region.of(configuration.getPagedSingleUseKEMPreKeyStore().region())) + .build(); KeysManager keys = new KeysManager( - dynamoDbAsyncClient, - configuration.getDynamoDbTables().getEcKeys().getTableName(), - configuration.getDynamoDbTables().getKemKeys().getTableName(), - configuration.getDynamoDbTables().getEcSignedPreKeys().getTableName(), - configuration.getDynamoDbTables().getKemLastResortKeys().getTableName() - ); + new SingleUseECPreKeyStore(dynamoDbAsyncClient, configuration.getDynamoDbTables().getEcKeys().getTableName()), + new SingleUseKEMPreKeyStore(dynamoDbAsyncClient, configuration.getDynamoDbTables().getKemKeys().getTableName()), + new PagedSingleUseKEMPreKeyStore(dynamoDbAsyncClient, asyncKeysS3Client, + configuration.getDynamoDbTables().getPagedKemKeys().getTableName(), + configuration.getPagedSingleUseKEMPreKeyStore().bucket()), + new RepeatedUseECSignedPreKeyStore(dynamoDbAsyncClient, + configuration.getDynamoDbTables().getEcSignedPreKeys().getTableName()), + new RepeatedUseKEMSignedPreKeyStore(dynamoDbAsyncClient, + configuration.getDynamoDbTables().getKemLastResortKeys().getTableName())); MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient, configuration.getDynamoDbTables().getMessages().getTableName(), configuration.getDynamoDbTables().getMessages().getExpiration(), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCreationDeletionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCreationDeletionIntegrationTest.java index 1e073be4b..c47411be8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCreationDeletionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCreationDeletionIntegrationTest.java @@ -50,6 +50,7 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; import org.whispersystems.textsecuregcm.tests.util.KeysHelper; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; public class AccountCreationDeletionIntegrationTest { @@ -71,6 +72,9 @@ public class AccountCreationDeletionIntegrationTest { @RegisterExtension static final RedisClusterExtension CACHE_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); + @RegisterExtension + static final S3LocalStackExtension S3_EXTENSION = new S3LocalStackExtension("testbucket"); + private static final Clock CLOCK = Clock.fixed(Instant.now(), ZoneId.systemDefault()); private ScheduledExecutorService executor; @@ -90,13 +94,18 @@ public class AccountCreationDeletionIntegrationTest { final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + final DynamoDbAsyncClient dynamoDbAsyncClient = DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(); keysManager = new KeysManager( - DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), - DynamoDbExtensionSchema.Tables.EC_KEYS.tableName(), - DynamoDbExtensionSchema.Tables.PQ_KEYS.tableName(), - DynamoDbExtensionSchema.Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName(), - DynamoDbExtensionSchema.Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName() - ); + new SingleUseECPreKeyStore(dynamoDbAsyncClient, DynamoDbExtensionSchema.Tables.EC_KEYS.tableName()), + new SingleUseKEMPreKeyStore(dynamoDbAsyncClient, DynamoDbExtensionSchema.Tables.PQ_KEYS.tableName()), + new PagedSingleUseKEMPreKeyStore(dynamoDbAsyncClient, + S3_EXTENSION.getS3Client(), + DynamoDbExtensionSchema.Tables.PAGED_PQ_KEYS.tableName(), + S3_EXTENSION.getBucketName()), + new RepeatedUseECSignedPreKeyStore(dynamoDbAsyncClient, + DynamoDbExtensionSchema.Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName()), + new RepeatedUseKEMSignedPreKeyStore(dynamoDbAsyncClient, + DynamoDbExtensionSchema.Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName())); final ClientPublicKeys clientPublicKeys = new ClientPublicKeys(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), DynamoDbExtensionSchema.Tables.CLIENT_PUBLIC_KEYS.tableName()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java index 81f2ddf50..d1869812b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java @@ -44,6 +44,7 @@ import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2 import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.AccountsHelper; import org.whispersystems.textsecuregcm.tests.util.KeysHelper; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; class AccountsManagerChangeNumberIntegrationTest { @@ -65,6 +66,9 @@ class AccountsManagerChangeNumberIntegrationTest { @RegisterExtension static final RedisClusterExtension CACHE_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); + @RegisterExtension + static final S3LocalStackExtension S3_EXTENSION = new S3LocalStackExtension("testbucket"); + private KeysManager keysManager; private DisconnectionRequestManager disconnectionRequestManager; private ScheduledExecutorService executor; @@ -81,13 +85,18 @@ class AccountsManagerChangeNumberIntegrationTest { DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(); when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + final DynamoDbAsyncClient dynamoDbAsyncClient = DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(); keysManager = new KeysManager( - DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), - Tables.EC_KEYS.tableName(), - Tables.PQ_KEYS.tableName(), - Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName(), - Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName() - ); + new SingleUseECPreKeyStore(dynamoDbAsyncClient, DynamoDbExtensionSchema.Tables.EC_KEYS.tableName()), + new SingleUseKEMPreKeyStore(dynamoDbAsyncClient, DynamoDbExtensionSchema.Tables.PQ_KEYS.tableName()), + new PagedSingleUseKEMPreKeyStore(dynamoDbAsyncClient, + S3_EXTENSION.getS3Client(), + DynamoDbExtensionSchema.Tables.PAGED_PQ_KEYS.tableName(), + S3_EXTENSION.getBucketName()), + new RepeatedUseECSignedPreKeyStore(dynamoDbAsyncClient, + DynamoDbExtensionSchema.Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName()), + new RepeatedUseKEMSignedPreKeyStore(dynamoDbAsyncClient, + DynamoDbExtensionSchema.Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName())); final ClientPublicKeys clientPublicKeys = new ClientPublicKeys(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), DynamoDbExtensionSchema.Tables.CLIENT_PUBLIC_KEYS.tableName()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java index 4fa4ab6e1..add5d5b5c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java @@ -47,6 +47,7 @@ import org.whispersystems.textsecuregcm.tests.util.AccountsHelper; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil; import org.whispersystems.textsecuregcm.util.TestRandomUtil; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; @@ -78,6 +79,9 @@ class AccountsManagerUsernameIntegrationTest { @RegisterExtension static RedisClusterExtension CACHE_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); + @RegisterExtension + static final S3LocalStackExtension S3_EXTENSION = new S3LocalStackExtension("testbucket"); + private AccountsManager accountsManager; private Accounts accounts; @@ -94,13 +98,18 @@ class AccountsManagerUsernameIntegrationTest { DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(); when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + final DynamoDbAsyncClient dynamoDbAsyncClient = DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(); final KeysManager keysManager = new KeysManager( - DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), - Tables.EC_KEYS.tableName(), - Tables.PQ_KEYS.tableName(), - Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName(), - Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName() - ); + new SingleUseECPreKeyStore(dynamoDbAsyncClient, DynamoDbExtensionSchema.Tables.EC_KEYS.tableName()), + new SingleUseKEMPreKeyStore(dynamoDbAsyncClient, DynamoDbExtensionSchema.Tables.PQ_KEYS.tableName()), + new PagedSingleUseKEMPreKeyStore(dynamoDbAsyncClient, + S3_EXTENSION.getS3Client(), + DynamoDbExtensionSchema.Tables.PAGED_PQ_KEYS.tableName(), + S3_EXTENSION.getBucketName()), + new RepeatedUseECSignedPreKeyStore(dynamoDbAsyncClient, + DynamoDbExtensionSchema.Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName()), + new RepeatedUseKEMSignedPreKeyStore(dynamoDbAsyncClient, + DynamoDbExtensionSchema.Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName())); accounts = Mockito.spy(new Accounts( Clock.systemUTC(), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java index cf64e9904..c5f2d4f31 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java @@ -45,6 +45,7 @@ import org.whispersystems.textsecuregcm.tests.util.AccountsHelper; import org.whispersystems.textsecuregcm.tests.util.KeysHelper; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.TestClock; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; public class AddRemoveDeviceIntegrationTest { @@ -70,6 +71,9 @@ public class AddRemoveDeviceIntegrationTest { @RegisterExtension static final RedisServerExtension PUBSUB_SERVER_EXTENSION = RedisServerExtension.builder().build(); + @RegisterExtension + static final S3LocalStackExtension S3_EXTENSION = new S3LocalStackExtension("testbucket"); + private ExecutorService accountLockExecutor; private ScheduledExecutorService messagePollExecutor; @@ -89,13 +93,18 @@ public class AddRemoveDeviceIntegrationTest { clock = TestClock.pinned(Instant.now()); + final DynamoDbAsyncClient dynamoDbAsyncClient = DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(); keysManager = new KeysManager( - DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), - DynamoDbExtensionSchema.Tables.EC_KEYS.tableName(), - DynamoDbExtensionSchema.Tables.PQ_KEYS.tableName(), - DynamoDbExtensionSchema.Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName(), - DynamoDbExtensionSchema.Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName() - ); + new SingleUseECPreKeyStore(dynamoDbAsyncClient, DynamoDbExtensionSchema.Tables.EC_KEYS.tableName()), + new SingleUseKEMPreKeyStore(dynamoDbAsyncClient, DynamoDbExtensionSchema.Tables.PQ_KEYS.tableName()), + new PagedSingleUseKEMPreKeyStore(dynamoDbAsyncClient, + S3_EXTENSION.getS3Client(), + DynamoDbExtensionSchema.Tables.PAGED_PQ_KEYS.tableName(), + S3_EXTENSION.getBucketName()), + new RepeatedUseECSignedPreKeyStore(dynamoDbAsyncClient, + DynamoDbExtensionSchema.Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName()), + new RepeatedUseKEMSignedPreKeyStore(dynamoDbAsyncClient, + DynamoDbExtensionSchema.Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName())); final ClientPublicKeys clientPublicKeys = new ClientPublicKeys(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), DynamoDbExtensionSchema.Tables.CLIENT_PUBLIC_KEYS.tableName()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java index 6bb6efb66..4a77083b6 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java @@ -143,6 +143,20 @@ public final class DynamoDbExtensionSchema { .build()), List.of(), List.of()), + PAGED_PQ_KEYS("paged_pq_keys_test", + PagedSingleUseKEMPreKeyStore.KEY_ACCOUNT_UUID, + PagedSingleUseKEMPreKeyStore.KEY_DEVICE_ID, + List.of( + AttributeDefinition.builder() + .attributeName(PagedSingleUseKEMPreKeyStore.KEY_ACCOUNT_UUID) + .attributeType(ScalarAttributeType.B) + .build(), + AttributeDefinition.builder() + .attributeName(PagedSingleUseKEMPreKeyStore.KEY_DEVICE_ID) + .attributeType(ScalarAttributeType.N) + .build()), + List.of(), List.of()), + PUSH_NOTIFICATION_EXPERIMENT_SAMPLES("push_notification_experiment_samples_test", PushNotificationExperimentSamples.KEY_EXPERIMENT_NAME, PushNotificationExperimentSamples.ATTR_ACI_AND_DEVICE_ID, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/KEMPreKeyPageTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/KEMPreKeyPageTest.java new file mode 100644 index 000000000..0d47ebc2f --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/KEMPreKeyPageTest.java @@ -0,0 +1,102 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.signal.libsignal.protocol.InvalidKeyException; +import org.signal.libsignal.protocol.ecc.Curve; +import org.signal.libsignal.protocol.ecc.ECKeyPair; +import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey; +import org.whispersystems.textsecuregcm.tests.util.KeysHelper; + +class KEMPreKeyPageTest { + + private static final ECKeyPair IDENTITY_KEY_PAIR = Curve.generateKeyPair(); + + @Test + void serializeSinglePreKey() { + final ByteBuffer page = KEMPreKeyPage.serialize(KEMPreKeyPage.FORMAT, List.of(generatePreKey(5))); + final int actualMagic = page.getInt(); + assertEquals(KEMPreKeyPage.HEADER_MAGIC, actualMagic); + final int version = page.getInt(); + assertEquals(version, 1); + assertEquals(KEMPreKeyPage.SERIALIZED_PREKEY_LENGTH, page.remaining()); + } + + @Test + void emptyPreKeys() { + assertThrows(IllegalArgumentException.class, () -> KEMPreKeyPage.serialize(KEMPreKeyPage.FORMAT, Collections.emptyList())); + } + + @Test + void roundTripSingleton() throws InvalidKeyException { + final KEMSignedPreKey preKey = generatePreKey(5); + final ByteBuffer buffer = KEMPreKeyPage.serialize(KEMPreKeyPage.FORMAT, List.of(preKey)); + final long serializedLength = buffer.remaining(); + assertEquals(KEMPreKeyPage.HEADER_SIZE + KEMPreKeyPage.SERIALIZED_PREKEY_LENGTH, serializedLength); + + final KEMPreKeyPage.KeyLocation keyLocation = KEMPreKeyPage.keyLocation(1, 0); + assertEquals(KEMPreKeyPage.HEADER_SIZE, keyLocation.getStartInclusive()); + assertEquals(serializedLength, KEMPreKeyPage.HEADER_SIZE + keyLocation.length()); + + buffer.position(keyLocation.getStartInclusive()); + final KEMSignedPreKey deserializedPreKey = KEMPreKeyPage.deserializeKey(1, buffer); + + assertEquals(5L, deserializedPreKey.keyId()); + assertEquals(preKey, deserializedPreKey); + } + + @Test + void roundTripMultiple() throws InvalidKeyException { + final List keys = Arrays.asList(generatePreKey(1), generatePreKey(2), generatePreKey(5)); + final ByteBuffer page = KEMPreKeyPage.serialize(KEMPreKeyPage.FORMAT, keys); + + assertEquals(KEMPreKeyPage.HEADER_SIZE + KEMPreKeyPage.SERIALIZED_PREKEY_LENGTH * 3, page.remaining()); + + for (int i = 0; i < keys.size(); i++) { + final KEMPreKeyPage.KeyLocation keyLocation = KEMPreKeyPage.keyLocation(1, i); + assertEquals( + KEMPreKeyPage.HEADER_SIZE + KEMPreKeyPage.SERIALIZED_PREKEY_LENGTH * i, + keyLocation.getStartInclusive()); + final ByteBuffer buf = page.slice(keyLocation.getStartInclusive(), keyLocation.length()); + final KEMSignedPreKey actual = KEMPreKeyPage.deserializeKey(1, buf); + assertEquals(keys.get(i), actual); + } + } + + @Test + void wrongFormat() { + assertThrows(IllegalArgumentException.class, () -> + KEMPreKeyPage.deserializeKey(2, + ByteBuffer.allocate(KEMPreKeyPage.HEADER_SIZE + KEMPreKeyPage.SERIALIZED_PREKEY_LENGTH))); + } + + @Test + void wrongSize() { + assertThrows(IllegalArgumentException.class, () -> KEMPreKeyPage.deserializeKey(1, ByteBuffer.allocate(100))); + } + + + @Test + void negativeKeyId() throws InvalidKeyException { + final KEMSignedPreKey preKey = generatePreKey(-1); + ByteBuffer page = KEMPreKeyPage.serialize(KEMPreKeyPage.FORMAT, List.of(preKey)); + page.position(KEMPreKeyPage.HEADER_SIZE); + KEMSignedPreKey deserializedPreKey = KEMPreKeyPage.deserializeKey(1, page); + assertEquals(-1L, deserializedPreKey.keyId()); + } + + private static KEMSignedPreKey generatePreKey(long keyId) { + return KeysHelper.signedKEMPreKey((int) keyId, IDENTITY_KEY_PAIR); + } + +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysManagerTest.java index 236665c8a..0a98cb72f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysManagerTest.java @@ -22,6 +22,7 @@ import org.whispersystems.textsecuregcm.entities.ECSignedPreKey; import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.KeysHelper; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; class KeysManagerTest { @@ -31,6 +32,9 @@ class KeysManagerTest { static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension( Tables.EC_KEYS, Tables.PQ_KEYS, Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS, Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS); + @RegisterExtension + static final S3LocalStackExtension S3_EXTENSION = new S3LocalStackExtension("testbucket"); + private static final UUID ACCOUNT_UUID = UUID.randomUUID(); private static final byte DEVICE_ID = 1; @@ -38,13 +42,16 @@ class KeysManagerTest { @BeforeEach void setup() { + final DynamoDbAsyncClient dynamoDbAsyncClient = DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(); keysManager = new KeysManager( - DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), - Tables.EC_KEYS.tableName(), - Tables.PQ_KEYS.tableName(), - Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName(), - Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName() - ); + new SingleUseECPreKeyStore(dynamoDbAsyncClient, Tables.EC_KEYS.tableName()), + new SingleUseKEMPreKeyStore(dynamoDbAsyncClient, Tables.PQ_KEYS.tableName()), + new PagedSingleUseKEMPreKeyStore(dynamoDbAsyncClient, + S3_EXTENSION.getS3Client(), + DynamoDbExtensionSchema.Tables.PAGED_PQ_KEYS.tableName(), + S3_EXTENSION.getBucketName()), + new RepeatedUseECSignedPreKeyStore(dynamoDbAsyncClient, Tables.REPEATED_USE_EC_SIGNED_PRE_KEYS.tableName()), + new RepeatedUseKEMSignedPreKeyStore(dynamoDbAsyncClient, Tables.REPEATED_USE_KEM_SIGNED_PRE_KEYS.tableName())); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStoreTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStoreTest.java new file mode 100644 index 000000000..1b3f906bc --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStoreTest.java @@ -0,0 +1,218 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.signal.libsignal.protocol.ecc.Curve; +import org.signal.libsignal.protocol.ecc.ECKeyPair; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey; +import org.whispersystems.textsecuregcm.tests.util.KeysHelper; +import reactor.core.publisher.Flux; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.S3Object; + +class PagedSingleUseKEMPreKeyStoreTest { + + private static final int KEY_COUNT = 100; + private static final ECKeyPair IDENTITY_KEY_PAIR = Curve.generateKeyPair(); + private static final String BUCKET_NAME = "testbucket"; + + private PagedSingleUseKEMPreKeyStore keyStore; + + @RegisterExtension + static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension( + DynamoDbExtensionSchema.Tables.PAGED_PQ_KEYS); + + @RegisterExtension + static final S3LocalStackExtension S3_EXTENSION = new S3LocalStackExtension(BUCKET_NAME); + + @BeforeEach + void setUp() { + keyStore = new PagedSingleUseKEMPreKeyStore( + DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), + S3_EXTENSION.getS3Client(), + DynamoDbExtensionSchema.Tables.PAGED_PQ_KEYS.tableName(), + BUCKET_NAME); + } + + @Test + void storeTake() { + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = 1; + + assertEquals(Optional.empty(), keyStore.take(accountIdentifier, deviceId).join()); + + final List preKeys = generateRandomPreKeys(); + assertDoesNotThrow(() -> keyStore.store(accountIdentifier, deviceId, preKeys).join()); + + final List sortedPreKeys = preKeys.stream() + .sorted(Comparator.comparing(preKey -> preKey.keyId())) + .toList(); + + assertEquals(Optional.of(sortedPreKeys.get(0)), keyStore.take(accountIdentifier, deviceId).join()); + assertEquals(Optional.of(sortedPreKeys.get(1)), keyStore.take(accountIdentifier, deviceId).join()); + } + + @Test + void storeTwice() { + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = 1; + + final List preKeys1 = generateRandomPreKeys(); + keyStore.store(accountIdentifier, deviceId, preKeys1).join(); + List oldPages = listPages(accountIdentifier).stream().map(S3Object::key).collect(Collectors.toList()); + assertEquals(1, oldPages.size()); + + final List preKeys2 = generateRandomPreKeys(); + keyStore.store(accountIdentifier, deviceId, preKeys2).join(); + List newPages = listPages(accountIdentifier).stream().map(S3Object::key).collect(Collectors.toList()); + assertEquals(1, newPages.size()); + + assertNotEquals(oldPages.getFirst(), newPages.getFirst()); + + assertEquals( + preKeys2.stream().sorted(Comparator.comparing(preKey -> preKey.keyId())).toList(), + + IntStream.range(0, preKeys2.size()) + .mapToObj(i -> keyStore.take(accountIdentifier, deviceId).join()) + .map(Optional::orElseThrow) + .toList()); + + assertTrue(keyStore.take(accountIdentifier, deviceId).join().isEmpty()); + + } + + @Test + void takeAll() { + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = 1; + + final List preKeys = generateRandomPreKeys(); + assertDoesNotThrow(() -> keyStore.store(accountIdentifier, deviceId, preKeys).join()); + + final List sortedPreKeys = preKeys.stream() + .sorted(Comparator.comparing(preKey -> preKey.keyId())) + .toList(); + + for (int i = 0; i < KEY_COUNT; i++) { + assertEquals(Optional.of(sortedPreKeys.get(i)), keyStore.take(accountIdentifier, deviceId).join()); + } + assertEquals(0, keyStore.getCount(accountIdentifier, deviceId).join()); + assertTrue(keyStore.take(accountIdentifier, deviceId).join().isEmpty()); + } + + @Test + void getCount() { + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = 1; + + assertEquals(0, keyStore.getCount(accountIdentifier, deviceId).join()); + + final List preKeys = generateRandomPreKeys(); + + keyStore.store(accountIdentifier, deviceId, preKeys).join(); + + assertEquals(KEY_COUNT, keyStore.getCount(accountIdentifier, deviceId).join()); + + for (int i = 0; i < KEY_COUNT; i++) { + keyStore.take(accountIdentifier, deviceId).join(); + assertEquals(KEY_COUNT - (i + 1), keyStore.getCount(accountIdentifier, deviceId).join()); + } + } + + @Test + void deleteSingleDevice() { + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = 1; + + assertEquals(0, keyStore.getCount(accountIdentifier, deviceId).join()); + assertDoesNotThrow(() -> keyStore.delete(accountIdentifier, deviceId).join()); + + final List preKeys = generateRandomPreKeys(); + + keyStore.store(accountIdentifier, deviceId, preKeys).join(); + keyStore.store(accountIdentifier, (byte) (deviceId + 1), preKeys).join(); + + assertDoesNotThrow(() -> keyStore.delete(accountIdentifier, deviceId).join()); + + assertEquals(0, keyStore.getCount(accountIdentifier, deviceId).join()); + assertEquals(KEY_COUNT, keyStore.getCount(accountIdentifier, (byte) (deviceId + 1)).join()); + + final List pages = listPages(accountIdentifier); + assertEquals(1, pages.size()); + assertTrue(pages.get(0).key().startsWith("%s/%s".formatted(accountIdentifier, deviceId + 1))); + } + + @Test + void deleteAllDevices() { + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = 1; + + assertEquals(0, keyStore.getCount(accountIdentifier, deviceId).join()); + assertDoesNotThrow(() -> keyStore.delete(accountIdentifier).join()); + + final List preKeys = generateRandomPreKeys(); + + keyStore.store(accountIdentifier, deviceId, preKeys).join(); + keyStore.store(accountIdentifier, (byte) (deviceId + 1), preKeys).join(); + + assertDoesNotThrow(() -> keyStore.delete(accountIdentifier).join()); + + assertEquals(0, keyStore.getCount(accountIdentifier, deviceId).join()); + assertEquals(0, keyStore.getCount(accountIdentifier, (byte) (deviceId + 1)).join()); + assertEquals(0, listPages(accountIdentifier).size()); + } + + private List listPages(final UUID identifier) { + return Flux.from(S3_EXTENSION.getS3Client().listObjectsV2Paginator(ListObjectsV2Request.builder() + .bucket(BUCKET_NAME) + .prefix(identifier.toString()) + .build())) + .concatMap(response -> Flux.fromIterable(response.contents())) + .collectList() + .block(); + } + + private List generateRandomPreKeys() { + final Set keyIds = new HashSet<>(KEY_COUNT); + + while (keyIds.size() < KEY_COUNT) { + keyIds.add(Math.abs(ThreadLocalRandom.current().nextInt())); + } + + return keyIds.stream() + .map(keyId -> KeysHelper.signedKEMPreKey(keyId, IDENTITY_KEY_PAIR)) + .toList(); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/S3LocalStackExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/S3LocalStackExtension.java new file mode 100644 index 000000000..2aa0ae58e --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/S3LocalStackExtension.java @@ -0,0 +1,93 @@ +/* + * Copyright 2021-2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +import java.util.Objects; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; + +@Testcontainers +public class S3LocalStackExtension implements BeforeEachCallback, AfterEachCallback, BeforeAllCallback, + AfterAllCallback { + + private final static DockerImageName LOCAL_STACK_IMAGE = + DockerImageName.parse(Objects.requireNonNull( + System.getProperty("localstackImage"), + "Local stack image not found; must provide localstackImage system property")); + + private static LocalStackContainer LOCAL_STACK = new LocalStackContainer(LOCAL_STACK_IMAGE).withServices(S3); + + private final String bucketName; + private S3AsyncClient s3Client; + + public S3LocalStackExtension(final String bucketName) { + this.bucketName = bucketName; + } + + @Override + public void afterEach(ExtensionContext context) { + Flux.from(s3Client.listObjectsV2Paginator(ListObjectsV2Request.builder() + .bucket(bucketName) + .build()) + .contents()) + .flatMap(obj -> Mono.fromFuture(() -> s3Client.deleteObject(DeleteObjectRequest.builder() + .bucket(bucketName) + .key(obj.key()) + .build())), 100) + .then() + .block(); + s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()).join(); + } + + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build()).join(); + } + + public S3AsyncClient getS3Client() { + return s3Client; + } + + @Override + public void afterAll(final ExtensionContext context) throws Exception { + s3Client.close(); + LOCAL_STACK.close(); + } + + @Override + public void beforeAll(final ExtensionContext context) throws Exception { + LOCAL_STACK.start(); + s3Client = S3AsyncClient.builder() + .endpointOverride(LOCAL_STACK.getEndpoint()) + .credentialsProvider(StaticCredentialsProvider + .create(AwsBasicCredentials.create(LOCAL_STACK.getAccessKey(), LOCAL_STACK.getSecretKey()))) + .region(Region.of(LOCAL_STACK.getRegion())) + .build(); + } + + public String getBucketName() { + return bucketName; + } +} diff --git a/service/src/test/resources/config/test.yml b/service/src/test/resources/config/test.yml index 1c2e0f800..fd353d5ad 100644 --- a/service/src/test/resources/config/test.yml +++ b/service/src/test/resources/config/test.yml @@ -135,6 +135,8 @@ dynamoDbTables: tableName: repeated_use_signed_ec_pre_keys_test pqKeys: tableName: pq_keys_test + pagedPqKeys: + tableName: paged_pq_keys_test pqLastResortKeys: tableName: repeated_use_signed_kem_pre_keys_test messages: @@ -171,6 +173,10 @@ dynamoDbTables: verificationSessions: tableName: verification_sessions_test +pagedSingleUseKEMPreKeyStore: + bucket: preKeyBucket # S3 Bucket name + region: us-west-2 # AWS region + cacheCluster: # Redis server configuration for cache cluster type: local