From c545cff1b3b02b5c7aa8d4ee0b7b6137dc25a7c0 Mon Sep 17 00:00:00 2001 From: Graeme Connell Date: Mon, 24 May 2021 16:43:56 -0600 Subject: [PATCH] Switch DynamoDB to AWSv2. Switch from using com.amazonaws.services.dynamodbv2 to using software.amazon.awssdk.services.dynamodb for all current DynamoDB uses. --- service/pom.xml | 8 +- .../textsecuregcm/WhisperServerService.java | 94 ++--- .../storage/AbstractDynamoDbStore.java | 58 ++- .../storage/AccountsDynamoDb.java | 286 +++++++-------- .../storage/AccountsManager.java | 2 +- .../textsecuregcm/storage/KeysDynamoDb.java | 337 ++++++++++-------- .../storage/MessagesDynamoDb.java | 256 +++++++------ .../storage/MigrationDeletedAccounts.java | 59 +-- .../storage/MigrationRetryAccounts.java | 37 +- .../storage/PushChallengeDynamoDb.java | 48 +-- .../storage/ReportMessageDynamoDb.java | 52 ++- .../textsecuregcm/util/AttributeValues.java | 89 +++++ .../util/DynamoDbFromConfig.java | 41 +++ .../textsecuregcm/util/UUIDUtil.java | 16 +- .../workers/DeleteUserCommand.java | 82 +---- .../storage/AccountsDynamoDbTest.java | 158 ++++---- .../storage/DynamoDbExtension.java | 102 +++--- .../storage/KeysDynamoDbRule.java | 40 ++- .../storage/KeysDynamoDbTest.java | 10 +- .../MessagePersisterIntegrationTest.java | 34 +- .../storage/MigrationDeletedAccountsTest.java | 11 +- .../storage/MigrationRetryAccountsTest.java | 11 +- .../storage/PushChallengeDynamoDbTest.java | 11 +- .../storage/ReportMessageDynamoDbTest.java | 11 +- .../tests/storage/AccountsManagerTest.java | 2 +- .../tests/storage/MessagesDynamoDbTest.java | 2 +- .../tests/util/LocalDynamoDbRule.java | 23 +- .../tests/util/MessagesDynamoDbRule.java | 47 +-- .../textsecuregcm/util/AsnTableTest.java | 1 - .../util/AttributeValuesTest.java | 60 ++++ .../WebSocketConnectionIntegrationTest.java | 2 +- 31 files changed, 1114 insertions(+), 876 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/util/AttributeValues.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/util/DynamoDbFromConfig.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/util/AttributeValuesTest.java diff --git a/service/pom.xml b/service/pom.xml index 233429282..c8bf0a5d5 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -256,6 +256,10 @@ software.amazon.awssdk s3 + + software.amazon.awssdk + dynamodb + com.amazonaws aws-java-sdk-core @@ -272,10 +276,6 @@ com.amazonaws aws-java-sdk-appconfig - - com.amazonaws - aws-java-sdk-dynamodb - redis.clients diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 98b051d98..85f8c4a02 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -12,11 +12,6 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.InstanceProfileCredentialsProvider; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.codahale.metrics.SharedMetricRegistries; @@ -194,6 +189,7 @@ import org.whispersystems.textsecuregcm.storage.Usernames; import org.whispersystems.textsecuregcm.storage.UsernamesManager; import org.whispersystems.textsecuregcm.util.AsnManager; import org.whispersystems.textsecuregcm.util.Constants; +import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; import org.whispersystems.textsecuregcm.util.TorExitNodeManager; import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler; @@ -210,6 +206,14 @@ import org.whispersystems.textsecuregcm.workers.VacuumCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; import org.whispersystems.websocket.WebSocketResourceProviderFactory; import org.whispersystems.websocket.setup.WebSocketEnvironment; +import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder; public class WhisperServerService extends Application { @@ -316,80 +320,40 @@ public class WhisperServerService extends Application()); - AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder - .standard() - .withRegion(accountsDynamoDbClientBuilder.getRegion()) - .withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration()) - .withCredentials(accountsDynamoDbClientBuilder.getCredentials()) - .withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool); + DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(config.getAccountsDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(), + accountsDynamoDbMigrationThreadPool); - AmazonDynamoDBClientBuilder migrationDeletedAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder - .standard() - .withRegion(config.getMigrationDeletedAccountsDynamoDbConfiguration().getRegion()) - .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMigrationDeletedAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis())) - .withRequestTimeout((int) config.getMigrationDeletedAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis())) - .withCredentials(InstanceProfileCredentialsProvider.getInstance()); + DynamoDbClient recentlyDeletedAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationDeletedAccountsDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); - AmazonDynamoDBClientBuilder migrationRetryAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder - .standard() - .withRegion(config.getMigrationRetryAccountsDynamoDbConfiguration().getRegion()) - .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMigrationRetryAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis())) - .withRequestTimeout((int) config.getMigrationRetryAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis())) - .withCredentials(InstanceProfileCredentialsProvider.getInstance()); + DynamoDbClient pushChallengeDynamoDbClient = DynamoDbFromConfig.client(config.getPushChallengeDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); - AmazonDynamoDBClientBuilder pushChallengeDynamoDbClientBuilder = AmazonDynamoDBClientBuilder - .standard() - .withRegion(config.getPushChallengeDynamoDbConfiguration().getRegion()) - .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getPushChallengeDynamoDbConfiguration().getClientExecutionTimeout().toMillis())) - .withRequestTimeout((int) config.getPushChallengeDynamoDbConfiguration().getClientRequestTimeout().toMillis())) - .withCredentials(InstanceProfileCredentialsProvider.getInstance()); + DynamoDbClient reportMessageDynamoDbClient = DynamoDbFromConfig.client(config.getReportMessageDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); - AmazonDynamoDBClientBuilder reportMessageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder - .standard() - .withRegion(config.getReportMessageDynamoDbConfiguration().getRegion()) - .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getReportMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis())) - .withRequestTimeout((int) config.getReportMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis())) - .withCredentials(InstanceProfileCredentialsProvider.getInstance()); - - DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build()); - DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build()); - - AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build(); - AmazonDynamoDBAsync accountsDynamodbAsyncClient = accountsDynamoDbAsyncClientBuilder.build(); - - DynamoDB recentlyDeletedAccountsDynamoDb = new DynamoDB(migrationDeletedAccountsDynamoDbClientBuilder.build()); - DynamoDB migrationRetryAccountsDynamoDb = new DynamoDB(migrationRetryAccountsDynamoDbClientBuilder.build()); + DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationRetryAccountsDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb, config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName()); MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName()); Accounts accounts = new Accounts(accountDatabase); - AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamodbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts); + AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts); PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase); PendingDevices pendingDevices = new PendingDevices (accountDatabase); Usernames usernames = new Usernames(accountDatabase); @@ -399,8 +363,8 @@ public class WhisperServerService extends Application outcome = new AtomicReference<>(); - batchWriteItemsFirstPass.record(() -> outcome.set(dynamoDb.batchWriteItem(items))); + protected void executeTableWriteItemsUntilComplete(final Map> items) { + AtomicReference outcome = new AtomicReference<>(); + batchWriteItemsFirstPass.record(() -> outcome.set(dynamoDbClient.batchWriteItem(BatchWriteItemRequest.builder().requestItems(items).build()))); int attemptCount = 0; - while (!outcome.get().getUnprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) { - batchWriteItemsRetryPass.record(() -> outcome.set(dynamoDb.batchWriteItemUnprocessed(outcome.get().getUnprocessedItems()))); + while (!outcome.get().unprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) { + batchWriteItemsRetryPass.record(() -> outcome.set(dynamoDbClient.batchWriteItem(BatchWriteItemRequest.builder() + .requestItems(outcome.get().unprocessedItems()) + .build()))); ++attemptCount; } - if (!outcome.get().getUnprocessedItems().isEmpty()) { - logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, outcome.get().getUnprocessedItems().size()); - batchWriteItemsUnprocessed.increment(outcome.get().getUnprocessedItems().size()); + if (!outcome.get().unprocessedItems().isEmpty()) { + int totalItems = outcome.get().unprocessedItems().values().stream().mapToInt(List::size).sum(); + logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, totalItems); + batchWriteItemsUnprocessed.increment(totalItems); } } - protected long countItemsMatchingQuery(final Table table, final QuerySpec querySpec) { - // This is very confusing, but does appear to be the intended behavior. See: - // - // - https://github.com/aws/aws-sdk-java/issues/693 - // - https://github.com/aws/aws-sdk-java/issues/915 - // - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Count - - long matchingItems = 0; - - for (final Page page : table.query(querySpec).pages()) { - matchingItems += page.getLowLevelResult().getQueryResult().getCount(); - } - - return matchingItems; - } - static void writeInBatches(final Iterable items, final Consumer> action) { final List batch = new ArrayList<>(DYNAMO_DB_MAX_BATCH_SIZE); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java index 3db2695af..1d5faebd9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java @@ -2,25 +2,6 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; -import com.amazonaws.handlers.AsyncHandler; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.PrimaryKey; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec; -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.amazonaws.services.dynamodbv2.model.CancellationReason; -import com.amazonaws.services.dynamodbv2.model.Delete; -import com.amazonaws.services.dynamodbv2.model.GetItemResult; -import com.amazonaws.services.dynamodbv2.model.Put; -import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailure; -import com.amazonaws.services.dynamodbv2.model.TransactWriteItem; -import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest; -import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsResult; -import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException; -import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import io.micrometer.core.instrument.Counter; @@ -33,12 +14,27 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.UUIDUtil; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CancellationReason; +import software.amazon.awssdk.services.dynamodb.model.Delete; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.Put; +import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest; +import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountStore { @@ -51,9 +47,8 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt static final String ATTR_MIGRATION_VERSION = "V"; - private final AmazonDynamoDB client; - private final Table accountsTable; - private final AmazonDynamoDBAsync asyncClient; + private final DynamoDbClient client; + private final DynamoDbAsyncClient asyncClient; private final ThreadPoolExecutor migrationThreadPool; @@ -61,6 +56,7 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt private final MigrationRetryAccounts migrationRetryAccounts; private final String phoneNumbersTableName; + private final String accountsTableName; private static final Timer CREATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "create")); private static final Timer UPDATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "update")); @@ -70,18 +66,17 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt private final Logger logger = LoggerFactory.getLogger(AccountsDynamoDb.class); - public AccountsDynamoDb(AmazonDynamoDB client, AmazonDynamoDBAsync asyncClient, - ThreadPoolExecutor migrationThreadPool, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName, + public AccountsDynamoDb(DynamoDbClient client, DynamoDbAsyncClient asyncClient, + ThreadPoolExecutor migrationThreadPool, String accountsTableName, String phoneNumbersTableName, MigrationDeletedAccounts migrationDeletedAccounts, MigrationRetryAccounts accountsMigrationErrors) { - super(dynamoDb); + super(client); this.client = client; - this.accountsTable = dynamoDb.getTable(accountsTableName); - this.phoneNumbersTableName = phoneNumbersTableName; - this.asyncClient = asyncClient; + this.phoneNumbersTableName = phoneNumbersTableName; + this.accountsTableName = accountsTableName; this.migrationThreadPool = migrationThreadPool; this.migrationDeletedAccounts = migrationDeletedAccounts; @@ -90,32 +85,34 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt @Override public boolean create(Account account) { - return CREATE_TIMER.record(() -> { try { TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid()); + TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid(), Put.builder() + .conditionExpression("attribute_not_exists(#number) OR #number = :number") + .expressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164)) + .expressionAttributeValues(Map.of(":number", AttributeValues.fromString(account.getNumber())))); - TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid()); - - final TransactWriteItemsRequest request = new TransactWriteItemsRequest() - .withTransactItems(phoneNumberConstraintPut, accountPut); + final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder() + .transactItems(phoneNumberConstraintPut, accountPut) + .build(); try { client.transactWriteItems(request); } catch (TransactionCanceledException e) { - final CancellationReason accountCancellationReason = e.getCancellationReasons().get(1); + final CancellationReason accountCancellationReason = e.cancellationReasons().get(1); - if ("ConditionalCheckFailed".equals(accountCancellationReason.getCode())) { + if ("ConditionalCheckFailed".equals(accountCancellationReason.code())) { throw new IllegalArgumentException("uuid present with different phone number"); } - final CancellationReason phoneNumberConstraintCancellationReason = e.getCancellationReasons().get(0); + final CancellationReason phoneNumberConstraintCancellationReason = e.cancellationReasons().get(0); - if ("ConditionalCheckFailed".equals(phoneNumberConstraintCancellationReason.getCode())) { + if ("ConditionalCheckFailed".equals(phoneNumberConstraintCancellationReason.code())) { - ByteBuffer actualAccountUuid = phoneNumberConstraintCancellationReason.getItem().get(KEY_ACCOUNT_UUID).getB(); + ByteBuffer actualAccountUuid = phoneNumberConstraintCancellationReason.item().get(KEY_ACCOUNT_UUID).b().asByteBuffer(); account.setUuid(UUIDUtil.fromByteBuffer(actualAccountUuid)); update(account); @@ -134,39 +131,37 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt }); } - private TransactWriteItem buildPutWriteItemForAccount(Account account, UUID uuid) throws JsonProcessingException { - return new TransactWriteItem() - .withPut( - new Put() - .withTableName(accountsTable.getTableName()) - .withItem(Map.of( - KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)), - ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()), - ATTR_ACCOUNT_DATA, new AttributeValue() - .withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account))), - ATTR_MIGRATION_VERSION, new AttributeValue().withN( - String.valueOf(account.getDynamoDbMigrationVersion())))) - .withConditionExpression("attribute_not_exists(#number) OR #number = :number") - .withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164)) - .withExpressionAttributeValues(Map.of(":number", new AttributeValue(account.getNumber())))); + private TransactWriteItem buildPutWriteItemForAccount(Account account, UUID uuid, Put.Builder putBuilder) throws JsonProcessingException { + return TransactWriteItem.builder() + .put(putBuilder + .tableName(accountsTableName) + .item(Map.of( + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid), + ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber()), + ATTR_ACCOUNT_DATA, AttributeValues.fromByteArray(SystemMapper.getMapper().writeValueAsBytes(account)), + ATTR_MIGRATION_VERSION, AttributeValues.fromInt(account.getDynamoDbMigrationVersion()))) + .build()) + .build(); } private TransactWriteItem buildPutWriteItemForPhoneNumberConstraint(Account account, UUID uuid) { - return new TransactWriteItem() - .withPut( - new Put() - .withTableName(phoneNumbersTableName) - .withItem(Map.of( - ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()), - KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))) - .withConditionExpression( + return TransactWriteItem.builder() + .put( + Put.builder() + .tableName(phoneNumbersTableName) + .item(Map.of( + ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber()), + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid))) + .conditionExpression( "attribute_not_exists(#number) OR (attribute_exists(#number) AND #uuid = :uuid)") - .withExpressionAttributeNames( + .expressionAttributeNames( Map.of("#uuid", KEY_ACCOUNT_UUID, "#number", ATTR_ACCOUNT_E164)) - .withExpressionAttributeValues( - Map.of(":uuid", new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))) - .withReturnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)); + .expressionAttributeValues( + Map.of(":uuid", AttributeValues.fromUUID(uuid))) + .returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD) + .build()) + .build(); } @Override @@ -174,16 +169,18 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt UPDATE_TIMER.record(() -> { UpdateItemRequest updateItemRequest; try { - updateItemRequest = new UpdateItemRequest() - .withTableName(accountsTable.getTableName()) - .withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(account.getUuid())))) - .withUpdateExpression("SET #data = :data, #version = :version") - .withConditionExpression("attribute_exists(#number)") - .withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164, + updateItemRequest = UpdateItemRequest.builder() + .tableName(accountsTableName) + .key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(account.getUuid()))) + .updateExpression("SET #data = :data, #version = :version") + .conditionExpression("attribute_exists(#number)") + .expressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164, "#data", ATTR_ACCOUNT_DATA, "#version", ATTR_MIGRATION_VERSION)) - .withExpressionAttributeValues(Map.of(":data", new AttributeValue().withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account))), - ":version", new AttributeValue().withN(String.valueOf(account.getDynamoDbMigrationVersion())))); + .expressionAttributeValues(Map.of( + ":data", AttributeValues.fromByteArray(SystemMapper.getMapper().writeValueAsBytes(account)), + ":version", AttributeValues.fromInt(account.getDynamoDbMigrationVersion()))) + .build(); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); @@ -193,37 +190,42 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt }); } + @Override public Optional get(String number) { - return GET_BY_NUMBER_TIMER.record(() -> { - final GetItemResult phoneNumberAndUuid = client.getItem(phoneNumbersTableName, - Map.of(ATTR_ACCOUNT_E164, new AttributeValue(number)), true); + final GetItemResponse response = client.getItem(GetItemRequest.builder() + .tableName(phoneNumbersTableName) + .key(Map.of(ATTR_ACCOUNT_E164, AttributeValues.fromString(number))) + .build()); - return Optional.ofNullable(phoneNumberAndUuid.getItem()) - .map(item -> item.get(KEY_ACCOUNT_UUID).getB()) - .map(uuid -> accountsTable.getItem(new GetItemSpec() - .withPrimaryKey(KEY_ACCOUNT_UUID, uuid.array()) - .withConsistentRead(true))) + return Optional.ofNullable(response.item()) + .map(item -> item.get(KEY_ACCOUNT_UUID)) + .map(uuid -> accountByUuid(uuid)) .map(AccountsDynamoDb::fromItem); }); } + private Map accountByUuid(AttributeValue uuid) { + GetItemResponse r = client.getItem(GetItemRequest.builder() + .tableName(accountsTableName) + .key(Map.of(KEY_ACCOUNT_UUID, uuid)) + .consistentRead(true) + .build()); + return r.item().isEmpty() ? null : r.item(); + } + @Override public Optional get(UUID uuid) { - Optional maybeItem = GET_BY_UUID_TIMER.record(() -> - Optional.ofNullable(accountsTable.getItem(new GetItemSpec(). - withPrimaryKey(new PrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(uuid))) - .withConsistentRead(true)))); - - return maybeItem.map(AccountsDynamoDb::fromItem); + return GET_BY_UUID_TIMER.record(() -> + Optional.ofNullable(accountByUuid(AttributeValues.fromUUID(uuid))) + .map(AccountsDynamoDb::fromItem)); } @Override public void delete(UUID uuid) { DELETE_TIMER.record(() -> { - delete(uuid, true); }); } @@ -238,18 +240,22 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt maybeAccount.ifPresent(account -> { - TransactWriteItem phoneNumberDelete = new TransactWriteItem() - .withDelete(new Delete() - .withTableName(phoneNumbersTableName) - .withKey(Map.of(ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber())))); + TransactWriteItem phoneNumberDelete = TransactWriteItem.builder() + .delete(Delete.builder() + .tableName(phoneNumbersTableName) + .key(Map.of(ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber()))) + .build()) + .build(); - TransactWriteItem accountDelete = new TransactWriteItem().withDelete( - new Delete() - .withTableName(accountsTable.getTableName()) - .withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid))))); + TransactWriteItem accountDelete = TransactWriteItem.builder() + .delete(Delete.builder() + .tableName(accountsTableName) + .key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid))) + .build()) + .build(); - TransactWriteItemsRequest request = new TransactWriteItemsRequest() - .withTransactItems(phoneNumberDelete, accountDelete); + TransactWriteItemsRequest request = TransactWriteItemsRequest.builder() + .transactItems(phoneNumberDelete, accountDelete).build(); client.transactWriteItems(request); }); @@ -299,64 +305,62 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt try { TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid()); - TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid()); - accountPut.getPut() - .setConditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)"); - accountPut.getPut() - .setExpressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, - "#version", ATTR_MIGRATION_VERSION)); - accountPut.getPut() - .setExpressionAttributeValues( - Map.of(":version", new AttributeValue().withN(String.valueOf(account.getDynamoDbMigrationVersion())))); + TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid(), Put.builder() + .conditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)") + .expressionAttributeNames(Map.of( + "#uuid", KEY_ACCOUNT_UUID, + "#version", ATTR_MIGRATION_VERSION)) + .expressionAttributeValues(Map.of( + ":version", AttributeValues.fromInt(account.getDynamoDbMigrationVersion())))); - final TransactWriteItemsRequest request = new TransactWriteItemsRequest() - .withTransactItems(phoneNumberConstraintPut, accountPut); + final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder() + .transactItems(phoneNumberConstraintPut, accountPut).build(); final CompletableFuture resultFuture = new CompletableFuture<>(); - - asyncClient.transactWriteItemsAsync(request, - new AsyncHandler<>() { - @Override - public void onError(Exception exception) { - if (exception instanceof TransactionCanceledException) { - // account is already migrated - resultFuture.complete(false); - } else { - try { - migrationRetryAccounts.put(account.getUuid()); - } catch (final Exception e) { - logger.error("Could not store account {}", account.getUuid()); - } - resultFuture.completeExceptionally(exception); - } - } - - @Override - public void onSuccess(TransactWriteItemsRequest request, TransactWriteItemsResult transactWriteItemsResult) { - resultFuture.complete(true); - } - }); - + asyncClient.transactWriteItems(request).whenCompleteAsync((result, exception) -> { + if (result != null) { + resultFuture.complete(true); + return; + } + if (exception instanceof CompletionException) { + // whenCompleteAsync can wrap exceptions in a CompletionException; unwrap it to get to the root cause. + exception = exception.getCause(); + } + if (exception instanceof TransactionCanceledException) { + // account is already migrated + resultFuture.complete(false); + return; + } + try { + migrationRetryAccounts.put(account.getUuid()); + } catch (final Exception e) { + logger.error("Could not store account {}", account.getUuid()); + } + resultFuture.completeExceptionally(exception); + }); return resultFuture; - } catch (Exception e) { return CompletableFuture.failedFuture(e); } } private static String extractCancellationReasonCodes(final TransactionCanceledException exception) { - return exception.getCancellationReasons().stream() - .map(CancellationReason::getCode) + return exception.cancellationReasons().stream() + .map(CancellationReason::code) .collect(Collectors.joining(", ")); } @VisibleForTesting - static Account fromItem(Item item) { + static Account fromItem(Map item) { + if (!item.containsKey(ATTR_ACCOUNT_DATA) || + !item.containsKey(ATTR_ACCOUNT_E164) || + !item.containsKey(KEY_ACCOUNT_UUID)) { + throw new RuntimeException("item missing values"); + } try { - Account account = SystemMapper.getMapper().readValue(item.getBinary(ATTR_ACCOUNT_DATA), Account.class); - - account.setNumber(item.getString(ATTR_ACCOUNT_E164)); - account.setUuid(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_ACCOUNT_UUID))); + Account account = SystemMapper.getMapper().readValue(item.get(ATTR_ACCOUNT_DATA).b().asByteArray(), Account.class); + account.setNumber(item.get(ATTR_ACCOUNT_E164).s()); + account.setUuid(UUIDUtil.fromByteBuffer(item.get(KEY_ACCOUNT_UUID).b().asByteBuffer())); return account; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 2806b667b..5a2afffcc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -7,7 +7,6 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; -import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; @@ -42,6 +41,7 @@ import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; public class AccountsManager { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDb.java index 02378ed62..969a0e797 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDb.java @@ -7,195 +7,230 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; -import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.PrimaryKey; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.TableWriteItems; -import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec; -import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec; -import com.amazonaws.services.dynamodbv2.model.ReturnValue; -import com.amazonaws.services.dynamodbv2.model.Select; import com.google.common.annotations.VisibleForTesting; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import org.whispersystems.textsecuregcm.entities.PreKey; -import org.whispersystems.textsecuregcm.util.UUIDUtil; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; +import software.amazon.awssdk.services.dynamodb.model.DeleteRequest; +import software.amazon.awssdk.services.dynamodb.model.PutRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryResponse; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; +import software.amazon.awssdk.services.dynamodb.model.Select; +import software.amazon.awssdk.services.dynamodb.model.WriteRequest; public class KeysDynamoDb extends AbstractDynamoDbStore { - private final Table table; + private final String tableName; - static final String KEY_ACCOUNT_UUID = "U"; - static final String KEY_DEVICE_ID_KEY_ID = "DK"; - static final String KEY_PUBLIC_KEY = "P"; + static final String KEY_ACCOUNT_UUID = "U"; + static final String KEY_DEVICE_ID_KEY_ID = "DK"; + static final String KEY_PUBLIC_KEY = "P"; - private static final Timer STORE_KEYS_TIMER = Metrics.timer(name(KeysDynamoDb.class, "storeKeys")); - private static final Timer TAKE_KEY_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForDevice")); - private static final Timer TAKE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForAccount")); - private static final Timer GET_KEY_COUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "getKeyCount")); - private static final Timer DELETE_KEYS_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForDevice")); - private static final Timer DELETE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForAccount")); - private static final DistributionSummary CONTESTED_KEY_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "contestedKeys")); - private static final DistributionSummary KEY_COUNT_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "keyCount")); + private static final Timer STORE_KEYS_TIMER = Metrics.timer(name(KeysDynamoDb.class, "storeKeys")); + private static final Timer TAKE_KEY_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForDevice")); + private static final Timer TAKE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForAccount")); + private static final Timer GET_KEY_COUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "getKeyCount")); + private static final Timer DELETE_KEYS_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForDevice")); + private static final Timer DELETE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForAccount")); + private static final DistributionSummary CONTESTED_KEY_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "contestedKeys")); + private static final DistributionSummary KEY_COUNT_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "keyCount")); - public KeysDynamoDb(final DynamoDB dynamoDB, final String tableName) { - super(dynamoDB); + public KeysDynamoDb(final DynamoDbClient dynamoDB, final String tableName) { + super(dynamoDB); + this.tableName = tableName; + } - this.table = dynamoDB.getTable(tableName); - } + public void store(final Account account, final long deviceId, final List keys) { + STORE_KEYS_TIMER.record(() -> { + delete(account, deviceId); - public void store(final Account account, final long deviceId, final List keys) { - STORE_KEYS_TIMER.record(() -> { - delete(account, deviceId); + writeInBatches(keys, batch -> { + List items = new ArrayList<>(); + for (final PreKey preKey : batch) { + items.add(WriteRequest.builder() + .putRequest(PutRequest.builder() + .item(getItemFromPreKey(account.getUuid(), deviceId, preKey)) + .build()) + .build()); + } + executeTableWriteItemsUntilComplete(Map.of(tableName, items)); + }); + }); + } - writeInBatches(keys, batch -> { - final TableWriteItems items = new TableWriteItems(table.getTableName()); + public Optional take(final Account account, final long deviceId) { + return TAKE_KEY_FOR_DEVICE_TIMER.record(() -> { + final AttributeValue partitionKey = getPartitionKey(account.getUuid()); + QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)") + .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID)) + .expressionAttributeValues(Map.of( + ":uuid", partitionKey, + ":sortprefix", getSortKeyPrefix(deviceId))) + .projectionExpression(KEY_DEVICE_ID_KEY_ID) + .consistentRead(false) + .build(); - for (final PreKey preKey : batch) { - items.addItemToPut(getItemFromPreKey(account.getUuid(), deviceId, preKey)); - } + int contestedKeys = 0; - executeTableWriteItemsUntilComplete(items); - }); - }); - } + try { + QueryResponse response = db().query(queryRequest); + for (Map candidate : response.items()) { + DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_ACCOUNT_UUID, partitionKey, + KEY_DEVICE_ID_KEY_ID, candidate.get(KEY_DEVICE_ID_KEY_ID))) + .returnValues(ReturnValue.ALL_OLD) + .build(); + DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest); + if (deleteItemResponse.attributes() != null) { + return Optional.of(getPreKeyFromItem(deleteItemResponse.attributes())); + } - public Optional take(final Account account, final long deviceId) { - return TAKE_KEY_FOR_DEVICE_TIMER.record(() -> { - final byte[] partitionKey = getPartitionKey(account.getUuid()); + contestedKeys++; + } - final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)") - .withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID)) - .withValueMap(Map.of(":uuid", partitionKey, - ":sortprefix", getSortKeyPrefix(deviceId))) - .withProjectionExpression(KEY_DEVICE_ID_KEY_ID) - .withConsistentRead(false); + return Optional.empty(); + } finally { + CONTESTED_KEY_DISTRIBUTION.record(contestedKeys); + } + }); + } - int contestedKeys = 0; + public Map take(final Account account) { + return TAKE_KEYS_FOR_ACCOUNT_TIMER.record(() -> { + final Map preKeysByDeviceId = new HashMap<>(); - try { - for (final Item candidate : table.query(querySpec)) { - final DeleteItemSpec deleteItemSpec = new DeleteItemSpec().withPrimaryKey(KEY_ACCOUNT_UUID, partitionKey, KEY_DEVICE_ID_KEY_ID, candidate.getBinary(KEY_DEVICE_ID_KEY_ID)) - .withReturnValues(ReturnValue.ALL_OLD); + for (final Device device : account.getDevices()) { + take(account, device.getId()).ifPresent(preKey -> preKeysByDeviceId.put(device.getId(), preKey)); + } - final DeleteItemOutcome outcome = table.deleteItem(deleteItemSpec); + return preKeysByDeviceId; + }); + } - if (outcome.getItem() != null) { - return Optional.of(getPreKeyFromItem(outcome.getItem())); - } + public int getCount(final Account account, final long deviceId) { + return GET_KEY_COUNT_TIMER.record(() -> { + QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)") + .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID)) + .expressionAttributeValues(Map.of( + ":uuid", getPartitionKey(account.getUuid()), + ":sortprefix", getSortKeyPrefix(deviceId))) + .select(Select.COUNT) + .consistentRead(false) + .build(); - contestedKeys++; - } + int keyCount = 0; + // This is very confusing, but does appear to be the intended behavior. See: + // + // - https://github.com/aws/aws-sdk-java/issues/693 + // - https://github.com/aws/aws-sdk-java/issues/915 + // - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Count + for (final QueryResponse page : db().queryPaginator(queryRequest)) { + keyCount += page.count(); + } + KEY_COUNT_DISTRIBUTION.record(keyCount); + return keyCount; + }); + } - return Optional.empty(); - } finally { - CONTESTED_KEY_DISTRIBUTION.record(contestedKeys); - } - }); - } + public void delete(final Account account) { + DELETE_KEYS_FOR_ACCOUNT_TIMER.record(() -> { + final QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#uuid = :uuid") + .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID)) + .expressionAttributeValues(Map.of( + ":uuid", getPartitionKey(account.getUuid()))) + .projectionExpression(KEY_DEVICE_ID_KEY_ID) + .consistentRead(true) + .build(); - public Map take(final Account account) { - return TAKE_KEYS_FOR_ACCOUNT_TIMER.record(() -> { - final Map preKeysByDeviceId = new HashMap<>(); + deleteItemsForAccountMatchingQuery(account, queryRequest); + }); + } - for (final Device device : account.getDevices()) { - take(account, device.getId()).ifPresent(preKey -> preKeysByDeviceId.put(device.getId(), preKey)); - } + @VisibleForTesting + void delete(final Account account, final long deviceId) { + DELETE_KEYS_FOR_DEVICE_TIMER.record(() -> { + final QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)") + .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID)) + .expressionAttributeValues(Map.of( + ":uuid", getPartitionKey(account.getUuid()), + ":sortprefix", getSortKeyPrefix(deviceId))) + .projectionExpression(KEY_DEVICE_ID_KEY_ID) + .consistentRead(true) + .build(); - return preKeysByDeviceId; - }); - } + deleteItemsForAccountMatchingQuery(account, queryRequest); + }); + } - public int getCount(final Account account, final long deviceId) { - return GET_KEY_COUNT_TIMER.record(() -> { - final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)") - .withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID)) - .withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()), - ":sortprefix", getSortKeyPrefix(deviceId))) - .withSelect(Select.COUNT) - .withConsistentRead(false); + private void deleteItemsForAccountMatchingQuery(final Account account, final QueryRequest querySpec) { + final AttributeValue partitionKey = getPartitionKey(account.getUuid()); - final int keyCount = (int)countItemsMatchingQuery(table, querySpec); + writeInBatches(db().query(querySpec).items(), batch -> { + List deletes = new ArrayList<>(); + for (final Map item : batch) { + deletes.add(WriteRequest.builder() + .deleteRequest(DeleteRequest.builder() + .key(Map.of( + KEY_ACCOUNT_UUID, partitionKey, + KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID))) + .build()) + .build()); + } + executeTableWriteItemsUntilComplete(Map.of(tableName, deletes)); + }); + } - KEY_COUNT_DISTRIBUTION.record(keyCount); - return keyCount; - }); - } + private static AttributeValue getPartitionKey(final UUID accountUuid) { + return AttributeValues.fromUUID(accountUuid); + } - public void delete(final Account account) { - DELETE_KEYS_FOR_ACCOUNT_TIMER.record(() -> { - final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid") - .withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID)) - .withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()))) - .withProjectionExpression(KEY_DEVICE_ID_KEY_ID) - .withConsistentRead(true); + private static AttributeValue getSortKey(final long deviceId, final long keyId) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); + byteBuffer.putLong(deviceId); + byteBuffer.putLong(keyId); + return AttributeValues.fromByteBuffer(byteBuffer.flip()); + } - deleteItemsForAccountMatchingQuery(account, querySpec); - }); - } + @VisibleForTesting + static AttributeValue getSortKeyPrefix(final long deviceId) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]); + byteBuffer.putLong(deviceId); + return AttributeValues.fromByteBuffer(byteBuffer.flip()); + } - @VisibleForTesting - void delete(final Account account, final long deviceId) { - DELETE_KEYS_FOR_DEVICE_TIMER.record(() -> { - final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)") - .withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID)) - .withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()), - ":sortprefix", getSortKeyPrefix(deviceId))) - .withProjectionExpression(KEY_DEVICE_ID_KEY_ID) - .withConsistentRead(true); + private Map getItemFromPreKey(final UUID accountUuid, final long deviceId, final PreKey preKey) { + return Map.of( + KEY_ACCOUNT_UUID, getPartitionKey(accountUuid), + KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.getKeyId()), + KEY_PUBLIC_KEY, AttributeValues.fromString(preKey.getPublicKey())); + } - deleteItemsForAccountMatchingQuery(account, querySpec); - }); - } - - private void deleteItemsForAccountMatchingQuery(final Account account, final QuerySpec querySpec) { - final byte[] partitionKey = getPartitionKey(account.getUuid()); - - writeInBatches(table.query(querySpec), batch -> { - final TableWriteItems writeItems = new TableWriteItems(table.getTableName()); - - for (final Item item : batch) { - writeItems.addPrimaryKeyToDelete(new PrimaryKey(KEY_ACCOUNT_UUID, partitionKey, KEY_DEVICE_ID_KEY_ID, item.getBinary(KEY_DEVICE_ID_KEY_ID))); - } - - executeTableWriteItemsUntilComplete(writeItems); - }); - } - - private static byte[] getPartitionKey(final UUID accountUuid) { - return UUIDUtil.toBytes(accountUuid); - } - - private static byte[] getSortKey(final long deviceId, final long keyId) { - final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); - byteBuffer.putLong(deviceId); - byteBuffer.putLong(keyId); - return byteBuffer.array(); - } - - private static byte[] getSortKeyPrefix(final long deviceId) { - final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]); - byteBuffer.putLong(deviceId); - return byteBuffer.array(); - } - - private Item getItemFromPreKey(final UUID accountUuid, final long deviceId, final PreKey preKey) { - return new Item().withBinary(KEY_ACCOUNT_UUID, getPartitionKey(accountUuid)) - .withBinary(KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.getKeyId())) - .withString(KEY_PUBLIC_KEY, preKey.getPublicKey()); - } - - private PreKey getPreKeyFromItem(final Item item) { - final long keyId = ByteBuffer.wrap(item.getBinary(KEY_DEVICE_ID_KEY_ID)).getLong(8); - return new PreKey(keyId, item.getString(KEY_PUBLIC_KEY)); - } + private PreKey getPreKeyFromItem(Map item) { + final long keyId = item.get(KEY_DEVICE_ID_KEY_ID).b().asByteBuffer().getLong(8); + return new PreKey(keyId, item.get(KEY_PUBLIC_KEY).s()); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 413c93443..536a758a3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -8,17 +8,7 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; import static io.micrometer.core.instrument.Metrics.timer; -import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Index; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.PrimaryKey; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.TableWriteItems; -import com.amazonaws.services.dynamodbv2.document.api.QueryApi; -import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec; -import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec; -import com.amazonaws.services.dynamodbv2.model.ReturnValue; +import com.google.common.collect.ImmutableMap; import io.micrometer.core.instrument.Timer; import java.nio.ByteBuffer; import java.time.Duration; @@ -27,11 +17,22 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.apache.commons.lang3.StringUtils; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.UUIDUtil; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; +import software.amazon.awssdk.services.dynamodb.model.DeleteRequest; +import software.amazon.awssdk.services.dynamodb.model.PutRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryRequest; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; +import software.amazon.awssdk.services.dynamodb.model.WriteRequest; public class MessagesDynamoDb extends AbstractDynamoDbStore { @@ -60,7 +61,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { private final String tableName; private final Duration timeToLive; - public MessagesDynamoDb(DynamoDB dynamoDb, String tableName, Duration timeToLive) { + public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) { super(dynamoDb); this.tableName = tableName; @@ -76,54 +77,61 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { throw new IllegalArgumentException("Maximum batch size of " + DYNAMO_DB_MAX_BATCH_SIZE + " execeeded with " + messages.size() + " messages"); } - final byte[] partitionKey = convertPartitionKey(destinationAccountUuid); - TableWriteItems items = new TableWriteItems(tableName); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + List writeItems = new ArrayList<>(); for (MessageProtos.Envelope message : messages) { final UUID messageUuid = UUID.fromString(message.getServerGuid()); - final Item item = new Item().withBinary(KEY_PARTITION, partitionKey) - .withBinary(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid)) - .withBinary(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid)) - .withInt(KEY_TYPE, message.getType().getNumber()) - .withLong(KEY_TIMESTAMP, message.getTimestamp()) - .withLong(KEY_TTL, getTtlForMessage(message)); + final ImmutableMap.Builder item = ImmutableMap.builder() + .put(KEY_PARTITION, partitionKey) + .put(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid)) + .put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid)) + .put(KEY_TYPE, AttributeValues.fromInt(message.getType().getNumber())) + .put(KEY_TIMESTAMP, AttributeValues.fromLong(message.getTimestamp())) + .put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message))); if (message.hasRelay() && message.getRelay().length() > 0) { - item.withString(KEY_RELAY, message.getRelay()); + item.put(KEY_RELAY, AttributeValues.fromString(message.getRelay())); } if (message.hasSource()) { - item.withString(KEY_SOURCE, message.getSource()); + item.put(KEY_SOURCE, AttributeValues.fromString(message.getSource())); } if (message.hasSourceUuid()) { - item.withBinary(KEY_SOURCE_UUID, UUIDUtil.toBytes(UUID.fromString(message.getSourceUuid()))); + item.put(KEY_SOURCE_UUID, AttributeValues.fromUUID(UUID.fromString(message.getSourceUuid()))); } if (message.hasSourceDevice()) { - item.withInt(KEY_SOURCE_DEVICE, message.getSourceDevice()); + item.put(KEY_SOURCE_DEVICE, AttributeValues.fromInt(message.getSourceDevice())); } if (message.hasLegacyMessage()) { - item.withBinary(KEY_MESSAGE, message.getLegacyMessage().toByteArray()); + item.put(KEY_MESSAGE, AttributeValues.fromByteArray(message.getLegacyMessage().toByteArray())); } if (message.hasContent()) { - item.withBinary(KEY_CONTENT, message.getContent().toByteArray()); + item.put(KEY_CONTENT, AttributeValues.fromByteArray(message.getContent().toByteArray())); } - items.addItemToPut(item); + writeItems.add(WriteRequest.builder().putRequest(PutRequest.builder() + .item(item.build()) + .build()).build()); } - executeTableWriteItemsUntilComplete(items); + executeTableWriteItemsUntilComplete(Map.of(tableName, writeItems)); } public List load(final UUID destinationAccountUuid, final long destinationDeviceId, final int requestedNumberOfMessagesToFetch) { return loadTimer.record(() -> { final int numberOfMessagesToFetch = Math.min(requestedNumberOfMessagesToFetch, RESULT_SET_CHUNK_SIZE); - final byte[] partitionKey = convertPartitionKey(destinationAccountUuid); - final QuerySpec querySpec = new QuerySpec().withConsistentRead(true) - .withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") - .withNameMap(Map.of("#part", KEY_PARTITION, - "#sort", KEY_SORT)) - .withValueMap(Map.of(":part", partitionKey, - ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId))) - .withMaxResultSize(numberOfMessagesToFetch); - final Table table = getDynamoDb().getTable(tableName); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + final QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .consistentRead(true) + .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") + .expressionAttributeNames(Map.of( + "#part", KEY_PARTITION, + "#sort", KEY_SORT)) + .expressionAttributeValues(Map.of( + ":part", partitionKey, + ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId))) + .limit(numberOfMessagesToFetch) + .build(); List messageEntities = new ArrayList<>(numberOfMessagesToFetch); - for (Item message : table.query(querySpec)) { + for (Map message : db().query(queryRequest).items()) { messageEntities.add(convertItemToOutgoingMessageEntity(message)); } return messageEntities; @@ -136,53 +144,63 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { throw new IllegalArgumentException("must specify a source"); } - final byte[] partitionKey = convertPartitionKey(destinationAccountUuid); - final QuerySpec querySpec = new QuerySpec().withProjectionExpression(KEY_SORT) - .withConsistentRead(true) - .withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") - .withFilterExpression("#source = :source AND #timestamp = :timestamp") - .withNameMap(Map.of("#part", KEY_PARTITION, - "#sort", KEY_SORT, - "#source", KEY_SOURCE, - "#timestamp", KEY_TIMESTAMP)) - .withValueMap(Map.of(":part", partitionKey, - ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId), - ":source", source, - ":timestamp", timestamp)); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + final QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .projectionExpression(KEY_SORT) + .consistentRead(true) + .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") + .filterExpression("#source = :source AND #timestamp = :timestamp") + .expressionAttributeNames(Map.of( + "#part", KEY_PARTITION, + "#sort", KEY_SORT, + "#source", KEY_SOURCE, + "#timestamp", KEY_TIMESTAMP)) + .expressionAttributeValues(Map.of( + ":part", partitionKey, + ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId), + ":source", AttributeValues.fromString(source), + ":timestamp", AttributeValues.fromLong(timestamp))) + .build(); - final Table table = getDynamoDb().getTable(tableName); - return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, table); + return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(partitionKey, queryRequest); }); } public Optional deleteMessageByDestinationAndGuid(final UUID destinationAccountUuid, final long destinationDeviceId, final UUID messageUuid) { return deleteByGuid.record(() -> { - final byte[] partitionKey = convertPartitionKey(destinationAccountUuid); - final QuerySpec querySpec = new QuerySpec().withProjectionExpression(KEY_SORT) - .withConsistentRead(true) - .withKeyConditionExpression("#part = :part AND #uuid = :uuid") - .withNameMap(Map.of("#part", KEY_PARTITION, - "#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT)) - .withValueMap(Map.of(":part", partitionKey, - ":uuid", convertLocalIndexMessageUuidSortKey(messageUuid))); - final Table table = getDynamoDb().getTable(tableName); - final Index index = table.getIndex(LOCAL_INDEX_MESSAGE_UUID_NAME); - return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, index); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + final QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .indexName(LOCAL_INDEX_MESSAGE_UUID_NAME) + .projectionExpression(KEY_SORT) + .consistentRead(true) + .keyConditionExpression("#part = :part AND #uuid = :uuid") + .expressionAttributeNames(Map.of( + "#part", KEY_PARTITION, + "#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT)) + .expressionAttributeValues(Map.of( + ":part", partitionKey, + ":uuid", convertLocalIndexMessageUuidSortKey(messageUuid))) + .build(); + return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(partitionKey, queryRequest); }); } @Nonnull - private Optional deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(Table table, byte[] partitionKey, QuerySpec querySpec, QueryApi queryApi) { + private Optional deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(AttributeValue partitionKey, QueryRequest queryRequest) { Optional result = Optional.empty(); - for (Item item : queryApi.query(querySpec)) { - final byte[] rangeKeyValue = item.getBinary(KEY_SORT); - DeleteItemSpec deleteItemSpec = new DeleteItemSpec().withPrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, rangeKeyValue); + for (Map item : db().query(queryRequest).items()) { + final byte[] rangeKeyValue = item.get(KEY_SORT).b().asByteArray(); + DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, AttributeValues.fromByteArray(rangeKeyValue))); if (result.isEmpty()) { - deleteItemSpec.withReturnValues(ReturnValue.ALL_OLD); + deleteItemRequest.returnValues(ReturnValue.ALL_OLD); } - final DeleteItemOutcome deleteItemOutcome = table.deleteItem(deleteItemSpec); - if (deleteItemOutcome.getItem() != null && deleteItemOutcome.getItem().hasAttribute(KEY_PARTITION)) { - result = Optional.of(convertItemToOutgoingMessageEntity(deleteItemOutcome.getItem())); + final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build()); + if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) { + result = Optional.of(convertItemToOutgoingMessageEntity(deleteItemResponse.attributes())); } } return result; @@ -190,74 +208,88 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { public void deleteAllMessagesForAccount(final UUID destinationAccountUuid) { deleteByAccount.record(() -> { - final byte[] partitionKey = convertPartitionKey(destinationAccountUuid); - final QuerySpec querySpec = new QuerySpec().withHashKey(KEY_PARTITION, partitionKey) - .withProjectionExpression(KEY_SORT) - .withConsistentRead(true); - deleteRowsMatchingQuery(partitionKey, querySpec); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + final QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .projectionExpression(KEY_SORT) + .consistentRead(true) + .keyConditionExpression("#part = :part") + .expressionAttributeNames(Map.of("#part", KEY_PARTITION)) + .expressionAttributeValues(Map.of(":part", partitionKey)) + .build(); + deleteRowsMatchingQuery(partitionKey, queryRequest); }); } public void deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) { deleteByDevice.record(() -> { - final byte[] partitionKey = convertPartitionKey(destinationAccountUuid); - final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") - .withNameMap(Map.of("#part", KEY_PARTITION, - "#sort", KEY_SORT)) - .withValueMap(Map.of(":part", partitionKey, - ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId))) - .withProjectionExpression(KEY_SORT) - .withConsistentRead(true); - deleteRowsMatchingQuery(partitionKey, querySpec); + final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid); + final QueryRequest queryRequest = QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )") + .expressionAttributeNames(Map.of( + "#part", KEY_PARTITION, + "#sort", KEY_SORT)) + .expressionAttributeValues(Map.of( + ":part", partitionKey, + ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId))) + .projectionExpression(KEY_SORT) + .consistentRead(true) + .build(); + deleteRowsMatchingQuery(partitionKey, queryRequest); }); } - private OutgoingMessageEntity convertItemToOutgoingMessageEntity(Item message) { - final SortKey sortKey = convertSortKey(message.getBinary(KEY_SORT)); - final UUID messageUuid = convertLocalIndexMessageUuidSortKey(message.getBinary(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT)); - final int type = message.getInt(KEY_TYPE); - final String relay = message.getString(KEY_RELAY); - final long timestamp = message.getLong(KEY_TIMESTAMP); - final String source = message.getString(KEY_SOURCE); - final UUID sourceUuid = message.hasAttribute(KEY_SOURCE_UUID) ? convertUuidFromBytes(message.getBinary(KEY_SOURCE_UUID), "message source uuid") : null; - final int sourceDevice = message.hasAttribute(KEY_SOURCE_DEVICE) ? message.getInt(KEY_SOURCE_DEVICE) : 0; - final byte[] messageBytes = message.getBinary(KEY_MESSAGE); - final byte[] content = message.getBinary(KEY_CONTENT); + private OutgoingMessageEntity convertItemToOutgoingMessageEntity(Map message) { + final SortKey sortKey = convertSortKey(message.get(KEY_SORT).b().asByteArray()); + final UUID messageUuid = convertLocalIndexMessageUuidSortKey(message.get(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT).b().asByteArray()); + final int type = AttributeValues.getInt(message, KEY_TYPE, 0); + final String relay = AttributeValues.getString(message, KEY_RELAY, null); + final long timestamp = AttributeValues.getLong(message, KEY_TIMESTAMP, 0L); + final String source = AttributeValues.getString(message, KEY_SOURCE, null); + final UUID sourceUuid = AttributeValues.getUUID(message, KEY_SOURCE_UUID, null); + final int sourceDevice = AttributeValues.getInt(message, KEY_SOURCE_DEVICE, 0); + final byte[] messageBytes = AttributeValues.getByteArray(message, KEY_MESSAGE, null); + final byte[] content = AttributeValues.getByteArray(message, KEY_CONTENT, null); return new OutgoingMessageEntity(-1L, false, messageUuid, type, relay, timestamp, source, sourceUuid, sourceDevice, messageBytes, content, sortKey.getServerTimestamp()); } - private void deleteRowsMatchingQuery(byte[] partitionKey, QuerySpec querySpec) { - final Table table = getDynamoDb().getTable(tableName); - writeInBatches(table.query(querySpec), (itemBatch) -> deleteItems(partitionKey, itemBatch)); + private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) { + writeInBatches(db().query(querySpec).items(), (itemBatch) -> deleteItems(partitionKey, itemBatch)); } - private void deleteItems(byte[] partitionKey, List items) { - final TableWriteItems tableWriteItems = new TableWriteItems(tableName); - items.stream().map(item -> new PrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, item.getBinary(KEY_SORT))).forEach(tableWriteItems::addPrimaryKeyToDelete); - executeTableWriteItemsUntilComplete(tableWriteItems); + private void deleteItems(AttributeValue partitionKey, List> items) { + List deletes = items.stream() + .map(item -> WriteRequest.builder() + .deleteRequest(DeleteRequest.builder().key(Map.of( + KEY_PARTITION, partitionKey, + KEY_SORT, item.get(KEY_SORT))).build()) + .build()) + .collect(Collectors.toList()); + executeTableWriteItemsUntilComplete(Map.of(tableName, deletes)); } private long getTtlForMessage(MessageProtos.Envelope message) { return message.getServerTimestamp() / 1000 + timeToLive.getSeconds(); } - private static byte[] convertPartitionKey(final UUID destinationAccountUuid) { - return UUIDUtil.toBytes(destinationAccountUuid); + private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid) { + return AttributeValues.fromUUID(destinationAccountUuid); } - private static byte[] convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) { + private static AttributeValue convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) { ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]); byteBuffer.putLong(destinationDeviceId); byteBuffer.putLong(serverTimestamp); byteBuffer.putLong(messageUuid.getMostSignificantBits()); byteBuffer.putLong(messageUuid.getLeastSignificantBits()); - return byteBuffer.array(); + return AttributeValues.fromByteBuffer(byteBuffer.flip()); } - private static byte[] convertDestinationDeviceIdToSortKeyPrefix(final long destinationDeviceId) { + private static AttributeValue convertDestinationDeviceIdToSortKeyPrefix(final long destinationDeviceId) { ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]); byteBuffer.putLong(destinationDeviceId); - return byteBuffer.array(); + return AttributeValues.fromByteBuffer(byteBuffer.flip()); } private static SortKey convertSortKey(final byte[] bytes) { @@ -273,8 +305,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { return new SortKey(destinationDeviceId, serverTimestamp, new UUID(mostSigBits, leastSigBits)); } - private static byte[] convertLocalIndexMessageUuidSortKey(final UUID messageUuid) { - return UUIDUtil.toBytes(messageUuid); + private static AttributeValue convertLocalIndexMessageUuidSortKey(final UUID messageUuid) { + return AttributeValues.fromUUID(messageUuid); } private static UUID convertLocalIndexMessageUuidSortKey(final byte[] bytes) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationDeletedAccounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationDeletedAccounts.java index f0b8b75b3..1957fa415 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationDeletedAccounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationDeletedAccounts.java @@ -1,42 +1,52 @@ package org.whispersystems.textsecuregcm.storage; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.PrimaryKey; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.TableWriteItems; -import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.UUID; -import org.whispersystems.textsecuregcm.util.UUIDUtil; +import java.util.stream.Collectors; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.DeleteRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; +import software.amazon.awssdk.services.dynamodb.model.WriteRequest; public class MigrationDeletedAccounts extends AbstractDynamoDbStore { - private final Table table; + private final String tableName; static final String KEY_UUID = "U"; - public MigrationDeletedAccounts(DynamoDB dynamoDb, String tableName) { + public MigrationDeletedAccounts(DynamoDbClient dynamoDb, String tableName) { super(dynamoDb); - - table = dynamoDb.getTable(tableName); + this.tableName = tableName; } public void put(UUID uuid) { - table.putItem(new Item() - .withPrimaryKey(primaryKey(uuid))); + db().putItem(PutItemRequest.builder() + .tableName(tableName) + .item(primaryKey(uuid)) + .build()); } public List getRecentlyDeletedUuids() { final List uuids = new ArrayList<>(); + Optional firstPage = db().scanPaginator(ScanRequest.builder() + .tableName(tableName) + .build()).stream().findAny(); // get the first available response - for (Item item : table.scan(new ScanSpec()).firstPage()) { - // only process one page each time. If we have a significant backlog at the end of the migration - // we can handle it separately - uuids.add(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_UUID))); + if (firstPage.isPresent()) { + for (Map item : firstPage.get().items()) { + // only process one page each time. If we have a significant backlog at the end of the migration + // we can handle it separately + uuids.add(AttributeValues.getUUID(item, KEY_UUID, null)); + } } return uuids; @@ -45,20 +55,17 @@ public class MigrationDeletedAccounts extends AbstractDynamoDbStore { public void delete(List uuids) { writeInBatches(uuids, (batch) -> { + List deletes = batch.stream().map((uuid) -> WriteRequest.builder().deleteRequest(DeleteRequest.builder() + .key(primaryKey(uuid)) + .build()).build()).collect(Collectors.toList()); - final TableWriteItems deleteItems = new TableWriteItems(table.getTableName()); - - for (UUID uuid : batch) { - deleteItems.addPrimaryKeyToDelete(primaryKey(uuid)); - } - - executeTableWriteItemsUntilComplete(deleteItems); + executeTableWriteItemsUntilComplete(Map.of(tableName, deletes)); }); } @VisibleForTesting - public static PrimaryKey primaryKey(UUID uuid) { - return new PrimaryKey(KEY_UUID, UUIDUtil.toBytes(uuid)); + public static Map primaryKey(UUID uuid) { + return Map.of(KEY_UUID, AttributeValues.fromUUID(uuid)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccounts.java index 77180ba3b..d24e6cab8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccounts.java @@ -1,43 +1,44 @@ package org.whispersystems.textsecuregcm.storage; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.Page; -import com.amazonaws.services.dynamodbv2.document.PrimaryKey; -import com.amazonaws.services.dynamodbv2.document.ScanOutcome; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; -import org.whispersystems.textsecuregcm.util.UUIDUtil; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; public class MigrationRetryAccounts extends AbstractDynamoDbStore { - private final Table table; + private final String tableName; static final String KEY_UUID = "U"; - public MigrationRetryAccounts(DynamoDB dynamoDb, String tableName) { + public MigrationRetryAccounts(DynamoDbClient dynamoDb, String tableName) { super(dynamoDb); - table = dynamoDb.getTable(tableName); + this.tableName = tableName; } public void put(UUID uuid) { - table.putItem(new Item() - .withPrimaryKey(primaryKey(uuid))); + db().putItem(PutItemRequest.builder() + .tableName(tableName) + .item(primaryKey(uuid)) + .build()); } public List getUuids(int max) { final List uuids = new ArrayList<>(); - for (Page page : table.scan(new ScanSpec()).pages()) { + for (ScanResponse response : db().scanPaginator(ScanRequest.builder().tableName(tableName).build())) { - for (Item item : page) { - uuids.add(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_UUID))); + for (Map item : response.items()) { + uuids.add(AttributeValues.getUUID(item, KEY_UUID, null)); if (uuids.size() >= max) { break; @@ -53,8 +54,8 @@ public class MigrationRetryAccounts extends AbstractDynamoDbStore { } @VisibleForTesting - public static PrimaryKey primaryKey(UUID uuid) { - return new PrimaryKey(KEY_UUID, UUIDUtil.toBytes(uuid)); + public static Map primaryKey(UUID uuid) { + return Map.of(KEY_UUID, AttributeValues.fromUUID(uuid)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushChallengeDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushChallengeDynamoDb.java index 19c0ceee0..b221258f9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushChallengeDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushChallengeDynamoDb.java @@ -5,25 +5,23 @@ package org.whispersystems.textsecuregcm.storage; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec; -import com.amazonaws.services.dynamodbv2.document.spec.PutItemSpec; -import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException; import java.time.Clock; import java.time.Duration; import java.util.Map; import java.util.UUID; import com.google.common.annotations.VisibleForTesting; -import org.whispersystems.textsecuregcm.util.UUIDUtil; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; /** * Stores push challenge tokens. Users may have at most one outstanding push challenge token at a time. */ public class PushChallengeDynamoDb extends AbstractDynamoDbStore { - private final Table table; + private final String tableName; private final Clock clock; static final String KEY_ACCOUNT_UUID = "U"; @@ -33,15 +31,15 @@ public class PushChallengeDynamoDb extends AbstractDynamoDbStore { private static final Map UUID_NAME_MAP = Map.of("#uuid", KEY_ACCOUNT_UUID); private static final Map CHALLENGE_TOKEN_NAME_MAP = Map.of("#challenge", ATTR_CHALLENGE_TOKEN); - public PushChallengeDynamoDb(final DynamoDB dynamoDB, final String tableName) { + public PushChallengeDynamoDb(final DynamoDbClient dynamoDB, final String tableName) { this(dynamoDB, tableName, Clock.systemUTC()); } @VisibleForTesting - PushChallengeDynamoDb(final DynamoDB dynamoDB, final String tableName, final Clock clock) { + PushChallengeDynamoDb(final DynamoDbClient dynamoDB, final String tableName, final Clock clock) { super(dynamoDB); - this.table = dynamoDB.getTable(tableName); + this.tableName = tableName; this.clock = clock; } @@ -57,13 +55,15 @@ public class PushChallengeDynamoDb extends AbstractDynamoDbStore { */ public boolean add(final UUID accountUuid, final byte[] challengeToken, final Duration ttl) { try { - table.putItem( new PutItemSpec() - .withItem(new Item() - .withBinary(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(accountUuid)) - .withBinary(ATTR_CHALLENGE_TOKEN, challengeToken) - .withNumber(ATTR_TTL, getExpirationTimestamp(ttl))) - .withConditionExpression("attribute_not_exists(#uuid)") - .withNameMap(UUID_NAME_MAP)); + db().putItem(PutItemRequest.builder() + .tableName(tableName) + .item(Map.of( + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountUuid), + ATTR_CHALLENGE_TOKEN, AttributeValues.fromByteArray(challengeToken), + ATTR_TTL, AttributeValues.fromLong(getExpirationTimestamp(ttl)))) + .conditionExpression("attribute_not_exists(#uuid)") + .expressionAttributeNames(UUID_NAME_MAP) + .build()); return true; } catch (final ConditionalCheckFailedException e) { return false; @@ -84,11 +84,13 @@ public class PushChallengeDynamoDb extends AbstractDynamoDbStore { */ public boolean remove(final UUID accountUuid, final byte[] challengeToken) { try { - table.deleteItem(new DeleteItemSpec() - .withPrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(accountUuid)) - .withConditionExpression("#challenge = :challenge") - .withNameMap(CHALLENGE_TOKEN_NAME_MAP) - .withValueMap(Map.of(":challenge", challengeToken))); + db().deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountUuid))) + .conditionExpression("#challenge = :challenge") + .expressionAttributeNames(CHALLENGE_TOKEN_NAME_MAP) + .expressionAttributeValues(Map.of(":challenge", AttributeValues.fromByteArray(challengeToken))) + .build()); return true; } catch (final ConditionalCheckFailedException e) { return false; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ReportMessageDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ReportMessageDynamoDb.java index 97b05947f..a2449c779 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ReportMessageDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ReportMessageDynamoDb.java @@ -1,13 +1,14 @@ package org.whispersystems.textsecuregcm.storage; -import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec; -import com.amazonaws.services.dynamodbv2.model.ReturnValue; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; import java.time.Duration; import java.time.Instant; +import java.util.Map; public class ReportMessageDynamoDb { @@ -16,33 +17,30 @@ public class ReportMessageDynamoDb { static final Duration TIME_TO_LIVE = Duration.ofDays(7); - private final Table table; + private final DynamoDbClient db; + private final String tableName; - public ReportMessageDynamoDb(final DynamoDB dynamoDB, final String tableName) { - - this.table = dynamoDB.getTable(tableName); + public ReportMessageDynamoDb(final DynamoDbClient dynamoDB, final String tableName) { + this.db = dynamoDB; + this.tableName = tableName; } public void store(byte[] hash) { - - table.putItem(buildItemForHash(hash)); - } - - private Item buildItemForHash(byte[] hash) { - return new Item() - .withBinary(KEY_HASH, hash) - .withLong(ATTR_TTL, Instant.now().plus(TIME_TO_LIVE).getEpochSecond()); + db.putItem(PutItemRequest.builder() + .tableName(tableName) + .item(Map.of( + KEY_HASH, AttributeValues.fromByteArray(hash), + ATTR_TTL, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond()) + )) + .build()); } public boolean remove(byte[] hash) { - - final DeleteItemSpec deleteItemSpec = new DeleteItemSpec() - .withPrimaryKey(KEY_HASH, hash) - .withReturnValues(ReturnValue.ALL_OLD); - - final DeleteItemOutcome outcome = table.deleteItem(deleteItemSpec); - - return outcome.getItem() != null; + final DeleteItemResponse deleteItemResponse = db.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of(KEY_HASH, AttributeValues.fromByteArray(hash))) + .returnValues(ReturnValue.ALL_OLD) + .build()); + return !deleteItemResponse.attributes().isEmpty(); } - } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/AttributeValues.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/AttributeValues.java new file mode 100644 index 000000000..5f6253c46 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/AttributeValues.java @@ -0,0 +1,89 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.util; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +/** AwsAV provides static helper methods for working with AWS AttributeValues. */ +public class AttributeValues { + + public static AttributeValue fromString(String value) { + return AttributeValue.builder().s(value).build(); + } + + public static AttributeValue fromLong(long value) { + return AttributeValue.builder().n(Long.toString(value)).build(); + } + + public static AttributeValue fromInt(int value) { + return AttributeValue.builder().n(Integer.toString(value)).build(); + } + + public static AttributeValue fromByteArray(byte[] value) { + return AttributeValues.fromSdkBytes(SdkBytes.fromByteArray(value)); + } + + public static AttributeValue fromByteBuffer(ByteBuffer value) { + return AttributeValues.fromSdkBytes(SdkBytes.fromByteBuffer(value)); + } + + public static AttributeValue fromUUID(UUID uuid) { + return AttributeValues.fromSdkBytes(SdkBytes.fromByteArrayUnsafe(UUIDUtil.toBytes(uuid))); + } + + public static AttributeValue fromSdkBytes(SdkBytes value) { + return AttributeValue.builder().b(value).build(); + } + + private static int toInt(AttributeValue av) { + return Integer.parseInt(av.n()); + } + + private static long toLong(AttributeValue av) { + return Long.parseLong(av.n()); + } + + private static UUID toUUID(AttributeValue av) { + return UUIDUtil.fromBytes(av.b().asByteArrayUnsafe()); // We're guaranteed not to modify the byte array + } + + private static byte[] toByteArray(AttributeValue av) { + return av.b().asByteArray(); + } + + private static String toString(AttributeValue av) { + return av.s(); + } + + public static Optional get(Map item, String key) { + return Optional.ofNullable(item.get(key)); + } + + public static int getInt(Map item, String key, int defaultValue) { + return AttributeValues.get(item, key).map(AttributeValues::toInt).orElse(defaultValue); + } + + public static String getString(Map item, String key, String defaultValue) { + return AttributeValues.get(item, key).map(AttributeValues::toString).orElse(defaultValue); + } + + public static long getLong(Map item, String key, long defaultValue) { + return AttributeValues.get(item, key).map(AttributeValues::toLong).orElse(defaultValue); + } + + public static byte[] getByteArray(Map item, String key, byte[] defaultValue) { + return AttributeValues.get(item, key).map(AttributeValues::toByteArray).orElse(defaultValue); + } + + public static UUID getUUID(Map item, String key, UUID defaultValue) { + return AttributeValues.get(item, key).map(AttributeValues::toUUID).orElse(defaultValue); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/DynamoDbFromConfig.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/DynamoDbFromConfig.java new file mode 100644 index 000000000..738252b5c --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/DynamoDbFromConfig.java @@ -0,0 +1,41 @@ +package org.whispersystems.textsecuregcm.util; + +import org.whispersystems.textsecuregcm.configuration.DynamoDbConfiguration; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import java.util.concurrent.Executor; + +public class DynamoDbFromConfig { + private static ClientOverrideConfiguration clientOverrideConfiguration(DynamoDbConfiguration config) { + return ClientOverrideConfiguration.builder() + .apiCallTimeout(config.getClientExecutionTimeout()) + .apiCallAttemptTimeout(config.getClientRequestTimeout()) + .build(); + } + public static DynamoDbClient client(DynamoDbConfiguration config, AwsCredentialsProvider credentialsProvider) { + return DynamoDbClient.builder() + .region(Region.of(config.getRegion())) + .credentialsProvider(credentialsProvider) + .overrideConfiguration(clientOverrideConfiguration(config)) + .build(); + } + public static DynamoDbAsyncClient asyncClient(DynamoDbConfiguration config, AwsCredentialsProvider credentialsProvider, Executor executor) { + DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder() + .region(Region.of(config.getRegion())) + .credentialsProvider(credentialsProvider) + .overrideConfiguration(clientOverrideConfiguration(config)); + if (executor != null) { + builder.asyncConfiguration(ClientAsyncConfiguration.builder() + .advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, + executor) + .build()); + } + return builder.build(); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/UUIDUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/UUIDUtil.java index ce368df77..e05340970 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/UUIDUtil.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/UUIDUtil.java @@ -5,6 +5,7 @@ package org.whispersystems.textsecuregcm.util; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.UUID; @@ -26,12 +27,15 @@ public class UUIDUtil { } public static UUID fromByteBuffer(final ByteBuffer byteBuffer) { - if (byteBuffer.array().length != 16) { - throw new IllegalArgumentException("unexpected byte array length; was " + byteBuffer.array().length + " but expected 16"); + try { + final long mostSigBits = byteBuffer.getLong(); + final long leastSigBits = byteBuffer.getLong(); + if (byteBuffer.hasRemaining()) { + throw new IllegalArgumentException("unexpected byte array length; was greater than 16"); + } + return new UUID(mostSigBits, leastSigBits); + } catch (BufferUnderflowException e) { + throw new IllegalArgumentException("unexpected byte array length; was less than 16"); } - - final long mostSigBits = byteBuffer.getLong(); - final long leastSigBits = byteBuffer.getLong(); - return new UUID(mostSigBits, leastSigBits); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index de73ed21b..60a700b98 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -9,11 +9,6 @@ import static com.codahale.metrics.MetricRegistry.name; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.InstanceProfileCredentialsProvider; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.fasterxml.jackson.databind.DeserializationFeature; import io.dropwizard.Application; import io.dropwizard.cli.EnvironmentCommand; @@ -59,6 +54,9 @@ import org.whispersystems.textsecuregcm.storage.ReportMessageManager; import org.whispersystems.textsecuregcm.storage.ReservedUsernames; import org.whispersystems.textsecuregcm.storage.Usernames; import org.whispersystems.textsecuregcm.storage.UsernamesManager; +import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; public class DeleteUserCommand extends EnvironmentCommand { @@ -100,64 +98,20 @@ public class DeleteUserCommand extends EnvironmentCommand()); - AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder - .standard() - .withRegion(accountsDynamoDbClientBuilder.getRegion()) - .withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration()) - .withCredentials(accountsDynamoDbClientBuilder.getCredentials()) - .withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool); - - AmazonDynamoDBClientBuilder migrationDeletedAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder - .standard() - .withRegion(configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getRegion()) - .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis())) - .withRequestTimeout((int) configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis())) - .withCredentials(InstanceProfileCredentialsProvider.getInstance()); - - AmazonDynamoDBClientBuilder migrationRetryAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder - .standard() - .withRegion(configuration.getMigrationRetryAccountsDynamoDbConfiguration().getRegion()) - .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMigrationRetryAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis())) - .withRequestTimeout((int) configuration.getMigrationRetryAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis())) - .withCredentials(InstanceProfileCredentialsProvider.getInstance()); - - AmazonDynamoDBClientBuilder reportMessageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder - .standard() - .withRegion(configuration.getReportMessageDynamoDbConfiguration().getRegion()) - .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getReportMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis())) - .withRequestTimeout((int) configuration.getReportMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis())) - .withCredentials(InstanceProfileCredentialsProvider.getInstance()); - - DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build()); - DynamoDB preKeysDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build()); - DynamoDB reportMessagesDynamoDb = new DynamoDB(reportMessageDynamoDbClientBuilder.build()); - - AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build(); - AmazonDynamoDBAsync accountsDynamoDbAsyncClient = accountsDynamoDbAsyncClientBuilder.build(); + DynamoDbClient reportMessagesDynamoDb = DynamoDbFromConfig.client(configuration.getReportMessageDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); + DynamoDbClient messageDynamoDb = DynamoDbFromConfig.client(configuration.getMessageDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); + DynamoDbClient preKeysDynamoDb = DynamoDbFromConfig.client(configuration.getKeysDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); + DynamoDbClient accountsDynamoDbClient = DynamoDbFromConfig.client(configuration.getAccountsDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); + DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(configuration.getAccountsDynamoDbConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(), + accountsDynamoDbMigrationThreadPool); FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources); @@ -173,14 +127,16 @@ public class DeleteUserCommand extends EnvironmentCommand()), dynamoDbExtension.getDynamoDB(), dynamoDbExtension.getTableName(), numbersTable.getTableName(), - migrationDeletedAccounts, migrationRetryAccounts); + this.accountsDynamoDb = new AccountsDynamoDb( + dynamoDbExtension.getDynamoDbClient(), + dynamoDbExtension.getDynamoDbAsyncClient(), + new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()), + dynamoDbExtension.getTableName(), + NUMBERS_TABLE_NAME, + migrationDeletedAccounts, + migrationRetryAccounts); } @Test @@ -262,8 +289,12 @@ class AccountsDynamoDbTest { verifyRecentlyDeletedAccountsTableItemCount(1); - assertThat(migrationDeletedAccountsTable - .getItem(MigrationDeletedAccounts.primaryKey(deletedAccount.getUuid()))).isNotNull(); + Map primaryKey = MigrationDeletedAccounts.primaryKey(deletedAccount.getUuid()); + assertThat(dynamoDbExtension.getDynamoDbClient().getItem(GetItemRequest.builder() + .tableName(MIGRATION_DELETED_ACCOUNTS_TABLE_NAME) + .key(Map.of(MigrationDeletedAccounts.KEY_UUID, primaryKey.get(MigrationDeletedAccounts.KEY_UUID))) + .build())) + .isNotNull(); accountsDynamoDb.deleteRecentlyDeletedUuids(); @@ -273,8 +304,10 @@ class AccountsDynamoDbTest { private void verifyRecentlyDeletedAccountsTableItemCount(int expectedItemCount) { int totalItems = 0; - for (Page page : migrationDeletedAccountsTable.scan(new ScanSpec()).pages()) { - for (Item ignored : page) { + for (ScanResponse page : dynamoDbExtension.getDynamoDbClient().scanPaginator(ScanRequest.builder() + .tableName(MIGRATION_DELETED_ACCOUNTS_TABLE_NAME) + .build())) { + for (Map item : page.items()) { totalItems++; } } @@ -306,16 +339,15 @@ class AccountsDynamoDbTest { configuration.setRingBufferSizeInClosedState(2); configuration.setFailureRateThreshold(50); - final AmazonDynamoDB client = mock(AmazonDynamoDB.class); - final DynamoDB dynamoDB = new DynamoDB(client); + final DynamoDbClient client = mock(DynamoDbClient.class); - when(client.transactWriteItems(any())) + when(client.transactWriteItems(any(TransactWriteItemsRequest.class))) .thenThrow(RuntimeException.class); - when(client.updateItem(any())) + when(client.updateItem(any(UpdateItemRequest.class))) .thenThrow(RuntimeException.class); - AccountsDynamoDb accounts = new AccountsDynamoDb(client, mock(AmazonDynamoDBAsync.class), mock(ThreadPoolExecutor.class), dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME, mock( + AccountsDynamoDb accounts = new AccountsDynamoDb(client, mock(DynamoDbAsyncClient.class), mock(ThreadPoolExecutor.class), ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME, mock( MigrationDeletedAccounts.class), mock(MigrationRetryAccounts.class)); Account account = generateAccount("+14151112222", UUID.randomUUID()); @@ -408,20 +440,22 @@ class AccountsDynamoDbTest { } private void verifyStoredState(String number, UUID uuid, Account expecting) { - final Table accounts = dynamoDbExtension.getDynamoDB().getTable(dynamoDbExtension.getTableName()); + final DynamoDbClient db = dynamoDbExtension.getDynamoDbClient(); - Item item = accounts.getItem(new GetItemSpec() - .withPrimaryKey(AccountsDynamoDb.KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(uuid)) - .withConsistentRead(true)); + final GetItemResponse get = db.getItem(GetItemRequest.builder() + .tableName(dynamoDbExtension.getTableName()) + .key(Map.of(AccountsDynamoDb.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid))) + .consistentRead(true) + .build()); - if (item != null) { - String data = new String(item.getBinary(AccountsDynamoDb.ATTR_ACCOUNT_DATA), StandardCharsets.UTF_8); + if (get.hasItem()) { + String data = new String(get.item().get(AccountsDynamoDb.ATTR_ACCOUNT_DATA).b().asByteArray(), StandardCharsets.UTF_8); assertThat(data).isNotEmpty(); - assertThat(item.getNumber(AccountsDynamoDb.ATTR_MIGRATION_VERSION).intValue()) + assertThat(AttributeValues.getInt(get.item(), AccountsDynamoDb.ATTR_MIGRATION_VERSION, -1)) .isEqualTo(expecting.getDynamoDbMigrationVersion()); - Account result = AccountsDynamoDb.fromItem(item); + Account result = AccountsDynamoDb.fromItem(get.item()); verifyStoredState(number, uuid, result, expecting); } else { throw new AssertionError("No data"); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java index 141d72ffa..9dd005cf1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java @@ -1,33 +1,35 @@ package org.whispersystems.textsecuregcm.storage; import com.almworks.sqlite4java.SQLite; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.local.main.ServerRunner; import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex; -import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import java.net.ServerSocket; +import java.net.URI; import java.util.ArrayList; import java.util.List; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback { static final String DEFAULT_TABLE_NAME = "test_table"; - static final ProvisionedThroughput DEFAULT_PROVISIONED_THROUGHPUT = new ProvisionedThroughput(20L, 20L); + static final ProvisionedThroughput DEFAULT_PROVISIONED_THROUGHPUT = ProvisionedThroughput.builder() + .readCapacityUnits(20L) + .writeCapacityUnits(20L) + .build(); private DynamoDBProxyServer server; private int port; @@ -42,9 +44,8 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback private final long readCapacityUnits; private final long writeCapacityUnits; - private AmazonDynamoDB client; - private AmazonDynamoDBAsync asyncClient; - private DynamoDB dynamoDB; + private DynamoDbClient dynamoDB2; + private DynamoDbAsyncClient dynamoAsyncDB2; private DynamoDbExtension(String tableName, String hashKey, String rangeKey, List attributeDefinitions, List globalSecondaryIndexes, long readCapacityUnits, long writeCapacityUnits) { @@ -87,26 +88,33 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback KeySchemaElement[] keySchemaElements; if (rangeKeyName == null) { keySchemaElements = new KeySchemaElement[] { - new KeySchemaElement(hashKeyName, "HASH"), + KeySchemaElement.builder().attributeName(hashKeyName).keyType(KeyType.HASH).build(), }; } else { keySchemaElements = new KeySchemaElement[] { - new KeySchemaElement(hashKeyName, "HASH"), - new KeySchemaElement(rangeKeyName, "RANGE") + KeySchemaElement.builder().attributeName(hashKeyName).keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName(rangeKeyName).keyType(KeyType.RANGE).build(), }; } - final CreateTableRequest createTableRequest = new CreateTableRequest() - .withTableName(tableName) - .withKeySchema(keySchemaElements) - .withAttributeDefinitions(attributeDefinitions.isEmpty() ? null : attributeDefinitions) - .withGlobalSecondaryIndexes(globalSecondaryIndexes.isEmpty() ? null : globalSecondaryIndexes) - .withProvisionedThroughput(new ProvisionedThroughput(readCapacityUnits, writeCapacityUnits)); + final CreateTableRequest createTableRequest = CreateTableRequest.builder() + .tableName(tableName) + .keySchema(keySchemaElements) + .attributeDefinitions(attributeDefinitions.isEmpty() ? null : attributeDefinitions) + .globalSecondaryIndexes(globalSecondaryIndexes.isEmpty() ? null : globalSecondaryIndexes) + .provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits(readCapacityUnits) + .writeCapacityUnits(writeCapacityUnits) + .build()) + .build(); - getDynamoDB().createTable(createTableRequest); + getDynamoDbClient().createTable(createTableRequest); } private void startServer() throws Exception { + // Even though we're using AWS SDK v2, Dynamo's local implementation's canonical location + // is within v1 (https://github.com/aws/aws-sdk-java-v2/issues/982). This does support + // v2 clients, though. SQLite.setLibraryPath("target/lib"); // if you see a library failed to load error, you need to run mvn test-compile at least once first ServerSocket serverSocket = new ServerSocket(0); serverSocket.setReuseAddress(false); @@ -117,18 +125,18 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback } private void initializeClient() { - AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration( - new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))); - client = clientBuilder.build(); - - asyncClient = AmazonDynamoDBAsyncClientBuilder.standard() - .withEndpointConfiguration(clientBuilder.getEndpoint()) - .withCredentials(clientBuilder.getCredentials()) - .build(); - - dynamoDB = new DynamoDB(client); + dynamoDB2 = DynamoDbClient.builder() + .endpointOverride(URI.create("http://localhost:" + port)) + .region(Region.of("local-test-region")) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create("accessKey", "secretKey"))) + .build(); + dynamoAsyncDB2 = DynamoDbAsyncClient.builder() + .endpointOverride(URI.create("http://localhost:" + port)) + .region(Region.of("local-test-region")) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create("accessKey", "secretKey"))) + .build(); } static class DynamoDbExtensionBuilder { @@ -140,8 +148,8 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback private List attributeDefinitions = new ArrayList<>(); private List globalSecondaryIndexes = new ArrayList<>(); - private long readCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.getReadCapacityUnits(); - private long writeCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.getWriteCapacityUnits(); + private long readCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.readCapacityUnits(); + private long writeCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.writeCapacityUnits(); private DynamoDbExtensionBuilder() { @@ -178,16 +186,12 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback } } - public AmazonDynamoDB getClient() { - return client; + public DynamoDbClient getDynamoDbClient() { + return dynamoDB2; } - public AmazonDynamoDBAsync getAsyncClient() { - return asyncClient; - } - - public DynamoDB getDynamoDB() { - return dynamoDB; + public DynamoDbAsyncClient getDynamoDbAsyncClient() { + return dynamoAsyncDB2; } public String getTableName() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDbRule.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDbRule.java index cee53509c..8e5d5e464 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDbRule.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDbRule.java @@ -5,13 +5,13 @@ package org.whispersystems.textsecuregcm.storage; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; import org.whispersystems.textsecuregcm.tests.util.LocalDynamoDbRule; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; public class KeysDynamoDbRule extends LocalDynamoDbRule { public static final String TABLE_NAME = "Signal_Keys_Test"; @@ -19,18 +19,22 @@ public class KeysDynamoDbRule extends LocalDynamoDbRule { @Override protected void before() throws Throwable { super.before(); - - final DynamoDB dynamoDB = getDynamoDB(); - - final CreateTableRequest createTableRequest = new CreateTableRequest() - .withTableName(TABLE_NAME) - .withKeySchema(new KeySchemaElement(KeysDynamoDb.KEY_ACCOUNT_UUID, "HASH"), - new KeySchemaElement(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID, "RANGE")) - .withAttributeDefinitions(new AttributeDefinition(KeysDynamoDb.KEY_ACCOUNT_UUID, ScalarAttributeType.B), - new AttributeDefinition(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID, ScalarAttributeType.B)) - .withProvisionedThroughput(new ProvisionedThroughput(20L, 20L)); - - dynamoDB.createTable(createTableRequest); + getDynamoDbClient().createTable(CreateTableRequest.builder() + .tableName(TABLE_NAME) + .keySchema( + KeySchemaElement.builder().attributeName(KeysDynamoDb.KEY_ACCOUNT_UUID).keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID).keyType(KeyType.RANGE) + .build()) + .attributeDefinitions(AttributeDefinition.builder() + .attributeName(KeysDynamoDb.KEY_ACCOUNT_UUID) + .attributeType(ScalarAttributeType.B) + .build(), + AttributeDefinition.builder() + .attributeName(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID) + .attributeType(ScalarAttributeType.B) + .build()) + .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(20L).writeCapacityUnits(20L).build()) + .build()); } @Override diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDbTest.java index b3170b2b8..3dffbd7e0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDbTest.java @@ -9,6 +9,7 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.whispersystems.textsecuregcm.entities.PreKey; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import java.util.Collections; import java.util.List; @@ -17,6 +18,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -34,7 +36,7 @@ public class KeysDynamoDbTest { @Before public void setup() { - keysDynamoDb = new KeysDynamoDb(dynamoDbRule.getDynamoDB(), KeysDynamoDbRule.TABLE_NAME); + keysDynamoDb = new KeysDynamoDb(dynamoDbRule.getDynamoDbClient(), KeysDynamoDbRule.TABLE_NAME); account = mock(Account.class); when(account.getNumber()).thenReturn(ACCOUNT_NUMBER); @@ -133,4 +135,10 @@ public class KeysDynamoDbTest { assertEquals(0, keysDynamoDb.getCount(account, DEVICE_ID)); assertEquals(1, keysDynamoDb.getCount(account, DEVICE_ID + 1)); } + + @Test + public void testSortKeyPrefix() { + AttributeValue got = KeysDynamoDb.getSortKeyPrefix(123); + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 123}, got.b().asByteArray()); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index 2e063fcb7..9a20dae0e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -9,12 +9,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.ItemCollection; -import com.amazonaws.services.dynamodbv2.document.ScanOutcome; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec; import com.google.protobuf.ByteString; import io.lettuce.core.cluster.SlotHash; import java.nio.ByteBuffer; @@ -22,6 +16,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -38,6 +33,10 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbRule; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest { @@ -62,7 +61,7 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest { connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"); }); - final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7)); + final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7)); final AccountsManager accountsManager = mock(AccountsManager.class); final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); @@ -142,17 +141,16 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest { final List persistedMessages = new ArrayList<>(messageCount); - DynamoDB dynamoDB = messagesDynamoDbRule.getDynamoDB(); - Table table = dynamoDB.getTable(MessagesDynamoDbRule.TABLE_NAME); - final ItemCollection scan = table.scan(new ScanSpec()); - for (Item item : scan) { - persistedMessages.add(MessageProtos.Envelope.newBuilder() - .setServerGuid(convertBinaryToUuid(item.getBinary("U")).toString()) - .setType(MessageProtos.Envelope.Type.valueOf(item.getInt("T"))) - .setTimestamp(item.getLong("TS")) - .setServerTimestamp(extractServerTimestamp(item.getBinary("S"))) - .setContent(ByteString.copyFrom(item.getBinary("C"))) - .build()); + DynamoDbClient dynamoDB = messagesDynamoDbRule.getDynamoDbClient(); + for (Map item : dynamoDB + .scan(ScanRequest.builder().tableName(MessagesDynamoDbRule.TABLE_NAME).build()).items()) { + persistedMessages.add(MessageProtos.Envelope.newBuilder() + .setServerGuid(AttributeValues.getUUID(item, "U", null).toString()) + .setType(MessageProtos.Envelope.Type.valueOf(AttributeValues.getInt(item, "T", -1))) + .setTimestamp(AttributeValues.getLong(item, "TS", -1)) + .setServerTimestamp(extractServerTimestamp(AttributeValues.getByteArray(item, "S", null))) + .setContent(ByteString.copyFrom(AttributeValues.getByteArray(item, "C", null))) + .build()); } assertEquals(expectedMessages, persistedMessages); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationDeletedAccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationDeletedAccountsTest.java index 84a1081d2..272131acd 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationDeletedAccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationDeletedAccountsTest.java @@ -4,10 +4,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; import java.util.UUID; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; class MigrationDeletedAccountsTest { @@ -15,13 +15,16 @@ class MigrationDeletedAccountsTest { static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder() .tableName("deleted_accounts_test") .hashKey(MigrationDeletedAccounts.KEY_UUID) - .attributeDefinition(new AttributeDefinition(MigrationDeletedAccounts.KEY_UUID, ScalarAttributeType.B)) + .attributeDefinition(AttributeDefinition.builder() + .attributeName(MigrationDeletedAccounts.KEY_UUID) + .attributeType(ScalarAttributeType.B) + .build()) .build(); @Test void test() { - final MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(dynamoDbExtension.getDynamoDB(), + final MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(dynamoDbExtension.getDynamoDbClient(), dynamoDbExtension.getTableName()); UUID firstUuid = UUID.randomUUID(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccountsTest.java index a646dac62..7af23fa90 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccountsTest.java @@ -2,12 +2,12 @@ package org.whispersystems.textsecuregcm.storage; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; import java.util.List; import java.util.UUID; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; class MigrationRetryAccountsTest { @@ -15,13 +15,16 @@ class MigrationRetryAccountsTest { static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder() .tableName("account_migration_errors_test") .hashKey(MigrationRetryAccounts.KEY_UUID) - .attributeDefinition(new AttributeDefinition(MigrationRetryAccounts.KEY_UUID, ScalarAttributeType.B)) + .attributeDefinition(AttributeDefinition.builder() + .attributeName(MigrationRetryAccounts.KEY_UUID) + .attributeType(ScalarAttributeType.B) + .build()) .build(); @Test void test() { - final MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(dynamoDbExtension.getDynamoDB(), + final MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(dynamoDbExtension.getDynamoDbClient(), dynamoDbExtension.getTableName()); UUID firstUuid = UUID.randomUUID(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/PushChallengeDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/PushChallengeDynamoDbTest.java index 7ff79cc22..183f8edfe 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/PushChallengeDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/PushChallengeDynamoDbTest.java @@ -9,8 +9,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -20,6 +18,8 @@ import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; class PushChallengeDynamoDbTest { @@ -34,12 +34,15 @@ class PushChallengeDynamoDbTest { static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder() .tableName(TABLE_NAME) .hashKey(PushChallengeDynamoDb.KEY_ACCOUNT_UUID) - .attributeDefinition(new AttributeDefinition(PushChallengeDynamoDb.KEY_ACCOUNT_UUID, ScalarAttributeType.B)) + .attributeDefinition(AttributeDefinition.builder() + .attributeName(PushChallengeDynamoDb.KEY_ACCOUNT_UUID) + .attributeType(ScalarAttributeType.B) + .build()) .build(); @BeforeEach void setUp() { - this.pushChallengeDynamoDb = new PushChallengeDynamoDb(dynamoDbExtension.getDynamoDB(), TABLE_NAME, Clock.fixed( + this.pushChallengeDynamoDb = new PushChallengeDynamoDb(dynamoDbExtension.getDynamoDbClient(), TABLE_NAME, Clock.fixed( Instant.ofEpochMilli(CURRENT_TIME_MILLIS), ZoneId.systemDefault())); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ReportMessageDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ReportMessageDynamoDbTest.java index ca12cb454..593ccdd27 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ReportMessageDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ReportMessageDynamoDbTest.java @@ -4,13 +4,13 @@ import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.util.UUIDUtil; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; class ReportMessageDynamoDbTest { @@ -22,13 +22,16 @@ class ReportMessageDynamoDbTest { static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder() .tableName(TABLE_NAME) .hashKey(ReportMessageDynamoDb.KEY_HASH) - .attributeDefinition(new AttributeDefinition(ReportMessageDynamoDb.KEY_HASH, ScalarAttributeType.B)) + .attributeDefinition(AttributeDefinition.builder() + .attributeName(ReportMessageDynamoDb.KEY_HASH) + .attributeType(ScalarAttributeType.B) + .build()) .build(); @BeforeEach void setUp() { - this.reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbExtension.getDynamoDB(), TABLE_NAME); + this.reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbExtension.getDynamoDbClient(), TABLE_NAME); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java index e2aa4c3a2..b6dcae6db 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException; import io.lettuce.core.RedisException; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import java.util.HashSet; @@ -49,6 +48,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.UsernamesManager; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; class AccountsManagerTest { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesDynamoDbTest.java index 7c44ca64b..bb52e10fc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesDynamoDbTest.java @@ -67,7 +67,7 @@ public class MessagesDynamoDbTest { @Before public void setup() { - messagesDynamoDb = new MessagesDynamoDb(dynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7)); + messagesDynamoDb = new MessagesDynamoDb(dynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7)); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/LocalDynamoDbRule.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/LocalDynamoDbRule.java index 33b0414d2..1b494ee6f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/LocalDynamoDbRule.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/LocalDynamoDbRule.java @@ -6,16 +6,16 @@ package org.whispersystems.textsecuregcm.tests.util; import com.almworks.sqlite4java.SQLite; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.local.main.ServerRunner; import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer; import org.junit.rules.ExternalResource; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import java.net.ServerSocket; +import java.net.URI; public class LocalDynamoDbRule extends ExternalResource { private DynamoDBProxyServer server; @@ -43,11 +43,12 @@ public class LocalDynamoDbRule extends ExternalResource { super.after(); } - public DynamoDB getDynamoDB() { - AmazonDynamoDBClientBuilder clientBuilder = - AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))); - return new DynamoDB(clientBuilder.build()); + public DynamoDbClient getDynamoDbClient() { + return DynamoDbClient.builder() + .endpointOverride(URI.create("http://localhost:" + port)) + .region(Region.of("local-test-region")) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create("accessKey", "secretKey"))) + .build(); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MessagesDynamoDbRule.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MessagesDynamoDbRule.java index 1841222b8..62c4ed2ce 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MessagesDynamoDbRule.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MessagesDynamoDbRule.java @@ -5,15 +5,15 @@ package org.whispersystems.textsecuregcm.tests.util; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; -import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndex; -import com.amazonaws.services.dynamodbv2.model.Projection; -import com.amazonaws.services.dynamodbv2.model.ProjectionType; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.LocalSecondaryIndex; +import software.amazon.awssdk.services.dynamodb.model.Projection; +import software.amazon.awssdk.services.dynamodb.model.ProjectionType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; public class MessagesDynamoDbRule extends LocalDynamoDbRule { @@ -22,20 +22,21 @@ public class MessagesDynamoDbRule extends LocalDynamoDbRule { @Override protected void before() throws Throwable { super.before(); - DynamoDB dynamoDB = getDynamoDB(); - CreateTableRequest createTableRequest = new CreateTableRequest() - .withTableName(TABLE_NAME) - .withKeySchema(new KeySchemaElement("H", "HASH"), - new KeySchemaElement("S", "RANGE")) - .withAttributeDefinitions(new AttributeDefinition("H", ScalarAttributeType.B), - new AttributeDefinition("S", ScalarAttributeType.B), - new AttributeDefinition("U", ScalarAttributeType.B)) - .withProvisionedThroughput(new ProvisionedThroughput(20L, 20L)) - .withLocalSecondaryIndexes(new LocalSecondaryIndex().withIndexName("Message_UUID_Index") - .withKeySchema(new KeySchemaElement("H", "HASH"), - new KeySchemaElement("U", "RANGE")) - .withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY))); - dynamoDB.createTable(createTableRequest); + getDynamoDbClient().createTable(CreateTableRequest.builder() + .tableName(TABLE_NAME) + .keySchema(KeySchemaElement.builder().attributeName("H").keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName("S").keyType(KeyType.RANGE).build()) + .attributeDefinitions( + AttributeDefinition.builder().attributeName("H").attributeType(ScalarAttributeType.B).build(), + AttributeDefinition.builder().attributeName("S").attributeType(ScalarAttributeType.B).build(), + AttributeDefinition.builder().attributeName("U").attributeType(ScalarAttributeType.B).build()) + .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(20L).writeCapacityUnits(20L).build()) + .localSecondaryIndexes(LocalSecondaryIndex.builder().indexName("Message_UUID_Index") + .keySchema(KeySchemaElement.builder().attributeName("H").keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName("U").keyType(KeyType.RANGE).build()) + .projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build()) + .build()) + .build()); } @Override diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/util/AsnTableTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/util/AsnTableTest.java index 4b24a3aaf..d51028f7f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/util/AsnTableTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/util/AsnTableTest.java @@ -12,7 +12,6 @@ import java.io.InputStreamReader; import java.net.Inet4Address; import java.net.UnknownHostException; import java.util.Optional; -import java.util.zip.GZIPInputStream; import static org.junit.jupiter.api.Assertions.*; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/util/AttributeValuesTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/util/AttributeValuesTest.java new file mode 100644 index 000000000..73bb91069 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/util/AttributeValuesTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.util; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; + +public class AttributeValuesTest { + @Test + void testUUIDRoundTrip() { + UUID orig = UUID.randomUUID(); + AttributeValue av = AttributeValues.fromUUID(orig); + UUID returned = AttributeValues.getUUID(Map.of("foo", av), "foo", null); + assertEquals(orig, returned); + } + + @Test + void testLongRoundTrip() { + long orig = 12345; + AttributeValue av = AttributeValues.fromLong(orig); + long returned = AttributeValues.getLong(Map.of("foo", av), "foo", -1); + assertEquals(orig, returned); + } + + @Test + void testIntRoundTrip() { + int orig = 12345; + AttributeValue av = AttributeValues.fromInt(orig); + int returned = AttributeValues.getInt(Map.of("foo", av), "foo", -1); + assertEquals(orig, returned); + } + + @Test + void testByteBuffer() { + byte[] bytes = {1, 2, 3}; + ByteBuffer bb = ByteBuffer.wrap(bytes); + AttributeValue av = AttributeValues.fromByteBuffer(bb); + byte[] returned = av.b().asByteArray(); + assertArrayEquals(bytes, returned); + returned = AttributeValues.getByteArray(Map.of("foo", av), "foo", null); + assertArrayEquals(bytes, returned); + } + + @Test + void testByteBuffer2() { + final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]); + byteBuffer.putLong(123); + assertEquals(byteBuffer.remaining(), 0); + AttributeValue av = AttributeValues.fromByteBuffer(byteBuffer.flip()); + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 123}, AttributeValues.getByteArray(Map.of("foo", av), "foo", null)); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 0a11cf507..b971079a1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -79,7 +79,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest executorService = Executors.newSingleThreadExecutor(); messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), executorService); - messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7)); + messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7)); reportMessageManager = mock(ReportMessageManager.class); account = mock(Account.class); device = mock(Device.class);