diff --git a/pom.xml b/pom.xml
index 3a3b6b0d2..c8684f105 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,10 @@
2.0.17
23.10.0
2.2.27
+ 1.21.1
+
+
+ localstack/localstack:3.5.0
02fc89fa8766a9ba221e69225f8d1c10bb91885ddbd3c112448e23488ba40ab6
@@ -311,6 +315,13 @@
logback-access-common
${logback-access.version}
+
+ org.testcontainers
+ testcontainers-bom
+ ${testcontainers.version}
+ pom
+ import
+
diff --git a/service/config/sample.yml b/service/config/sample.yml
index 04f8fc15d..3a461cb83 100644
--- a/service/config/sample.yml
+++ b/service/config/sample.yml
@@ -138,6 +138,8 @@ dynamoDbTables:
tableName: Example_EC_Signed_Pre_Keys
pqKeys:
tableName: Example_PQ_Keys
+ pagedPqKeys:
+ tableName: Example_PQ_Paged_Keys
pqLastResortKeys:
tableName: Example_PQ_Last_Resort_Keys
messages:
@@ -174,6 +176,10 @@ dynamoDbTables:
verificationSessions:
tableName: Example_VerificationSessions
+pagedSingleUseKEMPreKeyStore:
+ bucket: preKeyBucket # S3 Bucket name
+ region: us-west-2 # AWS region
+
cacheCluster: # Redis server configuration for cache cluster
configurationUri: redis://redis.example.com:6379/
diff --git a/service/pom.xml b/service/pom.xml
index 64fe98deb..a199fa889 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -485,6 +485,18 @@
test
+
+ org.testcontainers
+ localstack
+ test
+
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
com.google.auth
google-auth-library-oauth2-http
@@ -712,6 +724,9 @@
-javaagent:${org.mockito:mockito-core:jar} --add-opens=java.base/java.net=ALL-UNNAMED
+
+ ${localstack.image}
+
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
index ec70bc1c7..b7cc1f66b 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
@@ -45,6 +45,7 @@ import org.whispersystems.textsecuregcm.configuration.MessageByteLimitCardinalit
import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration;
import org.whispersystems.textsecuregcm.configuration.NoiseTunnelConfiguration;
import org.whispersystems.textsecuregcm.configuration.OneTimeDonationConfiguration;
+import org.whispersystems.textsecuregcm.configuration.PagedSingleUseKEMPreKeyStoreConfiguration;
import org.whispersystems.textsecuregcm.configuration.PaymentsServiceConfiguration;
import org.whispersystems.textsecuregcm.configuration.RegistrationServiceClientFactory;
import org.whispersystems.textsecuregcm.configuration.RemoteConfigConfiguration;
@@ -257,6 +258,11 @@ public class WhisperServerConfiguration extends Configuration {
@NotNull
private OneTimeDonationConfiguration oneTimeDonations;
+ @Valid
+ @JsonProperty
+ @NotNull
+ private PagedSingleUseKEMPreKeyStoreConfiguration pagedSingleUseKEMPreKeyStore;
+
@Valid
@NotNull
@JsonProperty
@@ -478,6 +484,10 @@ public class WhisperServerConfiguration extends Configuration {
return oneTimeDonations;
}
+ public PagedSingleUseKEMPreKeyStoreConfiguration getPagedSingleUseKEMPreKeyStore() {
+ return pagedSingleUseKEMPreKeyStore;
+ }
+
public ReportMessageConfiguration getReportMessageConfiguration() {
return reportMessage;
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index 64a2a6560..a887fff57 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -225,6 +225,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.OneTimeDonationsManager;
+import org.whispersystems.textsecuregcm.storage.PagedSingleUseKEMPreKeyStore;
import org.whispersystems.textsecuregcm.storage.PersistentTimer;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
@@ -235,8 +236,12 @@ import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswords;
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager;
import org.whispersystems.textsecuregcm.storage.RemoteConfigs;
import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager;
+import org.whispersystems.textsecuregcm.storage.RepeatedUseECSignedPreKeyStore;
+import org.whispersystems.textsecuregcm.storage.RepeatedUseKEMSignedPreKeyStore;
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
+import org.whispersystems.textsecuregcm.storage.SingleUseECPreKeyStore;
+import org.whispersystems.textsecuregcm.storage.SingleUseKEMPreKeyStore;
import org.whispersystems.textsecuregcm.storage.SubscriptionManager;
import org.whispersystems.textsecuregcm.storage.Subscriptions;
import org.whispersystems.textsecuregcm.storage.VerificationSessionManager;
@@ -425,13 +430,21 @@ public class WhisperServerService extends Application preKeys) {
+ if (format != FORMAT) {
+ throw new IllegalArgumentException("Unknown format: " + format + ", must be " + FORMAT);
+ }
+
+ if (preKeys.isEmpty()) {
+ throw new IllegalArgumentException("PreKeys cannot be empty");
+ }
+ final ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + SERIALIZED_PREKEY_LENGTH * preKeys.size());
+ buffer.putLong(HEADER);
+ for (KEMSignedPreKey preKey : preKeys) {
+
+ buffer.putLong(preKey.keyId());
+
+ final byte[] publicKeyBytes = preKey.serializedPublicKey();
+ if (publicKeyBytes[0] != KEM_KEY_TYPE_KYBER_1024) {
+ // 0x08 is libsignal's current KEM key format. If some future version of libsignal supports additional KEM
+ // keys, we'll have to roll out read support before rolling out write support. Otherwise, we may write keys
+ // to storage that are not readable by other chat instances.
+ throw new IllegalArgumentException("Format 1 only supports " + KEM_KEY_TYPE_KYBER_1024 + " public keys");
+ }
+ if (publicKeyBytes.length != SERIALIZED_PUBKEY_LENGTH) {
+ throw new IllegalArgumentException("Unexpected public key length " + publicKeyBytes.length);
+ }
+ buffer.put(publicKeyBytes);
+
+ if (preKey.signature().length != SERIALIZED_SIGNATURE_LENGTH) {
+ throw new IllegalArgumentException("prekey signature length must be " + SERIALIZED_SIGNATURE_LENGTH);
+ }
+ buffer.put(preKey.signature());
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ /**
+ * Deserialize a single {@link KEMSignedPreKey}
+ *
+ * @param format The format of the page this buffer is from
+ * @param buffer The key to deserialize. The position of the buffer should be the start of the key, and the limit of
+ * the buffer should be the end of the key. After a successful deserialization the position of the
+ * buffer will be the limit
+ * @return The deserialized key
+ * @throws InvalidKeyException
+ */
+ static KEMSignedPreKey deserializeKey(int format, ByteBuffer buffer) throws InvalidKeyException {
+ if (format != FORMAT) {
+ throw new IllegalArgumentException("Unknown prekey page format " + format);
+ }
+ if (buffer.remaining() != SERIALIZED_PREKEY_LENGTH) {
+ throw new IllegalArgumentException("PreKeys must be length " + SERIALIZED_PREKEY_LENGTH);
+ }
+ final long keyId = buffer.getLong();
+
+ final byte[] publicKeyBytes = new byte[SERIALIZED_PUBKEY_LENGTH];
+ buffer.get(publicKeyBytes);
+ final KEMPublicKey kemPublicKey = new KEMPublicKey(publicKeyBytes);
+
+ final byte[] signature = new byte[SERIALIZED_SIGNATURE_LENGTH];
+ buffer.get(signature);
+ return new KEMSignedPreKey(keyId, kemPublicKey, signature);
+ }
+
+ /**
+ * The location of a specific key within a serialized page
+ */
+ record KeyLocation(int start, int length) {
+
+ int getStartInclusive() {
+ return start;
+ }
+
+ int getEndInclusive() {
+ return start + length - 1;
+ }
+ }
+
+ /**
+ * Get the location of the key at the provided index within a page
+ *
+ * @param format The format of the page
+ * @param index The index of the key to retrieve
+ * @return An {@link KeyLocation} indicating where within the page the key is
+ */
+ static KeyLocation keyLocation(final int format, final int index) {
+ if (format != FORMAT) {
+ throw new IllegalArgumentException("unknown format " + format);
+ }
+ final int startOffset = HEADER_SIZE + (index * SERIALIZED_PREKEY_LENGTH);
+ return new KeyLocation(startOffset, SERIALIZED_PREKEY_LENGTH);
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysManager.java
index 40ecc6fab..602ae61d0 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysManager.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysManager.java
@@ -12,26 +12,27 @@ import java.util.concurrent.CompletableFuture;
import org.whispersystems.textsecuregcm.entities.ECPreKey;
import org.whispersystems.textsecuregcm.entities.ECSignedPreKey;
import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey;
-import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
public class KeysManager {
private final SingleUseECPreKeyStore ecPreKeys;
private final SingleUseKEMPreKeyStore pqPreKeys;
+ private final PagedSingleUseKEMPreKeyStore pagedPqPreKeys;
private final RepeatedUseECSignedPreKeyStore ecSignedPreKeys;
private final RepeatedUseKEMSignedPreKeyStore pqLastResortKeys;
public KeysManager(
- final DynamoDbAsyncClient dynamoDbAsyncClient,
- final String ecTableName,
- final String pqTableName,
- final String ecSignedPreKeysTableName,
- final String pqLastResortTableName) {
- this.ecPreKeys = new SingleUseECPreKeyStore(dynamoDbAsyncClient, ecTableName);
- this.pqPreKeys = new SingleUseKEMPreKeyStore(dynamoDbAsyncClient, pqTableName);
- this.ecSignedPreKeys = new RepeatedUseECSignedPreKeyStore(dynamoDbAsyncClient, ecSignedPreKeysTableName);
- this.pqLastResortKeys = new RepeatedUseKEMSignedPreKeyStore(dynamoDbAsyncClient, pqLastResortTableName);
+ final SingleUseECPreKeyStore ecPreKeys,
+ final SingleUseKEMPreKeyStore pqPreKeys,
+ final PagedSingleUseKEMPreKeyStore pagedPqPreKeys,
+ final RepeatedUseECSignedPreKeyStore ecSignedPreKeys,
+ final RepeatedUseKEMSignedPreKeyStore pqLastResortKeys) {
+ this.ecPreKeys = ecPreKeys;
+ this.pqPreKeys = pqPreKeys;
+ this.pagedPqPreKeys = pagedPqPreKeys;
+ this.ecSignedPreKeys = ecSignedPreKeys;
+ this.pqLastResortKeys = pqLastResortKeys;
}
public TransactWriteItem buildWriteItemForEcSignedPreKey(final UUID identifier,
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStore.java
new file mode 100644
index 000000000..3ebd1cecf
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PagedSingleUseKEMPreKeyStore.java
@@ -0,0 +1,367 @@
+/*
+ * Copyright 2023 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+package org.whispersystems.textsecuregcm.storage;
+
+import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
+
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.signal.libsignal.protocol.InvalidKeyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey;
+import org.whispersystems.textsecuregcm.util.AttributeValues;
+import org.whispersystems.textsecuregcm.util.ExceptionUtils;
+import org.whispersystems.textsecuregcm.util.Util;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+/**
+ * @implNote This version of a {@link SingleUsePreKeyStore} store bundles prekeys into "pages", which are stored in on
+ * an object store and referenced via dynamodb. Each device may only have a single active page at a time. Crashes or
+ * errors may leave orphaned pages which are no longer referenced by the database. A background process must
+ * periodically check for orphaned pages and remove them.
+ * @see SingleUsePreKeyStore
+ */
+public class PagedSingleUseKEMPreKeyStore {
+
+ private static final Logger log = LoggerFactory.getLogger(PagedSingleUseKEMPreKeyStore.class);
+
+ private final DynamoDbAsyncClient dynamoDbAsyncClient;
+ private final S3AsyncClient s3AsyncClient;
+ private final String tableName;
+ private final String bucketName;
+
+ private final Timer getKeyCountTimer = Metrics.timer(name(getClass(), "getCount"));
+ private final Timer storeKeyBatchTimer = Metrics.timer(name(getClass(), "storeKeyBatch"));
+ private final Timer deleteForDeviceTimer = Metrics.timer(name(getClass(), "deleteForDevice"));
+ private final Timer deleteForAccountTimer = Metrics.timer(name(getClass(), "deleteForAccount"));
+
+ final DistributionSummary availableKeyCountDistributionSummary = DistributionSummary
+ .builder(name(getClass(), "availableKeyCount"))
+ .publishPercentileHistogram()
+ .register(Metrics.globalRegistry);
+
+ private final String takeKeyTimerName = name(getClass(), "takeKey");
+ private static final String KEY_PRESENT_TAG_NAME = "keyPresent";
+
+ static final String KEY_ACCOUNT_UUID = "U";
+ static final String KEY_DEVICE_ID = "D";
+ static final String ATTR_PAGE_ID = "ID";
+ static final String ATTR_PAGE_IDX = "I";
+ static final String ATTR_PAGE_NUM_KEYS = "N";
+ static final String ATTR_PAGE_FORMAT_VERSION = "F";
+
+ public PagedSingleUseKEMPreKeyStore(
+ final DynamoDbAsyncClient dynamoDbAsyncClient,
+ final S3AsyncClient s3AsyncClient,
+ final String tableName,
+ final String bucketName) {
+ this.s3AsyncClient = s3AsyncClient;
+ this.dynamoDbAsyncClient = dynamoDbAsyncClient;
+ this.tableName = tableName;
+ this.bucketName = bucketName;
+ }
+
+ /**
+ * Stores a batch of single-use pre-keys for a specific device. All previously-stored keys for the device are cleared
+ * before storing new keys.
+ *
+ * @param identifier the identifier for the account/identity with which the target device is associated
+ * @param deviceId the identifier for the device within the given account/identity
+ * @param preKeys a collection of single-use pre-keys to store for the target device
+ * @return a future that completes when all previously-stored keys have been removed and the given collection of
+ * pre-keys has been stored in its place
+ */
+ public CompletableFuture store(
+ final UUID identifier, final byte deviceId, final List preKeys) {
+ final Timer.Sample sample = Timer.start();
+
+ final List sorted = preKeys.stream().sorted(Comparator.comparing(KEMSignedPreKey::keyId)).toList();
+
+ final int bundleFormat = KEMPreKeyPage.FORMAT;
+ final ByteBuffer bundle = KEMPreKeyPage.serialize(KEMPreKeyPage.FORMAT, sorted);
+
+ // Write the bundle to S3, then update the database. Delete the S3 object that was in the database before. This can
+ // leave orphans in S3 if we fail to update after writing to S3, or fail to delete the old page. However, it can
+ // never leave a broken pointer in the database. To keep this invariant, we must make sure to generate a new
+ // name for the page any time we were to retry this entire operation.
+ return writeBundleToS3(identifier, deviceId, bundle)
+ .thenCompose(pageId -> dynamoDbAsyncClient.putItem(PutItemRequest.builder()
+ .tableName(tableName)
+ .item(Map.of(
+ KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier),
+ KEY_DEVICE_ID, AttributeValues.fromInt(deviceId),
+ ATTR_PAGE_ID, AttributeValues.fromUUID(pageId),
+ ATTR_PAGE_IDX, AttributeValues.fromInt(0),
+ ATTR_PAGE_NUM_KEYS, AttributeValues.fromInt(sorted.size()),
+ ATTR_PAGE_FORMAT_VERSION, AttributeValues.fromInt(bundleFormat)
+ ))
+ .returnValues(ReturnValue.ALL_OLD)
+ .build()))
+ .thenCompose(response -> {
+ if (response.hasAttributes()) {
+ final UUID pageId = AttributeValues.getUUID(response.attributes(), ATTR_PAGE_ID, null);
+ if (pageId == null) {
+ log.error("Replaced record: {} with no pageId", response.attributes());
+ return CompletableFuture.completedFuture(null);
+ }
+ return deleteBundleFromS3(identifier, deviceId, pageId);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ })
+ .whenComplete((result, error) -> sample.stop(storeKeyBatchTimer));
+ }
+
+ /**
+ * Attempts to retrieve a single-use pre-key for a specific device. Keys may only be returned by this method at most
+ * once; once the key is returned, it is removed from the key store and subsequent calls to this method will never
+ * return the same key.
+ *
+ * @param identifier the identifier for the account/identity with which the target device is associated
+ * @param deviceId the identifier for the device within the given account/identity
+ * @return a future that yields a single-use pre-key if one is available or empty if no single-use pre-keys are
+ * available for the target device
+ */
+ public CompletableFuture> take(final UUID identifier, final byte deviceId) {
+ final Timer.Sample sample = Timer.start();
+
+ return dynamoDbAsyncClient.updateItem(UpdateItemRequest.builder()
+ .tableName(tableName)
+ .key(Map.of(
+ KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier),
+ KEY_DEVICE_ID, AttributeValues.fromInt(deviceId)))
+ .updateExpression("SET #index = #index + :one")
+ .conditionExpression("#id = :id AND #index < #numkeys")
+ .expressionAttributeNames(Map.of(
+ "#id", KEY_ACCOUNT_UUID,
+ "#index", ATTR_PAGE_IDX,
+ "#numkeys", ATTR_PAGE_NUM_KEYS))
+ .expressionAttributeValues(Map.of(
+ ":one", AttributeValues.n(1),
+ ":id", AttributeValues.fromUUID(identifier)))
+ .returnValues(ReturnValue.ALL_OLD)
+ .build())
+ .thenCompose(updateItemResponse -> {
+ if (!updateItemResponse.hasAttributes()) {
+ throw new IllegalStateException("update succeeded but did not return an item");
+ }
+
+ final int index = AttributeValues.getInt(updateItemResponse.attributes(), ATTR_PAGE_IDX, -1);
+ final UUID pageId = AttributeValues.getUUID(updateItemResponse.attributes(), ATTR_PAGE_ID, null);
+ final int format = AttributeValues.getInt(updateItemResponse.attributes(), ATTR_PAGE_FORMAT_VERSION, -1);
+ if (index < 0 || format < 0 || pageId == null) {
+ throw new CompletionException(
+ new IOException("unexpected page descriptor " + updateItemResponse.attributes()));
+ }
+
+ return readPreKeyAtIndexFromS3(identifier, deviceId, pageId, format, index).thenApply(Optional::of);
+ })
+ // If this check fails, it means that the item did not exist, or its index was already at the last key. Either
+ // way, there are no keys left so we return empty
+ .exceptionally(ExceptionUtils.exceptionallyHandler(
+ ConditionalCheckFailedException.class,
+ e -> Optional.empty()))
+ .whenComplete((maybeKey, throwable) ->
+ sample.stop(Metrics.timer(
+ takeKeyTimerName,
+ KEY_PRESENT_TAG_NAME, String.valueOf(maybeKey != null && maybeKey.isPresent()))));
+ }
+
+ /**
+ * Returns the number of single-use pre-keys available for a given device.
+ *
+ * @param identifier the identifier for the account/identity with which the target device is associated
+ * @param deviceId the identifier for the device within the given account/identity
+ * @return a future that yields the approximate number of single-use pre-keys currently available for the target
+ * device
+ */
+ public CompletableFuture getCount(final UUID identifier, final byte deviceId) {
+ final Timer.Sample sample = Timer.start();
+
+ return dynamoDbAsyncClient.getItem(GetItemRequest.builder()
+ .tableName(tableName)
+ .key(Map.of(
+ KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier),
+ KEY_DEVICE_ID, AttributeValues.fromInt(deviceId)))
+ .consistentRead(true)
+ .projectionExpression("#total, #index")
+ .expressionAttributeNames(Map.of(
+ "#total", ATTR_PAGE_NUM_KEYS,
+ "#index", ATTR_PAGE_IDX))
+ .build())
+ .thenApply(getResponse -> {
+ if (!getResponse.hasItem()) {
+ return 0;
+ }
+ final int numKeys = AttributeValues.getInt(getResponse.item(), ATTR_PAGE_NUM_KEYS, -1);
+ final int index = AttributeValues.getInt(getResponse.item(), ATTR_PAGE_IDX, -1);
+ if (numKeys < 0 || index < 0 || index > numKeys) {
+ log.error("unexpected index/length in page descriptor: {}", getResponse.item());
+ return 0;
+ }
+
+ return numKeys - index;
+ })
+ .whenComplete((keyCount, throwable) -> {
+ sample.stop(getKeyCountTimer);
+
+ if (throwable == null && keyCount != null) {
+ availableKeyCountDistributionSummary.record(keyCount);
+ }
+ });
+ }
+
+ /**
+ * Removes all single-use pre-keys for all devices associated with the given account/identity.
+ *
+ * @param identifier the identifier for the account/identity for which to remove single-use pre-keys
+ * @return a future that completes when all single-use pre-keys have been removed for all devices associated with the
+ * given account/identity
+ */
+ public CompletableFuture delete(final UUID identifier) {
+ final Timer.Sample sample = Timer.start();
+
+ return deleteItems(identifier, Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder()
+ .tableName(tableName)
+ .keyConditionExpression("#uuid = :uuid")
+ .projectionExpression("#uuid,#deviceid,#pageid")
+ .expressionAttributeNames(Map.of(
+ "#uuid", KEY_ACCOUNT_UUID,
+ "#deviceid", KEY_DEVICE_ID,
+ "#pageid", ATTR_PAGE_ID))
+ .expressionAttributeValues(Map.of(":uuid", AttributeValues.fromUUID(identifier)))
+ .consistentRead(true)
+ .build())
+ .items()))
+ .thenRun(() -> sample.stop(deleteForAccountTimer));
+ }
+
+ /**
+ * Removes all single-use pre-keys for a specific device.
+ *
+ * @param identifier the identifier for the account/identity with which the target device is associated
+ * @param deviceId the identifier for the device within the given account/identity
+ * @return a future that completes when all single-use pre-keys have been removed for the target device
+ */
+ public CompletableFuture delete(final UUID identifier, final byte deviceId) {
+ final Timer.Sample sample = Timer.start();
+
+ return dynamoDbAsyncClient.getItem(GetItemRequest.builder()
+ .tableName(tableName)
+ .key(Map.of(
+ KEY_ACCOUNT_UUID, AttributeValues.fromUUID(identifier),
+ KEY_DEVICE_ID, AttributeValues.fromInt(deviceId)))
+ .consistentRead(true)
+ .projectionExpression("#uuid,#deviceid,#pageid")
+ .expressionAttributeNames(Map.of(
+ "#uuid", KEY_ACCOUNT_UUID,
+ "#deviceid", KEY_DEVICE_ID,
+ "#pageid", ATTR_PAGE_ID))
+ .build())
+ .thenCompose(getItemResponse -> deleteItems(identifier, getItemResponse.hasItem()
+ ? Flux.just(getItemResponse.item())
+ : Flux.empty()))
+ .thenRun(() -> sample.stop(deleteForDeviceTimer));
+ }
+
+ private CompletableFuture deleteItems(final UUID identifier,
+ final Flux