diff --git a/service/pom.xml b/service/pom.xml
index 233429282..c8bf0a5d5 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -256,6 +256,10 @@
software.amazon.awssdk
s3
+
+ software.amazon.awssdk
+ dynamodb
+
com.amazonaws
aws-java-sdk-core
@@ -272,10 +276,6 @@
com.amazonaws
aws-java-sdk-appconfig
-
- com.amazonaws
- aws-java-sdk-dynamodb
-
redis.clients
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index 98b051d98..85f8c4a02 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -12,11 +12,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.codahale.metrics.SharedMetricRegistries;
@@ -194,6 +189,7 @@ import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.util.AsnManager;
import org.whispersystems.textsecuregcm.util.Constants;
+import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import org.whispersystems.textsecuregcm.util.TorExitNodeManager;
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
@@ -210,6 +206,14 @@ import org.whispersystems.textsecuregcm.workers.VacuumCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
import org.whispersystems.websocket.setup.WebSocketEnvironment;
+import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
public class WhisperServerService extends Application {
@@ -316,80 +320,40 @@ public class WhisperServerService extends Application());
- AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder
- .standard()
- .withRegion(accountsDynamoDbClientBuilder.getRegion())
- .withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration())
- .withCredentials(accountsDynamoDbClientBuilder.getCredentials())
- .withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
+ DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(config.getAccountsDynamoDbConfiguration(),
+ software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(),
+ accountsDynamoDbMigrationThreadPool);
- AmazonDynamoDBClientBuilder migrationDeletedAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
- .standard()
- .withRegion(config.getMigrationDeletedAccountsDynamoDbConfiguration().getRegion())
- .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMigrationDeletedAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
- .withRequestTimeout((int) config.getMigrationDeletedAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
- .withCredentials(InstanceProfileCredentialsProvider.getInstance());
+ DynamoDbClient recentlyDeletedAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationDeletedAccountsDynamoDbConfiguration(),
+ software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
- AmazonDynamoDBClientBuilder migrationRetryAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
- .standard()
- .withRegion(config.getMigrationRetryAccountsDynamoDbConfiguration().getRegion())
- .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMigrationRetryAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
- .withRequestTimeout((int) config.getMigrationRetryAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
- .withCredentials(InstanceProfileCredentialsProvider.getInstance());
+ DynamoDbClient pushChallengeDynamoDbClient = DynamoDbFromConfig.client(config.getPushChallengeDynamoDbConfiguration(),
+ software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
- AmazonDynamoDBClientBuilder pushChallengeDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
- .standard()
- .withRegion(config.getPushChallengeDynamoDbConfiguration().getRegion())
- .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getPushChallengeDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
- .withRequestTimeout((int) config.getPushChallengeDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
- .withCredentials(InstanceProfileCredentialsProvider.getInstance());
+ DynamoDbClient reportMessageDynamoDbClient = DynamoDbFromConfig.client(config.getReportMessageDynamoDbConfiguration(),
+ software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
- AmazonDynamoDBClientBuilder reportMessageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
- .standard()
- .withRegion(config.getReportMessageDynamoDbConfiguration().getRegion())
- .withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getReportMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
- .withRequestTimeout((int) config.getReportMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
- .withCredentials(InstanceProfileCredentialsProvider.getInstance());
-
- DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
- DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
-
- AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
- AmazonDynamoDBAsync accountsDynamodbAsyncClient = accountsDynamoDbAsyncClientBuilder.build();
-
- DynamoDB recentlyDeletedAccountsDynamoDb = new DynamoDB(migrationDeletedAccountsDynamoDbClientBuilder.build());
- DynamoDB migrationRetryAccountsDynamoDb = new DynamoDB(migrationRetryAccountsDynamoDbClientBuilder.build());
+ DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationRetryAccountsDynamoDbConfiguration(),
+ software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb, config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
Accounts accounts = new Accounts(accountDatabase);
- AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamodbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
+ AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
PendingDevices pendingDevices = new PendingDevices (accountDatabase);
Usernames usernames = new Usernames(accountDatabase);
@@ -399,8 +363,8 @@ public class WhisperServerService extends Application outcome = new AtomicReference<>();
- batchWriteItemsFirstPass.record(() -> outcome.set(dynamoDb.batchWriteItem(items)));
+ protected void executeTableWriteItemsUntilComplete(final Map> items) {
+ AtomicReference outcome = new AtomicReference<>();
+ batchWriteItemsFirstPass.record(() -> outcome.set(dynamoDbClient.batchWriteItem(BatchWriteItemRequest.builder().requestItems(items).build())));
int attemptCount = 0;
- while (!outcome.get().getUnprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) {
- batchWriteItemsRetryPass.record(() -> outcome.set(dynamoDb.batchWriteItemUnprocessed(outcome.get().getUnprocessedItems())));
+ while (!outcome.get().unprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) {
+ batchWriteItemsRetryPass.record(() -> outcome.set(dynamoDbClient.batchWriteItem(BatchWriteItemRequest.builder()
+ .requestItems(outcome.get().unprocessedItems())
+ .build())));
++attemptCount;
}
- if (!outcome.get().getUnprocessedItems().isEmpty()) {
- logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, outcome.get().getUnprocessedItems().size());
- batchWriteItemsUnprocessed.increment(outcome.get().getUnprocessedItems().size());
+ if (!outcome.get().unprocessedItems().isEmpty()) {
+ int totalItems = outcome.get().unprocessedItems().values().stream().mapToInt(List::size).sum();
+ logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, totalItems);
+ batchWriteItemsUnprocessed.increment(totalItems);
}
}
- protected long countItemsMatchingQuery(final Table table, final QuerySpec querySpec) {
- // This is very confusing, but does appear to be the intended behavior. See:
- //
- // - https://github.com/aws/aws-sdk-java/issues/693
- // - https://github.com/aws/aws-sdk-java/issues/915
- // - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Count
-
- long matchingItems = 0;
-
- for (final Page- page : table.query(querySpec).pages()) {
- matchingItems += page.getLowLevelResult().getQueryResult().getCount();
- }
-
- return matchingItems;
- }
-
static void writeInBatches(final Iterable items, final Consumer
> action) {
final List batch = new ArrayList<>(DYNAMO_DB_MAX_BATCH_SIZE);
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java
index 3db2695af..1d5faebd9 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java
@@ -2,25 +2,6 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
-import com.amazonaws.handlers.AsyncHandler;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
-import com.amazonaws.services.dynamodbv2.document.Table;
-import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
-import com.amazonaws.services.dynamodbv2.model.AttributeValue;
-import com.amazonaws.services.dynamodbv2.model.CancellationReason;
-import com.amazonaws.services.dynamodbv2.model.Delete;
-import com.amazonaws.services.dynamodbv2.model.GetItemResult;
-import com.amazonaws.services.dynamodbv2.model.Put;
-import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailure;
-import com.amazonaws.services.dynamodbv2.model.TransactWriteItem;
-import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest;
-import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsResult;
-import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException;
-import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
@@ -33,12 +14,27 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
+import software.amazon.awssdk.services.dynamodb.model.Delete;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.Put;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
+import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
+import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountStore {
@@ -51,9 +47,8 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
static final String ATTR_MIGRATION_VERSION = "V";
- private final AmazonDynamoDB client;
- private final Table accountsTable;
- private final AmazonDynamoDBAsync asyncClient;
+ private final DynamoDbClient client;
+ private final DynamoDbAsyncClient asyncClient;
private final ThreadPoolExecutor migrationThreadPool;
@@ -61,6 +56,7 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private final MigrationRetryAccounts migrationRetryAccounts;
private final String phoneNumbersTableName;
+ private final String accountsTableName;
private static final Timer CREATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "create"));
private static final Timer UPDATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "update"));
@@ -70,18 +66,17 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private final Logger logger = LoggerFactory.getLogger(AccountsDynamoDb.class);
- public AccountsDynamoDb(AmazonDynamoDB client, AmazonDynamoDBAsync asyncClient,
- ThreadPoolExecutor migrationThreadPool, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName,
+ public AccountsDynamoDb(DynamoDbClient client, DynamoDbAsyncClient asyncClient,
+ ThreadPoolExecutor migrationThreadPool, String accountsTableName, String phoneNumbersTableName,
MigrationDeletedAccounts migrationDeletedAccounts,
MigrationRetryAccounts accountsMigrationErrors) {
- super(dynamoDb);
+ super(client);
this.client = client;
- this.accountsTable = dynamoDb.getTable(accountsTableName);
- this.phoneNumbersTableName = phoneNumbersTableName;
-
this.asyncClient = asyncClient;
+ this.phoneNumbersTableName = phoneNumbersTableName;
+ this.accountsTableName = accountsTableName;
this.migrationThreadPool = migrationThreadPool;
this.migrationDeletedAccounts = migrationDeletedAccounts;
@@ -90,32 +85,34 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
@Override
public boolean create(Account account) {
-
return CREATE_TIMER.record(() -> {
try {
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
+ TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid(), Put.builder()
+ .conditionExpression("attribute_not_exists(#number) OR #number = :number")
+ .expressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164))
+ .expressionAttributeValues(Map.of(":number", AttributeValues.fromString(account.getNumber()))));
- TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid());
-
- final TransactWriteItemsRequest request = new TransactWriteItemsRequest()
- .withTransactItems(phoneNumberConstraintPut, accountPut);
+ final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
+ .transactItems(phoneNumberConstraintPut, accountPut)
+ .build();
try {
client.transactWriteItems(request);
} catch (TransactionCanceledException e) {
- final CancellationReason accountCancellationReason = e.getCancellationReasons().get(1);
+ final CancellationReason accountCancellationReason = e.cancellationReasons().get(1);
- if ("ConditionalCheckFailed".equals(accountCancellationReason.getCode())) {
+ if ("ConditionalCheckFailed".equals(accountCancellationReason.code())) {
throw new IllegalArgumentException("uuid present with different phone number");
}
- final CancellationReason phoneNumberConstraintCancellationReason = e.getCancellationReasons().get(0);
+ final CancellationReason phoneNumberConstraintCancellationReason = e.cancellationReasons().get(0);
- if ("ConditionalCheckFailed".equals(phoneNumberConstraintCancellationReason.getCode())) {
+ if ("ConditionalCheckFailed".equals(phoneNumberConstraintCancellationReason.code())) {
- ByteBuffer actualAccountUuid = phoneNumberConstraintCancellationReason.getItem().get(KEY_ACCOUNT_UUID).getB();
+ ByteBuffer actualAccountUuid = phoneNumberConstraintCancellationReason.item().get(KEY_ACCOUNT_UUID).b().asByteBuffer();
account.setUuid(UUIDUtil.fromByteBuffer(actualAccountUuid));
update(account);
@@ -134,39 +131,37 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
});
}
- private TransactWriteItem buildPutWriteItemForAccount(Account account, UUID uuid) throws JsonProcessingException {
- return new TransactWriteItem()
- .withPut(
- new Put()
- .withTableName(accountsTable.getTableName())
- .withItem(Map.of(
- KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)),
- ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()),
- ATTR_ACCOUNT_DATA, new AttributeValue()
- .withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account))),
- ATTR_MIGRATION_VERSION, new AttributeValue().withN(
- String.valueOf(account.getDynamoDbMigrationVersion()))))
- .withConditionExpression("attribute_not_exists(#number) OR #number = :number")
- .withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164))
- .withExpressionAttributeValues(Map.of(":number", new AttributeValue(account.getNumber()))));
+ private TransactWriteItem buildPutWriteItemForAccount(Account account, UUID uuid, Put.Builder putBuilder) throws JsonProcessingException {
+ return TransactWriteItem.builder()
+ .put(putBuilder
+ .tableName(accountsTableName)
+ .item(Map.of(
+ KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
+ ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber()),
+ ATTR_ACCOUNT_DATA, AttributeValues.fromByteArray(SystemMapper.getMapper().writeValueAsBytes(account)),
+ ATTR_MIGRATION_VERSION, AttributeValues.fromInt(account.getDynamoDbMigrationVersion())))
+ .build())
+ .build();
}
private TransactWriteItem buildPutWriteItemForPhoneNumberConstraint(Account account, UUID uuid) {
- return new TransactWriteItem()
- .withPut(
- new Put()
- .withTableName(phoneNumbersTableName)
- .withItem(Map.of(
- ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()),
- KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid))))
- .withConditionExpression(
+ return TransactWriteItem.builder()
+ .put(
+ Put.builder()
+ .tableName(phoneNumbersTableName)
+ .item(Map.of(
+ ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber()),
+ KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)))
+ .conditionExpression(
"attribute_not_exists(#number) OR (attribute_exists(#number) AND #uuid = :uuid)")
- .withExpressionAttributeNames(
+ .expressionAttributeNames(
Map.of("#uuid", KEY_ACCOUNT_UUID,
"#number", ATTR_ACCOUNT_E164))
- .withExpressionAttributeValues(
- Map.of(":uuid", new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid))))
- .withReturnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD));
+ .expressionAttributeValues(
+ Map.of(":uuid", AttributeValues.fromUUID(uuid)))
+ .returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
+ .build())
+ .build();
}
@Override
@@ -174,16 +169,18 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
UPDATE_TIMER.record(() -> {
UpdateItemRequest updateItemRequest;
try {
- updateItemRequest = new UpdateItemRequest()
- .withTableName(accountsTable.getTableName())
- .withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(account.getUuid()))))
- .withUpdateExpression("SET #data = :data, #version = :version")
- .withConditionExpression("attribute_exists(#number)")
- .withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164,
+ updateItemRequest = UpdateItemRequest.builder()
+ .tableName(accountsTableName)
+ .key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(account.getUuid())))
+ .updateExpression("SET #data = :data, #version = :version")
+ .conditionExpression("attribute_exists(#number)")
+ .expressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164,
"#data", ATTR_ACCOUNT_DATA,
"#version", ATTR_MIGRATION_VERSION))
- .withExpressionAttributeValues(Map.of(":data", new AttributeValue().withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account))),
- ":version", new AttributeValue().withN(String.valueOf(account.getDynamoDbMigrationVersion()))));
+ .expressionAttributeValues(Map.of(
+ ":data", AttributeValues.fromByteArray(SystemMapper.getMapper().writeValueAsBytes(account)),
+ ":version", AttributeValues.fromInt(account.getDynamoDbMigrationVersion())))
+ .build();
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@@ -193,37 +190,42 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
});
}
+
@Override
public Optional get(String number) {
-
return GET_BY_NUMBER_TIMER.record(() -> {
- final GetItemResult phoneNumberAndUuid = client.getItem(phoneNumbersTableName,
- Map.of(ATTR_ACCOUNT_E164, new AttributeValue(number)), true);
+ final GetItemResponse response = client.getItem(GetItemRequest.builder()
+ .tableName(phoneNumbersTableName)
+ .key(Map.of(ATTR_ACCOUNT_E164, AttributeValues.fromString(number)))
+ .build());
- return Optional.ofNullable(phoneNumberAndUuid.getItem())
- .map(item -> item.get(KEY_ACCOUNT_UUID).getB())
- .map(uuid -> accountsTable.getItem(new GetItemSpec()
- .withPrimaryKey(KEY_ACCOUNT_UUID, uuid.array())
- .withConsistentRead(true)))
+ return Optional.ofNullable(response.item())
+ .map(item -> item.get(KEY_ACCOUNT_UUID))
+ .map(uuid -> accountByUuid(uuid))
.map(AccountsDynamoDb::fromItem);
});
}
+ private Map accountByUuid(AttributeValue uuid) {
+ GetItemResponse r = client.getItem(GetItemRequest.builder()
+ .tableName(accountsTableName)
+ .key(Map.of(KEY_ACCOUNT_UUID, uuid))
+ .consistentRead(true)
+ .build());
+ return r.item().isEmpty() ? null : r.item();
+ }
+
@Override
public Optional get(UUID uuid) {
- Optional- maybeItem = GET_BY_UUID_TIMER.record(() ->
- Optional.ofNullable(accountsTable.getItem(new GetItemSpec().
- withPrimaryKey(new PrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(uuid)))
- .withConsistentRead(true))));
-
- return maybeItem.map(AccountsDynamoDb::fromItem);
+ return GET_BY_UUID_TIMER.record(() ->
+ Optional.ofNullable(accountByUuid(AttributeValues.fromUUID(uuid)))
+ .map(AccountsDynamoDb::fromItem));
}
@Override
public void delete(UUID uuid) {
DELETE_TIMER.record(() -> {
-
delete(uuid, true);
});
}
@@ -238,18 +240,22 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
maybeAccount.ifPresent(account -> {
- TransactWriteItem phoneNumberDelete = new TransactWriteItem()
- .withDelete(new Delete()
- .withTableName(phoneNumbersTableName)
- .withKey(Map.of(ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()))));
+ TransactWriteItem phoneNumberDelete = TransactWriteItem.builder()
+ .delete(Delete.builder()
+ .tableName(phoneNumbersTableName)
+ .key(Map.of(ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber())))
+ .build())
+ .build();
- TransactWriteItem accountDelete = new TransactWriteItem().withDelete(
- new Delete()
- .withTableName(accountsTable.getTableName())
- .withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))));
+ TransactWriteItem accountDelete = TransactWriteItem.builder()
+ .delete(Delete.builder()
+ .tableName(accountsTableName)
+ .key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)))
+ .build())
+ .build();
- TransactWriteItemsRequest request = new TransactWriteItemsRequest()
- .withTransactItems(phoneNumberDelete, accountDelete);
+ TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
+ .transactItems(phoneNumberDelete, accountDelete).build();
client.transactWriteItems(request);
});
@@ -299,64 +305,62 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
try {
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
- TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid());
- accountPut.getPut()
- .setConditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)");
- accountPut.getPut()
- .setExpressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID,
- "#version", ATTR_MIGRATION_VERSION));
- accountPut.getPut()
- .setExpressionAttributeValues(
- Map.of(":version", new AttributeValue().withN(String.valueOf(account.getDynamoDbMigrationVersion()))));
+ TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid(), Put.builder()
+ .conditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)")
+ .expressionAttributeNames(Map.of(
+ "#uuid", KEY_ACCOUNT_UUID,
+ "#version", ATTR_MIGRATION_VERSION))
+ .expressionAttributeValues(Map.of(
+ ":version", AttributeValues.fromInt(account.getDynamoDbMigrationVersion()))));
- final TransactWriteItemsRequest request = new TransactWriteItemsRequest()
- .withTransactItems(phoneNumberConstraintPut, accountPut);
+ final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
+ .transactItems(phoneNumberConstraintPut, accountPut).build();
final CompletableFuture resultFuture = new CompletableFuture<>();
-
- asyncClient.transactWriteItemsAsync(request,
- new AsyncHandler<>() {
- @Override
- public void onError(Exception exception) {
- if (exception instanceof TransactionCanceledException) {
- // account is already migrated
- resultFuture.complete(false);
- } else {
- try {
- migrationRetryAccounts.put(account.getUuid());
- } catch (final Exception e) {
- logger.error("Could not store account {}", account.getUuid());
- }
- resultFuture.completeExceptionally(exception);
- }
- }
-
- @Override
- public void onSuccess(TransactWriteItemsRequest request, TransactWriteItemsResult transactWriteItemsResult) {
- resultFuture.complete(true);
- }
- });
-
+ asyncClient.transactWriteItems(request).whenCompleteAsync((result, exception) -> {
+ if (result != null) {
+ resultFuture.complete(true);
+ return;
+ }
+ if (exception instanceof CompletionException) {
+ // whenCompleteAsync can wrap exceptions in a CompletionException; unwrap it to get to the root cause.
+ exception = exception.getCause();
+ }
+ if (exception instanceof TransactionCanceledException) {
+ // account is already migrated
+ resultFuture.complete(false);
+ return;
+ }
+ try {
+ migrationRetryAccounts.put(account.getUuid());
+ } catch (final Exception e) {
+ logger.error("Could not store account {}", account.getUuid());
+ }
+ resultFuture.completeExceptionally(exception);
+ });
return resultFuture;
-
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
private static String extractCancellationReasonCodes(final TransactionCanceledException exception) {
- return exception.getCancellationReasons().stream()
- .map(CancellationReason::getCode)
+ return exception.cancellationReasons().stream()
+ .map(CancellationReason::code)
.collect(Collectors.joining(", "));
}
@VisibleForTesting
- static Account fromItem(Item item) {
+ static Account fromItem(Map item) {
+ if (!item.containsKey(ATTR_ACCOUNT_DATA) ||
+ !item.containsKey(ATTR_ACCOUNT_E164) ||
+ !item.containsKey(KEY_ACCOUNT_UUID)) {
+ throw new RuntimeException("item missing values");
+ }
try {
- Account account = SystemMapper.getMapper().readValue(item.getBinary(ATTR_ACCOUNT_DATA), Account.class);
-
- account.setNumber(item.getString(ATTR_ACCOUNT_E164));
- account.setUuid(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_ACCOUNT_UUID)));
+ Account account = SystemMapper.getMapper().readValue(item.get(ATTR_ACCOUNT_DATA).b().asByteArray(), Account.class);
+ account.setNumber(item.get(ATTR_ACCOUNT_E164).s());
+ account.setUuid(UUIDUtil.fromByteBuffer(item.get(KEY_ACCOUNT_UUID).b().asByteBuffer()));
return account;
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java
index 2806b667b..5a2afffcc 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java
@@ -7,7 +7,6 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
-import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
@@ -42,6 +41,7 @@ import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
public class AccountsManager {
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDb.java
index 02378ed62..969a0e797 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDb.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/KeysDynamoDb.java
@@ -7,195 +7,230 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
-import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
-import com.amazonaws.services.dynamodbv2.document.Table;
-import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
-import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
-import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
-import com.amazonaws.services.dynamodbv2.model.ReturnValue;
-import com.amazonaws.services.dynamodbv2.model.Select;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.whispersystems.textsecuregcm.entities.PreKey;
-import org.whispersystems.textsecuregcm.util.UUIDUtil;
+import org.whispersystems.textsecuregcm.util.AttributeValues;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+import software.amazon.awssdk.services.dynamodb.model.Select;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class KeysDynamoDb extends AbstractDynamoDbStore {
- private final Table table;
+ private final String tableName;
- static final String KEY_ACCOUNT_UUID = "U";
- static final String KEY_DEVICE_ID_KEY_ID = "DK";
- static final String KEY_PUBLIC_KEY = "P";
+ static final String KEY_ACCOUNT_UUID = "U";
+ static final String KEY_DEVICE_ID_KEY_ID = "DK";
+ static final String KEY_PUBLIC_KEY = "P";
- private static final Timer STORE_KEYS_TIMER = Metrics.timer(name(KeysDynamoDb.class, "storeKeys"));
- private static final Timer TAKE_KEY_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForDevice"));
- private static final Timer TAKE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForAccount"));
- private static final Timer GET_KEY_COUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "getKeyCount"));
- private static final Timer DELETE_KEYS_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForDevice"));
- private static final Timer DELETE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForAccount"));
- private static final DistributionSummary CONTESTED_KEY_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "contestedKeys"));
- private static final DistributionSummary KEY_COUNT_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "keyCount"));
+ private static final Timer STORE_KEYS_TIMER = Metrics.timer(name(KeysDynamoDb.class, "storeKeys"));
+ private static final Timer TAKE_KEY_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForDevice"));
+ private static final Timer TAKE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForAccount"));
+ private static final Timer GET_KEY_COUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "getKeyCount"));
+ private static final Timer DELETE_KEYS_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForDevice"));
+ private static final Timer DELETE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForAccount"));
+ private static final DistributionSummary CONTESTED_KEY_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "contestedKeys"));
+ private static final DistributionSummary KEY_COUNT_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "keyCount"));
- public KeysDynamoDb(final DynamoDB dynamoDB, final String tableName) {
- super(dynamoDB);
+ public KeysDynamoDb(final DynamoDbClient dynamoDB, final String tableName) {
+ super(dynamoDB);
+ this.tableName = tableName;
+ }
- this.table = dynamoDB.getTable(tableName);
- }
+ public void store(final Account account, final long deviceId, final List keys) {
+ STORE_KEYS_TIMER.record(() -> {
+ delete(account, deviceId);
- public void store(final Account account, final long deviceId, final List keys) {
- STORE_KEYS_TIMER.record(() -> {
- delete(account, deviceId);
+ writeInBatches(keys, batch -> {
+ List items = new ArrayList<>();
+ for (final PreKey preKey : batch) {
+ items.add(WriteRequest.builder()
+ .putRequest(PutRequest.builder()
+ .item(getItemFromPreKey(account.getUuid(), deviceId, preKey))
+ .build())
+ .build());
+ }
+ executeTableWriteItemsUntilComplete(Map.of(tableName, items));
+ });
+ });
+ }
- writeInBatches(keys, batch -> {
- final TableWriteItems items = new TableWriteItems(table.getTableName());
+ public Optional take(final Account account, final long deviceId) {
+ return TAKE_KEY_FOR_DEVICE_TIMER.record(() -> {
+ final AttributeValue partitionKey = getPartitionKey(account.getUuid());
+ QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
+ .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
+ .expressionAttributeValues(Map.of(
+ ":uuid", partitionKey,
+ ":sortprefix", getSortKeyPrefix(deviceId)))
+ .projectionExpression(KEY_DEVICE_ID_KEY_ID)
+ .consistentRead(false)
+ .build();
- for (final PreKey preKey : batch) {
- items.addItemToPut(getItemFromPreKey(account.getUuid(), deviceId, preKey));
- }
+ int contestedKeys = 0;
- executeTableWriteItemsUntilComplete(items);
- });
- });
- }
+ try {
+ QueryResponse response = db().query(queryRequest);
+ for (Map candidate : response.items()) {
+ DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
+ .tableName(tableName)
+ .key(Map.of(
+ KEY_ACCOUNT_UUID, partitionKey,
+ KEY_DEVICE_ID_KEY_ID, candidate.get(KEY_DEVICE_ID_KEY_ID)))
+ .returnValues(ReturnValue.ALL_OLD)
+ .build();
+ DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest);
+ if (deleteItemResponse.attributes() != null) {
+ return Optional.of(getPreKeyFromItem(deleteItemResponse.attributes()));
+ }
- public Optional take(final Account account, final long deviceId) {
- return TAKE_KEY_FOR_DEVICE_TIMER.record(() -> {
- final byte[] partitionKey = getPartitionKey(account.getUuid());
+ contestedKeys++;
+ }
- final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
- .withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
- .withValueMap(Map.of(":uuid", partitionKey,
- ":sortprefix", getSortKeyPrefix(deviceId)))
- .withProjectionExpression(KEY_DEVICE_ID_KEY_ID)
- .withConsistentRead(false);
+ return Optional.empty();
+ } finally {
+ CONTESTED_KEY_DISTRIBUTION.record(contestedKeys);
+ }
+ });
+ }
- int contestedKeys = 0;
+ public Map take(final Account account) {
+ return TAKE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
+ final Map preKeysByDeviceId = new HashMap<>();
- try {
- for (final Item candidate : table.query(querySpec)) {
- final DeleteItemSpec deleteItemSpec = new DeleteItemSpec().withPrimaryKey(KEY_ACCOUNT_UUID, partitionKey, KEY_DEVICE_ID_KEY_ID, candidate.getBinary(KEY_DEVICE_ID_KEY_ID))
- .withReturnValues(ReturnValue.ALL_OLD);
+ for (final Device device : account.getDevices()) {
+ take(account, device.getId()).ifPresent(preKey -> preKeysByDeviceId.put(device.getId(), preKey));
+ }
- final DeleteItemOutcome outcome = table.deleteItem(deleteItemSpec);
+ return preKeysByDeviceId;
+ });
+ }
- if (outcome.getItem() != null) {
- return Optional.of(getPreKeyFromItem(outcome.getItem()));
- }
+ public int getCount(final Account account, final long deviceId) {
+ return GET_KEY_COUNT_TIMER.record(() -> {
+ QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
+ .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
+ .expressionAttributeValues(Map.of(
+ ":uuid", getPartitionKey(account.getUuid()),
+ ":sortprefix", getSortKeyPrefix(deviceId)))
+ .select(Select.COUNT)
+ .consistentRead(false)
+ .build();
- contestedKeys++;
- }
+ int keyCount = 0;
+ // This is very confusing, but does appear to be the intended behavior. See:
+ //
+ // - https://github.com/aws/aws-sdk-java/issues/693
+ // - https://github.com/aws/aws-sdk-java/issues/915
+ // - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Count
+ for (final QueryResponse page : db().queryPaginator(queryRequest)) {
+ keyCount += page.count();
+ }
+ KEY_COUNT_DISTRIBUTION.record(keyCount);
+ return keyCount;
+ });
+ }
- return Optional.empty();
- } finally {
- CONTESTED_KEY_DISTRIBUTION.record(contestedKeys);
- }
- });
- }
+ public void delete(final Account account) {
+ DELETE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
+ final QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .keyConditionExpression("#uuid = :uuid")
+ .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID))
+ .expressionAttributeValues(Map.of(
+ ":uuid", getPartitionKey(account.getUuid())))
+ .projectionExpression(KEY_DEVICE_ID_KEY_ID)
+ .consistentRead(true)
+ .build();
- public Map take(final Account account) {
- return TAKE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
- final Map preKeysByDeviceId = new HashMap<>();
+ deleteItemsForAccountMatchingQuery(account, queryRequest);
+ });
+ }
- for (final Device device : account.getDevices()) {
- take(account, device.getId()).ifPresent(preKey -> preKeysByDeviceId.put(device.getId(), preKey));
- }
+ @VisibleForTesting
+ void delete(final Account account, final long deviceId) {
+ DELETE_KEYS_FOR_DEVICE_TIMER.record(() -> {
+ final QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
+ .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
+ .expressionAttributeValues(Map.of(
+ ":uuid", getPartitionKey(account.getUuid()),
+ ":sortprefix", getSortKeyPrefix(deviceId)))
+ .projectionExpression(KEY_DEVICE_ID_KEY_ID)
+ .consistentRead(true)
+ .build();
- return preKeysByDeviceId;
- });
- }
+ deleteItemsForAccountMatchingQuery(account, queryRequest);
+ });
+ }
- public int getCount(final Account account, final long deviceId) {
- return GET_KEY_COUNT_TIMER.record(() -> {
- final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
- .withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
- .withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()),
- ":sortprefix", getSortKeyPrefix(deviceId)))
- .withSelect(Select.COUNT)
- .withConsistentRead(false);
+ private void deleteItemsForAccountMatchingQuery(final Account account, final QueryRequest querySpec) {
+ final AttributeValue partitionKey = getPartitionKey(account.getUuid());
- final int keyCount = (int)countItemsMatchingQuery(table, querySpec);
+ writeInBatches(db().query(querySpec).items(), batch -> {
+ List deletes = new ArrayList<>();
+ for (final Map item : batch) {
+ deletes.add(WriteRequest.builder()
+ .deleteRequest(DeleteRequest.builder()
+ .key(Map.of(
+ KEY_ACCOUNT_UUID, partitionKey,
+ KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID)))
+ .build())
+ .build());
+ }
+ executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
+ });
+ }
- KEY_COUNT_DISTRIBUTION.record(keyCount);
- return keyCount;
- });
- }
+ private static AttributeValue getPartitionKey(final UUID accountUuid) {
+ return AttributeValues.fromUUID(accountUuid);
+ }
- public void delete(final Account account) {
- DELETE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
- final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid")
- .withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID))
- .withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid())))
- .withProjectionExpression(KEY_DEVICE_ID_KEY_ID)
- .withConsistentRead(true);
+ private static AttributeValue getSortKey(final long deviceId, final long keyId) {
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+ byteBuffer.putLong(deviceId);
+ byteBuffer.putLong(keyId);
+ return AttributeValues.fromByteBuffer(byteBuffer.flip());
+ }
- deleteItemsForAccountMatchingQuery(account, querySpec);
- });
- }
+ @VisibleForTesting
+ static AttributeValue getSortKeyPrefix(final long deviceId) {
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
+ byteBuffer.putLong(deviceId);
+ return AttributeValues.fromByteBuffer(byteBuffer.flip());
+ }
- @VisibleForTesting
- void delete(final Account account, final long deviceId) {
- DELETE_KEYS_FOR_DEVICE_TIMER.record(() -> {
- final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
- .withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
- .withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()),
- ":sortprefix", getSortKeyPrefix(deviceId)))
- .withProjectionExpression(KEY_DEVICE_ID_KEY_ID)
- .withConsistentRead(true);
+ private Map getItemFromPreKey(final UUID accountUuid, final long deviceId, final PreKey preKey) {
+ return Map.of(
+ KEY_ACCOUNT_UUID, getPartitionKey(accountUuid),
+ KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.getKeyId()),
+ KEY_PUBLIC_KEY, AttributeValues.fromString(preKey.getPublicKey()));
+ }
- deleteItemsForAccountMatchingQuery(account, querySpec);
- });
- }
-
- private void deleteItemsForAccountMatchingQuery(final Account account, final QuerySpec querySpec) {
- final byte[] partitionKey = getPartitionKey(account.getUuid());
-
- writeInBatches(table.query(querySpec), batch -> {
- final TableWriteItems writeItems = new TableWriteItems(table.getTableName());
-
- for (final Item item : batch) {
- writeItems.addPrimaryKeyToDelete(new PrimaryKey(KEY_ACCOUNT_UUID, partitionKey, KEY_DEVICE_ID_KEY_ID, item.getBinary(KEY_DEVICE_ID_KEY_ID)));
- }
-
- executeTableWriteItemsUntilComplete(writeItems);
- });
- }
-
- private static byte[] getPartitionKey(final UUID accountUuid) {
- return UUIDUtil.toBytes(accountUuid);
- }
-
- private static byte[] getSortKey(final long deviceId, final long keyId) {
- final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
- byteBuffer.putLong(deviceId);
- byteBuffer.putLong(keyId);
- return byteBuffer.array();
- }
-
- private static byte[] getSortKeyPrefix(final long deviceId) {
- final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
- byteBuffer.putLong(deviceId);
- return byteBuffer.array();
- }
-
- private Item getItemFromPreKey(final UUID accountUuid, final long deviceId, final PreKey preKey) {
- return new Item().withBinary(KEY_ACCOUNT_UUID, getPartitionKey(accountUuid))
- .withBinary(KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.getKeyId()))
- .withString(KEY_PUBLIC_KEY, preKey.getPublicKey());
- }
-
- private PreKey getPreKeyFromItem(final Item item) {
- final long keyId = ByteBuffer.wrap(item.getBinary(KEY_DEVICE_ID_KEY_ID)).getLong(8);
- return new PreKey(keyId, item.getString(KEY_PUBLIC_KEY));
- }
+ private PreKey getPreKeyFromItem(Map item) {
+ final long keyId = item.get(KEY_DEVICE_ID_KEY_ID).b().asByteBuffer().getLong(8);
+ return new PreKey(keyId, item.get(KEY_PUBLIC_KEY).s());
+ }
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java
index 413c93443..536a758a3 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java
@@ -8,17 +8,7 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import static io.micrometer.core.instrument.Metrics.timer;
-import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Index;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
-import com.amazonaws.services.dynamodbv2.document.Table;
-import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
-import com.amazonaws.services.dynamodbv2.document.api.QueryApi;
-import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
-import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
-import com.amazonaws.services.dynamodbv2.model.ReturnValue;
+import com.google.common.collect.ImmutableMap;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
import java.time.Duration;
@@ -27,11 +17,22 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
+import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class MessagesDynamoDb extends AbstractDynamoDbStore {
@@ -60,7 +61,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private final String tableName;
private final Duration timeToLive;
- public MessagesDynamoDb(DynamoDB dynamoDb, String tableName, Duration timeToLive) {
+ public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) {
super(dynamoDb);
this.tableName = tableName;
@@ -76,54 +77,61 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
throw new IllegalArgumentException("Maximum batch size of " + DYNAMO_DB_MAX_BATCH_SIZE + " execeeded with " + messages.size() + " messages");
}
- final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
- TableWriteItems items = new TableWriteItems(tableName);
+ final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
+ List writeItems = new ArrayList<>();
for (MessageProtos.Envelope message : messages) {
final UUID messageUuid = UUID.fromString(message.getServerGuid());
- final Item item = new Item().withBinary(KEY_PARTITION, partitionKey)
- .withBinary(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid))
- .withBinary(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
- .withInt(KEY_TYPE, message.getType().getNumber())
- .withLong(KEY_TIMESTAMP, message.getTimestamp())
- .withLong(KEY_TTL, getTtlForMessage(message));
+ final ImmutableMap.Builder item = ImmutableMap.builder()
+ .put(KEY_PARTITION, partitionKey)
+ .put(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid))
+ .put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
+ .put(KEY_TYPE, AttributeValues.fromInt(message.getType().getNumber()))
+ .put(KEY_TIMESTAMP, AttributeValues.fromLong(message.getTimestamp()))
+ .put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message)));
if (message.hasRelay() && message.getRelay().length() > 0) {
- item.withString(KEY_RELAY, message.getRelay());
+ item.put(KEY_RELAY, AttributeValues.fromString(message.getRelay()));
}
if (message.hasSource()) {
- item.withString(KEY_SOURCE, message.getSource());
+ item.put(KEY_SOURCE, AttributeValues.fromString(message.getSource()));
}
if (message.hasSourceUuid()) {
- item.withBinary(KEY_SOURCE_UUID, UUIDUtil.toBytes(UUID.fromString(message.getSourceUuid())));
+ item.put(KEY_SOURCE_UUID, AttributeValues.fromUUID(UUID.fromString(message.getSourceUuid())));
}
if (message.hasSourceDevice()) {
- item.withInt(KEY_SOURCE_DEVICE, message.getSourceDevice());
+ item.put(KEY_SOURCE_DEVICE, AttributeValues.fromInt(message.getSourceDevice()));
}
if (message.hasLegacyMessage()) {
- item.withBinary(KEY_MESSAGE, message.getLegacyMessage().toByteArray());
+ item.put(KEY_MESSAGE, AttributeValues.fromByteArray(message.getLegacyMessage().toByteArray()));
}
if (message.hasContent()) {
- item.withBinary(KEY_CONTENT, message.getContent().toByteArray());
+ item.put(KEY_CONTENT, AttributeValues.fromByteArray(message.getContent().toByteArray()));
}
- items.addItemToPut(item);
+ writeItems.add(WriteRequest.builder().putRequest(PutRequest.builder()
+ .item(item.build())
+ .build()).build());
}
- executeTableWriteItemsUntilComplete(items);
+ executeTableWriteItemsUntilComplete(Map.of(tableName, writeItems));
}
public List load(final UUID destinationAccountUuid, final long destinationDeviceId, final int requestedNumberOfMessagesToFetch) {
return loadTimer.record(() -> {
final int numberOfMessagesToFetch = Math.min(requestedNumberOfMessagesToFetch, RESULT_SET_CHUNK_SIZE);
- final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
- final QuerySpec querySpec = new QuerySpec().withConsistentRead(true)
- .withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
- .withNameMap(Map.of("#part", KEY_PARTITION,
- "#sort", KEY_SORT))
- .withValueMap(Map.of(":part", partitionKey,
- ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
- .withMaxResultSize(numberOfMessagesToFetch);
- final Table table = getDynamoDb().getTable(tableName);
+ final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
+ final QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .consistentRead(true)
+ .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
+ .expressionAttributeNames(Map.of(
+ "#part", KEY_PARTITION,
+ "#sort", KEY_SORT))
+ .expressionAttributeValues(Map.of(
+ ":part", partitionKey,
+ ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
+ .limit(numberOfMessagesToFetch)
+ .build();
List messageEntities = new ArrayList<>(numberOfMessagesToFetch);
- for (Item message : table.query(querySpec)) {
+ for (Map message : db().query(queryRequest).items()) {
messageEntities.add(convertItemToOutgoingMessageEntity(message));
}
return messageEntities;
@@ -136,53 +144,63 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
throw new IllegalArgumentException("must specify a source");
}
- final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
- final QuerySpec querySpec = new QuerySpec().withProjectionExpression(KEY_SORT)
- .withConsistentRead(true)
- .withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
- .withFilterExpression("#source = :source AND #timestamp = :timestamp")
- .withNameMap(Map.of("#part", KEY_PARTITION,
- "#sort", KEY_SORT,
- "#source", KEY_SOURCE,
- "#timestamp", KEY_TIMESTAMP))
- .withValueMap(Map.of(":part", partitionKey,
- ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId),
- ":source", source,
- ":timestamp", timestamp));
+ final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
+ final QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .projectionExpression(KEY_SORT)
+ .consistentRead(true)
+ .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
+ .filterExpression("#source = :source AND #timestamp = :timestamp")
+ .expressionAttributeNames(Map.of(
+ "#part", KEY_PARTITION,
+ "#sort", KEY_SORT,
+ "#source", KEY_SOURCE,
+ "#timestamp", KEY_TIMESTAMP))
+ .expressionAttributeValues(Map.of(
+ ":part", partitionKey,
+ ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId),
+ ":source", AttributeValues.fromString(source),
+ ":timestamp", AttributeValues.fromLong(timestamp)))
+ .build();
- final Table table = getDynamoDb().getTable(tableName);
- return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, table);
+ return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(partitionKey, queryRequest);
});
}
public Optional deleteMessageByDestinationAndGuid(final UUID destinationAccountUuid, final long destinationDeviceId, final UUID messageUuid) {
return deleteByGuid.record(() -> {
- final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
- final QuerySpec querySpec = new QuerySpec().withProjectionExpression(KEY_SORT)
- .withConsistentRead(true)
- .withKeyConditionExpression("#part = :part AND #uuid = :uuid")
- .withNameMap(Map.of("#part", KEY_PARTITION,
- "#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT))
- .withValueMap(Map.of(":part", partitionKey,
- ":uuid", convertLocalIndexMessageUuidSortKey(messageUuid)));
- final Table table = getDynamoDb().getTable(tableName);
- final Index index = table.getIndex(LOCAL_INDEX_MESSAGE_UUID_NAME);
- return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, index);
+ final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
+ final QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .indexName(LOCAL_INDEX_MESSAGE_UUID_NAME)
+ .projectionExpression(KEY_SORT)
+ .consistentRead(true)
+ .keyConditionExpression("#part = :part AND #uuid = :uuid")
+ .expressionAttributeNames(Map.of(
+ "#part", KEY_PARTITION,
+ "#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT))
+ .expressionAttributeValues(Map.of(
+ ":part", partitionKey,
+ ":uuid", convertLocalIndexMessageUuidSortKey(messageUuid)))
+ .build();
+ return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(partitionKey, queryRequest);
});
}
@Nonnull
- private Optional deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(Table table, byte[] partitionKey, QuerySpec querySpec, QueryApi queryApi) {
+ private Optional deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(AttributeValue partitionKey, QueryRequest queryRequest) {
Optional result = Optional.empty();
- for (Item item : queryApi.query(querySpec)) {
- final byte[] rangeKeyValue = item.getBinary(KEY_SORT);
- DeleteItemSpec deleteItemSpec = new DeleteItemSpec().withPrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, rangeKeyValue);
+ for (Map item : db().query(queryRequest).items()) {
+ final byte[] rangeKeyValue = item.get(KEY_SORT).b().asByteArray();
+ DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
+ .tableName(tableName)
+ .key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, AttributeValues.fromByteArray(rangeKeyValue)));
if (result.isEmpty()) {
- deleteItemSpec.withReturnValues(ReturnValue.ALL_OLD);
+ deleteItemRequest.returnValues(ReturnValue.ALL_OLD);
}
- final DeleteItemOutcome deleteItemOutcome = table.deleteItem(deleteItemSpec);
- if (deleteItemOutcome.getItem() != null && deleteItemOutcome.getItem().hasAttribute(KEY_PARTITION)) {
- result = Optional.of(convertItemToOutgoingMessageEntity(deleteItemOutcome.getItem()));
+ final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build());
+ if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
+ result = Optional.of(convertItemToOutgoingMessageEntity(deleteItemResponse.attributes()));
}
}
return result;
@@ -190,74 +208,88 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
public void deleteAllMessagesForAccount(final UUID destinationAccountUuid) {
deleteByAccount.record(() -> {
- final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
- final QuerySpec querySpec = new QuerySpec().withHashKey(KEY_PARTITION, partitionKey)
- .withProjectionExpression(KEY_SORT)
- .withConsistentRead(true);
- deleteRowsMatchingQuery(partitionKey, querySpec);
+ final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
+ final QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .projectionExpression(KEY_SORT)
+ .consistentRead(true)
+ .keyConditionExpression("#part = :part")
+ .expressionAttributeNames(Map.of("#part", KEY_PARTITION))
+ .expressionAttributeValues(Map.of(":part", partitionKey))
+ .build();
+ deleteRowsMatchingQuery(partitionKey, queryRequest);
});
}
public void deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) {
deleteByDevice.record(() -> {
- final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
- final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
- .withNameMap(Map.of("#part", KEY_PARTITION,
- "#sort", KEY_SORT))
- .withValueMap(Map.of(":part", partitionKey,
- ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
- .withProjectionExpression(KEY_SORT)
- .withConsistentRead(true);
- deleteRowsMatchingQuery(partitionKey, querySpec);
+ final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
+ final QueryRequest queryRequest = QueryRequest.builder()
+ .tableName(tableName)
+ .keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
+ .expressionAttributeNames(Map.of(
+ "#part", KEY_PARTITION,
+ "#sort", KEY_SORT))
+ .expressionAttributeValues(Map.of(
+ ":part", partitionKey,
+ ":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
+ .projectionExpression(KEY_SORT)
+ .consistentRead(true)
+ .build();
+ deleteRowsMatchingQuery(partitionKey, queryRequest);
});
}
- private OutgoingMessageEntity convertItemToOutgoingMessageEntity(Item message) {
- final SortKey sortKey = convertSortKey(message.getBinary(KEY_SORT));
- final UUID messageUuid = convertLocalIndexMessageUuidSortKey(message.getBinary(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT));
- final int type = message.getInt(KEY_TYPE);
- final String relay = message.getString(KEY_RELAY);
- final long timestamp = message.getLong(KEY_TIMESTAMP);
- final String source = message.getString(KEY_SOURCE);
- final UUID sourceUuid = message.hasAttribute(KEY_SOURCE_UUID) ? convertUuidFromBytes(message.getBinary(KEY_SOURCE_UUID), "message source uuid") : null;
- final int sourceDevice = message.hasAttribute(KEY_SOURCE_DEVICE) ? message.getInt(KEY_SOURCE_DEVICE) : 0;
- final byte[] messageBytes = message.getBinary(KEY_MESSAGE);
- final byte[] content = message.getBinary(KEY_CONTENT);
+ private OutgoingMessageEntity convertItemToOutgoingMessageEntity(Map message) {
+ final SortKey sortKey = convertSortKey(message.get(KEY_SORT).b().asByteArray());
+ final UUID messageUuid = convertLocalIndexMessageUuidSortKey(message.get(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT).b().asByteArray());
+ final int type = AttributeValues.getInt(message, KEY_TYPE, 0);
+ final String relay = AttributeValues.getString(message, KEY_RELAY, null);
+ final long timestamp = AttributeValues.getLong(message, KEY_TIMESTAMP, 0L);
+ final String source = AttributeValues.getString(message, KEY_SOURCE, null);
+ final UUID sourceUuid = AttributeValues.getUUID(message, KEY_SOURCE_UUID, null);
+ final int sourceDevice = AttributeValues.getInt(message, KEY_SOURCE_DEVICE, 0);
+ final byte[] messageBytes = AttributeValues.getByteArray(message, KEY_MESSAGE, null);
+ final byte[] content = AttributeValues.getByteArray(message, KEY_CONTENT, null);
return new OutgoingMessageEntity(-1L, false, messageUuid, type, relay, timestamp, source, sourceUuid, sourceDevice, messageBytes, content, sortKey.getServerTimestamp());
}
- private void deleteRowsMatchingQuery(byte[] partitionKey, QuerySpec querySpec) {
- final Table table = getDynamoDb().getTable(tableName);
- writeInBatches(table.query(querySpec), (itemBatch) -> deleteItems(partitionKey, itemBatch));
+ private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) {
+ writeInBatches(db().query(querySpec).items(), (itemBatch) -> deleteItems(partitionKey, itemBatch));
}
- private void deleteItems(byte[] partitionKey, List
- items) {
- final TableWriteItems tableWriteItems = new TableWriteItems(tableName);
- items.stream().map(item -> new PrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, item.getBinary(KEY_SORT))).forEach(tableWriteItems::addPrimaryKeyToDelete);
- executeTableWriteItemsUntilComplete(tableWriteItems);
+ private void deleteItems(AttributeValue partitionKey, List