Switch DynamoDB to AWSv2.

Switch from using com.amazonaws.services.dynamodbv2 to using
software.amazon.awssdk.services.dynamodb for all current DynamoDB uses.
This commit is contained in:
Graeme Connell 2021-05-24 16:43:56 -06:00 committed by gram-signal
parent cbd9681e3e
commit c545cff1b3
31 changed files with 1114 additions and 876 deletions

View File

@ -256,6 +256,10 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
@ -272,10 +276,6 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-appconfig</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>

View File

@ -12,11 +12,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.codahale.metrics.SharedMetricRegistries;
@ -194,6 +189,7 @@ import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.util.AsnManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import org.whispersystems.textsecuregcm.util.TorExitNodeManager;
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
@ -210,6 +206,14 @@ import org.whispersystems.textsecuregcm.workers.VacuumCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
import org.whispersystems.websocket.setup.WebSocketEnvironment;
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
public class WhisperServerService extends Application<WhisperServerConfiguration> {
@ -316,80 +320,40 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("accounts_database", accountJdbi, config.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
FaultTolerantDatabase abuseDatabase = new FaultTolerantDatabase("abuse_database", abuseJdbi, config.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
AmazonDynamoDBClientBuilder messageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getMessageDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDbClient messageDynamoDb = DynamoDbFromConfig.client(config.getMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDBClientBuilder keysDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getKeysDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getKeysDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getKeysDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDbClient preKeyDynamoDb = DynamoDbFromConfig.client(config.getKeysDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDBClientBuilder accountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getAccountsDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDbClient accountsDynamoDbClient = DynamoDbFromConfig.client(config.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
// The thread pool core & max sizes are set via dynamic configuration within AccountsDynamoDb
ThreadPoolExecutor accountsDynamoDbMigrationThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder
.standard()
.withRegion(accountsDynamoDbClientBuilder.getRegion())
.withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration())
.withCredentials(accountsDynamoDbClientBuilder.getCredentials())
.withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(config.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(),
accountsDynamoDbMigrationThreadPool);
AmazonDynamoDBClientBuilder migrationDeletedAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getMigrationDeletedAccountsDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMigrationDeletedAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getMigrationDeletedAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDbClient recentlyDeletedAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDBClientBuilder migrationRetryAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getMigrationRetryAccountsDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMigrationRetryAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getMigrationRetryAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDbClient pushChallengeDynamoDbClient = DynamoDbFromConfig.client(config.getPushChallengeDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDBClientBuilder pushChallengeDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getPushChallengeDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getPushChallengeDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getPushChallengeDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDbClient reportMessageDynamoDbClient = DynamoDbFromConfig.client(config.getReportMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDBClientBuilder reportMessageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getReportMessageDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getReportMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getReportMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
AmazonDynamoDBAsync accountsDynamodbAsyncClient = accountsDynamoDbAsyncClientBuilder.build();
DynamoDB recentlyDeletedAccountsDynamoDb = new DynamoDB(migrationDeletedAccountsDynamoDbClientBuilder.build());
DynamoDB migrationRetryAccountsDynamoDb = new DynamoDB(migrationRetryAccountsDynamoDbClientBuilder.build());
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb, config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamodbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
PendingDevices pendingDevices = new PendingDevices (accountDatabase);
Usernames usernames = new Usernames(accountDatabase);
@ -399,8 +363,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, config.getMessageDynamoDbConfiguration().getTableName(), config.getMessageDynamoDbConfiguration().getTimeToLive());
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
PushChallengeDynamoDb pushChallengeDynamoDb = new PushChallengeDynamoDb(new DynamoDB(pushChallengeDynamoDbClientBuilder.build()), config.getPushChallengeDynamoDbConfiguration().getTableName());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(new DynamoDB(reportMessageDynamoDbClientBuilder.build()), config.getReportMessageDynamoDbConfiguration().getTableName());
PushChallengeDynamoDb pushChallengeDynamoDb = new PushChallengeDynamoDb(pushChallengeDynamoDbClient, config.getPushChallengeDynamoDbConfiguration().getTableName());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessageDynamoDbClient, config.getReportMessageDynamoDbConfiguration().getTableName());
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();

View File

@ -5,21 +5,18 @@
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Page;
import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -29,7 +26,7 @@ import static io.micrometer.core.instrument.Metrics.timer;
public class AbstractDynamoDbStore {
private final DynamoDB dynamoDb;
private final DynamoDbClient dynamoDbClient;
private final Timer batchWriteItemsFirstPass = timer(name(getClass(), "batchWriteItems"), "firstAttempt", "true");
private final Timer batchWriteItemsRetryPass = timer(name(getClass(), "batchWriteItems"), "firstAttempt", "false");
@ -41,44 +38,31 @@ public class AbstractDynamoDbStore {
public static final int DYNAMO_DB_MAX_BATCH_SIZE = 25; // This limit comes from Amazon Dynamo DB itself. It will reject batch writes larger than this.
public static final int RESULT_SET_CHUNK_SIZE = 100;
public AbstractDynamoDbStore(final DynamoDB dynamoDb) {
this.dynamoDb = dynamoDb;
public AbstractDynamoDbStore(final DynamoDbClient dynamoDbClient) {
this.dynamoDbClient = dynamoDbClient;
}
protected DynamoDB getDynamoDb() {
return dynamoDb;
protected DynamoDbClient db() {
return dynamoDbClient;
}
protected void executeTableWriteItemsUntilComplete(final TableWriteItems items) {
AtomicReference<BatchWriteItemOutcome> outcome = new AtomicReference<>();
batchWriteItemsFirstPass.record(() -> outcome.set(dynamoDb.batchWriteItem(items)));
protected void executeTableWriteItemsUntilComplete(final Map<String,List<WriteRequest>> items) {
AtomicReference<BatchWriteItemResponse> outcome = new AtomicReference<>();
batchWriteItemsFirstPass.record(() -> outcome.set(dynamoDbClient.batchWriteItem(BatchWriteItemRequest.builder().requestItems(items).build())));
int attemptCount = 0;
while (!outcome.get().getUnprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) {
batchWriteItemsRetryPass.record(() -> outcome.set(dynamoDb.batchWriteItemUnprocessed(outcome.get().getUnprocessedItems())));
while (!outcome.get().unprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) {
batchWriteItemsRetryPass.record(() -> outcome.set(dynamoDbClient.batchWriteItem(BatchWriteItemRequest.builder()
.requestItems(outcome.get().unprocessedItems())
.build())));
++attemptCount;
}
if (!outcome.get().getUnprocessedItems().isEmpty()) {
logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, outcome.get().getUnprocessedItems().size());
batchWriteItemsUnprocessed.increment(outcome.get().getUnprocessedItems().size());
if (!outcome.get().unprocessedItems().isEmpty()) {
int totalItems = outcome.get().unprocessedItems().values().stream().mapToInt(List::size).sum();
logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, totalItems);
batchWriteItemsUnprocessed.increment(totalItems);
}
}
protected long countItemsMatchingQuery(final Table table, final QuerySpec querySpec) {
// This is very confusing, but does appear to be the intended behavior. See:
//
// - https://github.com/aws/aws-sdk-java/issues/693
// - https://github.com/aws/aws-sdk-java/issues/915
// - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Count
long matchingItems = 0;
for (final Page<Item, QueryOutcome> page : table.query(querySpec).pages()) {
matchingItems += page.getLowLevelResult().getQueryResult().getCount();
}
return matchingItems;
}
static <T> void writeInBatches(final Iterable<T> items, final Consumer<List<T>> action) {
final List<T> batch = new ArrayList<>(DYNAMO_DB_MAX_BATCH_SIZE);

View File

@ -2,25 +2,6 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.CancellationReason;
import com.amazonaws.services.dynamodbv2.model.Delete;
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
import com.amazonaws.services.dynamodbv2.model.Put;
import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailure;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItem;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsResult;
import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
@ -33,12 +14,27 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
import software.amazon.awssdk.services.dynamodb.model.Delete;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountStore {
@ -51,9 +47,8 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
static final String ATTR_MIGRATION_VERSION = "V";
private final AmazonDynamoDB client;
private final Table accountsTable;
private final AmazonDynamoDBAsync asyncClient;
private final DynamoDbClient client;
private final DynamoDbAsyncClient asyncClient;
private final ThreadPoolExecutor migrationThreadPool;
@ -61,6 +56,7 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private final MigrationRetryAccounts migrationRetryAccounts;
private final String phoneNumbersTableName;
private final String accountsTableName;
private static final Timer CREATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "create"));
private static final Timer UPDATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "update"));
@ -70,18 +66,17 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private final Logger logger = LoggerFactory.getLogger(AccountsDynamoDb.class);
public AccountsDynamoDb(AmazonDynamoDB client, AmazonDynamoDBAsync asyncClient,
ThreadPoolExecutor migrationThreadPool, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName,
public AccountsDynamoDb(DynamoDbClient client, DynamoDbAsyncClient asyncClient,
ThreadPoolExecutor migrationThreadPool, String accountsTableName, String phoneNumbersTableName,
MigrationDeletedAccounts migrationDeletedAccounts,
MigrationRetryAccounts accountsMigrationErrors) {
super(dynamoDb);
super(client);
this.client = client;
this.accountsTable = dynamoDb.getTable(accountsTableName);
this.phoneNumbersTableName = phoneNumbersTableName;
this.asyncClient = asyncClient;
this.phoneNumbersTableName = phoneNumbersTableName;
this.accountsTableName = accountsTableName;
this.migrationThreadPool = migrationThreadPool;
this.migrationDeletedAccounts = migrationDeletedAccounts;
@ -90,32 +85,34 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
@Override
public boolean create(Account account) {
return CREATE_TIMER.record(() -> {
try {
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid(), Put.builder()
.conditionExpression("attribute_not_exists(#number) OR #number = :number")
.expressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164))
.expressionAttributeValues(Map.of(":number", AttributeValues.fromString(account.getNumber()))));
TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid());
final TransactWriteItemsRequest request = new TransactWriteItemsRequest()
.withTransactItems(phoneNumberConstraintPut, accountPut);
final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
.transactItems(phoneNumberConstraintPut, accountPut)
.build();
try {
client.transactWriteItems(request);
} catch (TransactionCanceledException e) {
final CancellationReason accountCancellationReason = e.getCancellationReasons().get(1);
final CancellationReason accountCancellationReason = e.cancellationReasons().get(1);
if ("ConditionalCheckFailed".equals(accountCancellationReason.getCode())) {
if ("ConditionalCheckFailed".equals(accountCancellationReason.code())) {
throw new IllegalArgumentException("uuid present with different phone number");
}
final CancellationReason phoneNumberConstraintCancellationReason = e.getCancellationReasons().get(0);
final CancellationReason phoneNumberConstraintCancellationReason = e.cancellationReasons().get(0);
if ("ConditionalCheckFailed".equals(phoneNumberConstraintCancellationReason.getCode())) {
if ("ConditionalCheckFailed".equals(phoneNumberConstraintCancellationReason.code())) {
ByteBuffer actualAccountUuid = phoneNumberConstraintCancellationReason.getItem().get(KEY_ACCOUNT_UUID).getB();
ByteBuffer actualAccountUuid = phoneNumberConstraintCancellationReason.item().get(KEY_ACCOUNT_UUID).b().asByteBuffer();
account.setUuid(UUIDUtil.fromByteBuffer(actualAccountUuid));
update(account);
@ -134,39 +131,37 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
});
}
private TransactWriteItem buildPutWriteItemForAccount(Account account, UUID uuid) throws JsonProcessingException {
return new TransactWriteItem()
.withPut(
new Put()
.withTableName(accountsTable.getTableName())
.withItem(Map.of(
KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)),
ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()),
ATTR_ACCOUNT_DATA, new AttributeValue()
.withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account))),
ATTR_MIGRATION_VERSION, new AttributeValue().withN(
String.valueOf(account.getDynamoDbMigrationVersion()))))
.withConditionExpression("attribute_not_exists(#number) OR #number = :number")
.withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164))
.withExpressionAttributeValues(Map.of(":number", new AttributeValue(account.getNumber()))));
private TransactWriteItem buildPutWriteItemForAccount(Account account, UUID uuid, Put.Builder putBuilder) throws JsonProcessingException {
return TransactWriteItem.builder()
.put(putBuilder
.tableName(accountsTableName)
.item(Map.of(
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber()),
ATTR_ACCOUNT_DATA, AttributeValues.fromByteArray(SystemMapper.getMapper().writeValueAsBytes(account)),
ATTR_MIGRATION_VERSION, AttributeValues.fromInt(account.getDynamoDbMigrationVersion())))
.build())
.build();
}
private TransactWriteItem buildPutWriteItemForPhoneNumberConstraint(Account account, UUID uuid) {
return new TransactWriteItem()
.withPut(
new Put()
.withTableName(phoneNumbersTableName)
.withItem(Map.of(
ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()),
KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid))))
.withConditionExpression(
return TransactWriteItem.builder()
.put(
Put.builder()
.tableName(phoneNumbersTableName)
.item(Map.of(
ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber()),
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)))
.conditionExpression(
"attribute_not_exists(#number) OR (attribute_exists(#number) AND #uuid = :uuid)")
.withExpressionAttributeNames(
.expressionAttributeNames(
Map.of("#uuid", KEY_ACCOUNT_UUID,
"#number", ATTR_ACCOUNT_E164))
.withExpressionAttributeValues(
Map.of(":uuid", new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid))))
.withReturnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD));
.expressionAttributeValues(
Map.of(":uuid", AttributeValues.fromUUID(uuid)))
.returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
.build())
.build();
}
@Override
@ -174,16 +169,18 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
UPDATE_TIMER.record(() -> {
UpdateItemRequest updateItemRequest;
try {
updateItemRequest = new UpdateItemRequest()
.withTableName(accountsTable.getTableName())
.withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(account.getUuid()))))
.withUpdateExpression("SET #data = :data, #version = :version")
.withConditionExpression("attribute_exists(#number)")
.withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164,
updateItemRequest = UpdateItemRequest.builder()
.tableName(accountsTableName)
.key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(account.getUuid())))
.updateExpression("SET #data = :data, #version = :version")
.conditionExpression("attribute_exists(#number)")
.expressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164,
"#data", ATTR_ACCOUNT_DATA,
"#version", ATTR_MIGRATION_VERSION))
.withExpressionAttributeValues(Map.of(":data", new AttributeValue().withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account))),
":version", new AttributeValue().withN(String.valueOf(account.getDynamoDbMigrationVersion()))));
.expressionAttributeValues(Map.of(
":data", AttributeValues.fromByteArray(SystemMapper.getMapper().writeValueAsBytes(account)),
":version", AttributeValues.fromInt(account.getDynamoDbMigrationVersion())))
.build();
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@ -193,37 +190,42 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
});
}
@Override
public Optional<Account> get(String number) {
return GET_BY_NUMBER_TIMER.record(() -> {
final GetItemResult phoneNumberAndUuid = client.getItem(phoneNumbersTableName,
Map.of(ATTR_ACCOUNT_E164, new AttributeValue(number)), true);
final GetItemResponse response = client.getItem(GetItemRequest.builder()
.tableName(phoneNumbersTableName)
.key(Map.of(ATTR_ACCOUNT_E164, AttributeValues.fromString(number)))
.build());
return Optional.ofNullable(phoneNumberAndUuid.getItem())
.map(item -> item.get(KEY_ACCOUNT_UUID).getB())
.map(uuid -> accountsTable.getItem(new GetItemSpec()
.withPrimaryKey(KEY_ACCOUNT_UUID, uuid.array())
.withConsistentRead(true)))
return Optional.ofNullable(response.item())
.map(item -> item.get(KEY_ACCOUNT_UUID))
.map(uuid -> accountByUuid(uuid))
.map(AccountsDynamoDb::fromItem);
});
}
private Map<String, AttributeValue> accountByUuid(AttributeValue uuid) {
GetItemResponse r = client.getItem(GetItemRequest.builder()
.tableName(accountsTableName)
.key(Map.of(KEY_ACCOUNT_UUID, uuid))
.consistentRead(true)
.build());
return r.item().isEmpty() ? null : r.item();
}
@Override
public Optional<Account> get(UUID uuid) {
Optional<Item> maybeItem = GET_BY_UUID_TIMER.record(() ->
Optional.ofNullable(accountsTable.getItem(new GetItemSpec().
withPrimaryKey(new PrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(uuid)))
.withConsistentRead(true))));
return maybeItem.map(AccountsDynamoDb::fromItem);
return GET_BY_UUID_TIMER.record(() ->
Optional.ofNullable(accountByUuid(AttributeValues.fromUUID(uuid)))
.map(AccountsDynamoDb::fromItem));
}
@Override
public void delete(UUID uuid) {
DELETE_TIMER.record(() -> {
delete(uuid, true);
});
}
@ -238,18 +240,22 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
maybeAccount.ifPresent(account -> {
TransactWriteItem phoneNumberDelete = new TransactWriteItem()
.withDelete(new Delete()
.withTableName(phoneNumbersTableName)
.withKey(Map.of(ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()))));
TransactWriteItem phoneNumberDelete = TransactWriteItem.builder()
.delete(Delete.builder()
.tableName(phoneNumbersTableName)
.key(Map.of(ATTR_ACCOUNT_E164, AttributeValues.fromString(account.getNumber())))
.build())
.build();
TransactWriteItem accountDelete = new TransactWriteItem().withDelete(
new Delete()
.withTableName(accountsTable.getTableName())
.withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))));
TransactWriteItem accountDelete = TransactWriteItem.builder()
.delete(Delete.builder()
.tableName(accountsTableName)
.key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)))
.build())
.build();
TransactWriteItemsRequest request = new TransactWriteItemsRequest()
.withTransactItems(phoneNumberDelete, accountDelete);
TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
.transactItems(phoneNumberDelete, accountDelete).build();
client.transactWriteItems(request);
});
@ -299,64 +305,62 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
try {
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid());
accountPut.getPut()
.setConditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)");
accountPut.getPut()
.setExpressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID,
"#version", ATTR_MIGRATION_VERSION));
accountPut.getPut()
.setExpressionAttributeValues(
Map.of(":version", new AttributeValue().withN(String.valueOf(account.getDynamoDbMigrationVersion()))));
TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid(), Put.builder()
.conditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)")
.expressionAttributeNames(Map.of(
"#uuid", KEY_ACCOUNT_UUID,
"#version", ATTR_MIGRATION_VERSION))
.expressionAttributeValues(Map.of(
":version", AttributeValues.fromInt(account.getDynamoDbMigrationVersion()))));
final TransactWriteItemsRequest request = new TransactWriteItemsRequest()
.withTransactItems(phoneNumberConstraintPut, accountPut);
final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
.transactItems(phoneNumberConstraintPut, accountPut).build();
final CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
asyncClient.transactWriteItemsAsync(request,
new AsyncHandler<>() {
@Override
public void onError(Exception exception) {
if (exception instanceof TransactionCanceledException) {
// account is already migrated
resultFuture.complete(false);
} else {
try {
migrationRetryAccounts.put(account.getUuid());
} catch (final Exception e) {
logger.error("Could not store account {}", account.getUuid());
}
resultFuture.completeExceptionally(exception);
}
}
@Override
public void onSuccess(TransactWriteItemsRequest request, TransactWriteItemsResult transactWriteItemsResult) {
resultFuture.complete(true);
}
});
asyncClient.transactWriteItems(request).whenCompleteAsync((result, exception) -> {
if (result != null) {
resultFuture.complete(true);
return;
}
if (exception instanceof CompletionException) {
// whenCompleteAsync can wrap exceptions in a CompletionException; unwrap it to get to the root cause.
exception = exception.getCause();
}
if (exception instanceof TransactionCanceledException) {
// account is already migrated
resultFuture.complete(false);
return;
}
try {
migrationRetryAccounts.put(account.getUuid());
} catch (final Exception e) {
logger.error("Could not store account {}", account.getUuid());
}
resultFuture.completeExceptionally(exception);
});
return resultFuture;
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
private static String extractCancellationReasonCodes(final TransactionCanceledException exception) {
return exception.getCancellationReasons().stream()
.map(CancellationReason::getCode)
return exception.cancellationReasons().stream()
.map(CancellationReason::code)
.collect(Collectors.joining(", "));
}
@VisibleForTesting
static Account fromItem(Item item) {
static Account fromItem(Map<String, AttributeValue> item) {
if (!item.containsKey(ATTR_ACCOUNT_DATA) ||
!item.containsKey(ATTR_ACCOUNT_E164) ||
!item.containsKey(KEY_ACCOUNT_UUID)) {
throw new RuntimeException("item missing values");
}
try {
Account account = SystemMapper.getMapper().readValue(item.getBinary(ATTR_ACCOUNT_DATA), Account.class);
account.setNumber(item.getString(ATTR_ACCOUNT_E164));
account.setUuid(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_ACCOUNT_UUID)));
Account account = SystemMapper.getMapper().readValue(item.get(ATTR_ACCOUNT_DATA).b().asByteArray(), Account.class);
account.setNumber(item.get(ATTR_ACCOUNT_E164).s());
account.setUuid(UUIDUtil.fromByteBuffer(item.get(KEY_ACCOUNT_UUID).b().asByteBuffer()));
return account;

View File

@ -7,7 +7,6 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
@ -42,6 +41,7 @@ import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
public class AccountsManager {

View File

@ -7,195 +7,230 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
import com.amazonaws.services.dynamodbv2.model.Select;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.whispersystems.textsecuregcm.entities.PreKey;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
import software.amazon.awssdk.services.dynamodb.model.Select;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class KeysDynamoDb extends AbstractDynamoDbStore {
private final Table table;
private final String tableName;
static final String KEY_ACCOUNT_UUID = "U";
static final String KEY_DEVICE_ID_KEY_ID = "DK";
static final String KEY_PUBLIC_KEY = "P";
static final String KEY_ACCOUNT_UUID = "U";
static final String KEY_DEVICE_ID_KEY_ID = "DK";
static final String KEY_PUBLIC_KEY = "P";
private static final Timer STORE_KEYS_TIMER = Metrics.timer(name(KeysDynamoDb.class, "storeKeys"));
private static final Timer TAKE_KEY_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForDevice"));
private static final Timer TAKE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForAccount"));
private static final Timer GET_KEY_COUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "getKeyCount"));
private static final Timer DELETE_KEYS_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForDevice"));
private static final Timer DELETE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForAccount"));
private static final DistributionSummary CONTESTED_KEY_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "contestedKeys"));
private static final DistributionSummary KEY_COUNT_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "keyCount"));
private static final Timer STORE_KEYS_TIMER = Metrics.timer(name(KeysDynamoDb.class, "storeKeys"));
private static final Timer TAKE_KEY_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForDevice"));
private static final Timer TAKE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForAccount"));
private static final Timer GET_KEY_COUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "getKeyCount"));
private static final Timer DELETE_KEYS_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForDevice"));
private static final Timer DELETE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForAccount"));
private static final DistributionSummary CONTESTED_KEY_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "contestedKeys"));
private static final DistributionSummary KEY_COUNT_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "keyCount"));
public KeysDynamoDb(final DynamoDB dynamoDB, final String tableName) {
super(dynamoDB);
public KeysDynamoDb(final DynamoDbClient dynamoDB, final String tableName) {
super(dynamoDB);
this.tableName = tableName;
}
this.table = dynamoDB.getTable(tableName);
}
public void store(final Account account, final long deviceId, final List<PreKey> keys) {
STORE_KEYS_TIMER.record(() -> {
delete(account, deviceId);
public void store(final Account account, final long deviceId, final List<PreKey> keys) {
STORE_KEYS_TIMER.record(() -> {
delete(account, deviceId);
writeInBatches(keys, batch -> {
List<WriteRequest> items = new ArrayList<>();
for (final PreKey preKey : batch) {
items.add(WriteRequest.builder()
.putRequest(PutRequest.builder()
.item(getItemFromPreKey(account.getUuid(), deviceId, preKey))
.build())
.build());
}
executeTableWriteItemsUntilComplete(Map.of(tableName, items));
});
});
}
writeInBatches(keys, batch -> {
final TableWriteItems items = new TableWriteItems(table.getTableName());
public Optional<PreKey> take(final Account account, final long deviceId) {
return TAKE_KEY_FOR_DEVICE_TIMER.record(() -> {
final AttributeValue partitionKey = getPartitionKey(account.getUuid());
QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.expressionAttributeValues(Map.of(
":uuid", partitionKey,
":sortprefix", getSortKeyPrefix(deviceId)))
.projectionExpression(KEY_DEVICE_ID_KEY_ID)
.consistentRead(false)
.build();
for (final PreKey preKey : batch) {
items.addItemToPut(getItemFromPreKey(account.getUuid(), deviceId, preKey));
}
int contestedKeys = 0;
executeTableWriteItemsUntilComplete(items);
});
});
}
try {
QueryResponse response = db().query(queryRequest);
for (Map<String, AttributeValue> candidate : response.items()) {
DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(
KEY_ACCOUNT_UUID, partitionKey,
KEY_DEVICE_ID_KEY_ID, candidate.get(KEY_DEVICE_ID_KEY_ID)))
.returnValues(ReturnValue.ALL_OLD)
.build();
DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest);
if (deleteItemResponse.attributes() != null) {
return Optional.of(getPreKeyFromItem(deleteItemResponse.attributes()));
}
public Optional<PreKey> take(final Account account, final long deviceId) {
return TAKE_KEY_FOR_DEVICE_TIMER.record(() -> {
final byte[] partitionKey = getPartitionKey(account.getUuid());
contestedKeys++;
}
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.withValueMap(Map.of(":uuid", partitionKey,
":sortprefix", getSortKeyPrefix(deviceId)))
.withProjectionExpression(KEY_DEVICE_ID_KEY_ID)
.withConsistentRead(false);
return Optional.empty();
} finally {
CONTESTED_KEY_DISTRIBUTION.record(contestedKeys);
}
});
}
int contestedKeys = 0;
public Map<Long, PreKey> take(final Account account) {
return TAKE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
final Map<Long, PreKey> preKeysByDeviceId = new HashMap<>();
try {
for (final Item candidate : table.query(querySpec)) {
final DeleteItemSpec deleteItemSpec = new DeleteItemSpec().withPrimaryKey(KEY_ACCOUNT_UUID, partitionKey, KEY_DEVICE_ID_KEY_ID, candidate.getBinary(KEY_DEVICE_ID_KEY_ID))
.withReturnValues(ReturnValue.ALL_OLD);
for (final Device device : account.getDevices()) {
take(account, device.getId()).ifPresent(preKey -> preKeysByDeviceId.put(device.getId(), preKey));
}
final DeleteItemOutcome outcome = table.deleteItem(deleteItemSpec);
return preKeysByDeviceId;
});
}
if (outcome.getItem() != null) {
return Optional.of(getPreKeyFromItem(outcome.getItem()));
}
public int getCount(final Account account, final long deviceId) {
return GET_KEY_COUNT_TIMER.record(() -> {
QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.expressionAttributeValues(Map.of(
":uuid", getPartitionKey(account.getUuid()),
":sortprefix", getSortKeyPrefix(deviceId)))
.select(Select.COUNT)
.consistentRead(false)
.build();
contestedKeys++;
}
int keyCount = 0;
// This is very confusing, but does appear to be the intended behavior. See:
//
// - https://github.com/aws/aws-sdk-java/issues/693
// - https://github.com/aws/aws-sdk-java/issues/915
// - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Count
for (final QueryResponse page : db().queryPaginator(queryRequest)) {
keyCount += page.count();
}
KEY_COUNT_DISTRIBUTION.record(keyCount);
return keyCount;
});
}
return Optional.empty();
} finally {
CONTESTED_KEY_DISTRIBUTION.record(contestedKeys);
}
});
}
public void delete(final Account account) {
DELETE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#uuid = :uuid")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID))
.expressionAttributeValues(Map.of(
":uuid", getPartitionKey(account.getUuid())))
.projectionExpression(KEY_DEVICE_ID_KEY_ID)
.consistentRead(true)
.build();
public Map<Long, PreKey> take(final Account account) {
return TAKE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
final Map<Long, PreKey> preKeysByDeviceId = new HashMap<>();
deleteItemsForAccountMatchingQuery(account, queryRequest);
});
}
for (final Device device : account.getDevices()) {
take(account, device.getId()).ifPresent(preKey -> preKeysByDeviceId.put(device.getId(), preKey));
}
@VisibleForTesting
void delete(final Account account, final long deviceId) {
DELETE_KEYS_FOR_DEVICE_TIMER.record(() -> {
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.expressionAttributeValues(Map.of(
":uuid", getPartitionKey(account.getUuid()),
":sortprefix", getSortKeyPrefix(deviceId)))
.projectionExpression(KEY_DEVICE_ID_KEY_ID)
.consistentRead(true)
.build();
return preKeysByDeviceId;
});
}
deleteItemsForAccountMatchingQuery(account, queryRequest);
});
}
public int getCount(final Account account, final long deviceId) {
return GET_KEY_COUNT_TIMER.record(() -> {
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()),
":sortprefix", getSortKeyPrefix(deviceId)))
.withSelect(Select.COUNT)
.withConsistentRead(false);
private void deleteItemsForAccountMatchingQuery(final Account account, final QueryRequest querySpec) {
final AttributeValue partitionKey = getPartitionKey(account.getUuid());
final int keyCount = (int)countItemsMatchingQuery(table, querySpec);
writeInBatches(db().query(querySpec).items(), batch -> {
List<WriteRequest> deletes = new ArrayList<>();
for (final Map<String, AttributeValue> item : batch) {
deletes.add(WriteRequest.builder()
.deleteRequest(DeleteRequest.builder()
.key(Map.of(
KEY_ACCOUNT_UUID, partitionKey,
KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID)))
.build())
.build());
}
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
});
}
KEY_COUNT_DISTRIBUTION.record(keyCount);
return keyCount;
});
}
private static AttributeValue getPartitionKey(final UUID accountUuid) {
return AttributeValues.fromUUID(accountUuid);
}
public void delete(final Account account) {
DELETE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid")
.withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID))
.withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid())))
.withProjectionExpression(KEY_DEVICE_ID_KEY_ID)
.withConsistentRead(true);
private static AttributeValue getSortKey(final long deviceId, final long keyId) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(deviceId);
byteBuffer.putLong(keyId);
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}
deleteItemsForAccountMatchingQuery(account, querySpec);
});
}
@VisibleForTesting
static AttributeValue getSortKeyPrefix(final long deviceId) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
byteBuffer.putLong(deviceId);
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}
@VisibleForTesting
void delete(final Account account, final long deviceId) {
DELETE_KEYS_FOR_DEVICE_TIMER.record(() -> {
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()),
":sortprefix", getSortKeyPrefix(deviceId)))
.withProjectionExpression(KEY_DEVICE_ID_KEY_ID)
.withConsistentRead(true);
private Map<String, AttributeValue> getItemFromPreKey(final UUID accountUuid, final long deviceId, final PreKey preKey) {
return Map.of(
KEY_ACCOUNT_UUID, getPartitionKey(accountUuid),
KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.getKeyId()),
KEY_PUBLIC_KEY, AttributeValues.fromString(preKey.getPublicKey()));
}
deleteItemsForAccountMatchingQuery(account, querySpec);
});
}
private void deleteItemsForAccountMatchingQuery(final Account account, final QuerySpec querySpec) {
final byte[] partitionKey = getPartitionKey(account.getUuid());
writeInBatches(table.query(querySpec), batch -> {
final TableWriteItems writeItems = new TableWriteItems(table.getTableName());
for (final Item item : batch) {
writeItems.addPrimaryKeyToDelete(new PrimaryKey(KEY_ACCOUNT_UUID, partitionKey, KEY_DEVICE_ID_KEY_ID, item.getBinary(KEY_DEVICE_ID_KEY_ID)));
}
executeTableWriteItemsUntilComplete(writeItems);
});
}
private static byte[] getPartitionKey(final UUID accountUuid) {
return UUIDUtil.toBytes(accountUuid);
}
private static byte[] getSortKey(final long deviceId, final long keyId) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(deviceId);
byteBuffer.putLong(keyId);
return byteBuffer.array();
}
private static byte[] getSortKeyPrefix(final long deviceId) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
byteBuffer.putLong(deviceId);
return byteBuffer.array();
}
private Item getItemFromPreKey(final UUID accountUuid, final long deviceId, final PreKey preKey) {
return new Item().withBinary(KEY_ACCOUNT_UUID, getPartitionKey(accountUuid))
.withBinary(KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.getKeyId()))
.withString(KEY_PUBLIC_KEY, preKey.getPublicKey());
}
private PreKey getPreKeyFromItem(final Item item) {
final long keyId = ByteBuffer.wrap(item.getBinary(KEY_DEVICE_ID_KEY_ID)).getLong(8);
return new PreKey(keyId, item.getString(KEY_PUBLIC_KEY));
}
private PreKey getPreKeyFromItem(Map<String, AttributeValue> item) {
final long keyId = item.get(KEY_DEVICE_ID_KEY_ID).b().asByteBuffer().getLong(8);
return new PreKey(keyId, item.get(KEY_PUBLIC_KEY).s());
}
}

View File

@ -8,17 +8,7 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import static io.micrometer.core.instrument.Metrics.timer;
import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Index;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.document.api.QueryApi;
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
import com.google.common.collect.ImmutableMap;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
import java.time.Duration;
@ -27,11 +17,22 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class MessagesDynamoDb extends AbstractDynamoDbStore {
@ -60,7 +61,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private final String tableName;
private final Duration timeToLive;
public MessagesDynamoDb(DynamoDB dynamoDb, String tableName, Duration timeToLive) {
public MessagesDynamoDb(DynamoDbClient dynamoDb, String tableName, Duration timeToLive) {
super(dynamoDb);
this.tableName = tableName;
@ -76,54 +77,61 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
throw new IllegalArgumentException("Maximum batch size of " + DYNAMO_DB_MAX_BATCH_SIZE + " execeeded with " + messages.size() + " messages");
}
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
TableWriteItems items = new TableWriteItems(tableName);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
List<WriteRequest> writeItems = new ArrayList<>();
for (MessageProtos.Envelope message : messages) {
final UUID messageUuid = UUID.fromString(message.getServerGuid());
final Item item = new Item().withBinary(KEY_PARTITION, partitionKey)
.withBinary(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid))
.withBinary(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
.withInt(KEY_TYPE, message.getType().getNumber())
.withLong(KEY_TIMESTAMP, message.getTimestamp())
.withLong(KEY_TTL, getTtlForMessage(message));
final ImmutableMap.Builder<String, AttributeValue> item = ImmutableMap.<String, AttributeValue>builder()
.put(KEY_PARTITION, partitionKey)
.put(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid))
.put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
.put(KEY_TYPE, AttributeValues.fromInt(message.getType().getNumber()))
.put(KEY_TIMESTAMP, AttributeValues.fromLong(message.getTimestamp()))
.put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message)));
if (message.hasRelay() && message.getRelay().length() > 0) {
item.withString(KEY_RELAY, message.getRelay());
item.put(KEY_RELAY, AttributeValues.fromString(message.getRelay()));
}
if (message.hasSource()) {
item.withString(KEY_SOURCE, message.getSource());
item.put(KEY_SOURCE, AttributeValues.fromString(message.getSource()));
}
if (message.hasSourceUuid()) {
item.withBinary(KEY_SOURCE_UUID, UUIDUtil.toBytes(UUID.fromString(message.getSourceUuid())));
item.put(KEY_SOURCE_UUID, AttributeValues.fromUUID(UUID.fromString(message.getSourceUuid())));
}
if (message.hasSourceDevice()) {
item.withInt(KEY_SOURCE_DEVICE, message.getSourceDevice());
item.put(KEY_SOURCE_DEVICE, AttributeValues.fromInt(message.getSourceDevice()));
}
if (message.hasLegacyMessage()) {
item.withBinary(KEY_MESSAGE, message.getLegacyMessage().toByteArray());
item.put(KEY_MESSAGE, AttributeValues.fromByteArray(message.getLegacyMessage().toByteArray()));
}
if (message.hasContent()) {
item.withBinary(KEY_CONTENT, message.getContent().toByteArray());
item.put(KEY_CONTENT, AttributeValues.fromByteArray(message.getContent().toByteArray()));
}
items.addItemToPut(item);
writeItems.add(WriteRequest.builder().putRequest(PutRequest.builder()
.item(item.build())
.build()).build());
}
executeTableWriteItemsUntilComplete(items);
executeTableWriteItemsUntilComplete(Map.of(tableName, writeItems));
}
public List<OutgoingMessageEntity> load(final UUID destinationAccountUuid, final long destinationDeviceId, final int requestedNumberOfMessagesToFetch) {
return loadTimer.record(() -> {
final int numberOfMessagesToFetch = Math.min(requestedNumberOfMessagesToFetch, RESULT_SET_CHUNK_SIZE);
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withConsistentRead(true)
.withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.withNameMap(Map.of("#part", KEY_PARTITION,
"#sort", KEY_SORT))
.withValueMap(Map.of(":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.withMaxResultSize(numberOfMessagesToFetch);
final Table table = getDynamoDb().getTable(tableName);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.consistentRead(true)
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.limit(numberOfMessagesToFetch)
.build();
List<OutgoingMessageEntity> messageEntities = new ArrayList<>(numberOfMessagesToFetch);
for (Item message : table.query(querySpec)) {
for (Map<String, AttributeValue> message : db().query(queryRequest).items()) {
messageEntities.add(convertItemToOutgoingMessageEntity(message));
}
return messageEntities;
@ -136,53 +144,63 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
throw new IllegalArgumentException("must specify a source");
}
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withProjectionExpression(KEY_SORT)
.withConsistentRead(true)
.withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.withFilterExpression("#source = :source AND #timestamp = :timestamp")
.withNameMap(Map.of("#part", KEY_PARTITION,
"#sort", KEY_SORT,
"#source", KEY_SOURCE,
"#timestamp", KEY_TIMESTAMP))
.withValueMap(Map.of(":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId),
":source", source,
":timestamp", timestamp));
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.projectionExpression(KEY_SORT)
.consistentRead(true)
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.filterExpression("#source = :source AND #timestamp = :timestamp")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT,
"#source", KEY_SOURCE,
"#timestamp", KEY_TIMESTAMP))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId),
":source", AttributeValues.fromString(source),
":timestamp", AttributeValues.fromLong(timestamp)))
.build();
final Table table = getDynamoDb().getTable(tableName);
return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, table);
return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(partitionKey, queryRequest);
});
}
public Optional<OutgoingMessageEntity> deleteMessageByDestinationAndGuid(final UUID destinationAccountUuid, final long destinationDeviceId, final UUID messageUuid) {
return deleteByGuid.record(() -> {
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withProjectionExpression(KEY_SORT)
.withConsistentRead(true)
.withKeyConditionExpression("#part = :part AND #uuid = :uuid")
.withNameMap(Map.of("#part", KEY_PARTITION,
"#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT))
.withValueMap(Map.of(":part", partitionKey,
":uuid", convertLocalIndexMessageUuidSortKey(messageUuid)));
final Table table = getDynamoDb().getTable(tableName);
final Index index = table.getIndex(LOCAL_INDEX_MESSAGE_UUID_NAME);
return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, index);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.indexName(LOCAL_INDEX_MESSAGE_UUID_NAME)
.projectionExpression(KEY_SORT)
.consistentRead(true)
.keyConditionExpression("#part = :part AND #uuid = :uuid")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":uuid", convertLocalIndexMessageUuidSortKey(messageUuid)))
.build();
return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(partitionKey, queryRequest);
});
}
@Nonnull
private Optional<OutgoingMessageEntity> deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(Table table, byte[] partitionKey, QuerySpec querySpec, QueryApi queryApi) {
private Optional<OutgoingMessageEntity> deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(AttributeValue partitionKey, QueryRequest queryRequest) {
Optional<OutgoingMessageEntity> result = Optional.empty();
for (Item item : queryApi.query(querySpec)) {
final byte[] rangeKeyValue = item.getBinary(KEY_SORT);
DeleteItemSpec deleteItemSpec = new DeleteItemSpec().withPrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, rangeKeyValue);
for (Map<String, AttributeValue> item : db().query(queryRequest).items()) {
final byte[] rangeKeyValue = item.get(KEY_SORT).b().asByteArray();
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, AttributeValues.fromByteArray(rangeKeyValue)));
if (result.isEmpty()) {
deleteItemSpec.withReturnValues(ReturnValue.ALL_OLD);
deleteItemRequest.returnValues(ReturnValue.ALL_OLD);
}
final DeleteItemOutcome deleteItemOutcome = table.deleteItem(deleteItemSpec);
if (deleteItemOutcome.getItem() != null && deleteItemOutcome.getItem().hasAttribute(KEY_PARTITION)) {
result = Optional.of(convertItemToOutgoingMessageEntity(deleteItemOutcome.getItem()));
final DeleteItemResponse deleteItemResponse = db().deleteItem(deleteItemRequest.build());
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
result = Optional.of(convertItemToOutgoingMessageEntity(deleteItemResponse.attributes()));
}
}
return result;
@ -190,74 +208,88 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
public void deleteAllMessagesForAccount(final UUID destinationAccountUuid) {
deleteByAccount.record(() -> {
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withHashKey(KEY_PARTITION, partitionKey)
.withProjectionExpression(KEY_SORT)
.withConsistentRead(true);
deleteRowsMatchingQuery(partitionKey, querySpec);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.projectionExpression(KEY_SORT)
.consistentRead(true)
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey))
.build();
deleteRowsMatchingQuery(partitionKey, queryRequest);
});
}
public void deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) {
deleteByDevice.record(() -> {
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.withNameMap(Map.of("#part", KEY_PARTITION,
"#sort", KEY_SORT))
.withValueMap(Map.of(":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.withProjectionExpression(KEY_SORT)
.withConsistentRead(true);
deleteRowsMatchingQuery(partitionKey, querySpec);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.projectionExpression(KEY_SORT)
.consistentRead(true)
.build();
deleteRowsMatchingQuery(partitionKey, queryRequest);
});
}
private OutgoingMessageEntity convertItemToOutgoingMessageEntity(Item message) {
final SortKey sortKey = convertSortKey(message.getBinary(KEY_SORT));
final UUID messageUuid = convertLocalIndexMessageUuidSortKey(message.getBinary(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT));
final int type = message.getInt(KEY_TYPE);
final String relay = message.getString(KEY_RELAY);
final long timestamp = message.getLong(KEY_TIMESTAMP);
final String source = message.getString(KEY_SOURCE);
final UUID sourceUuid = message.hasAttribute(KEY_SOURCE_UUID) ? convertUuidFromBytes(message.getBinary(KEY_SOURCE_UUID), "message source uuid") : null;
final int sourceDevice = message.hasAttribute(KEY_SOURCE_DEVICE) ? message.getInt(KEY_SOURCE_DEVICE) : 0;
final byte[] messageBytes = message.getBinary(KEY_MESSAGE);
final byte[] content = message.getBinary(KEY_CONTENT);
private OutgoingMessageEntity convertItemToOutgoingMessageEntity(Map<String, AttributeValue> message) {
final SortKey sortKey = convertSortKey(message.get(KEY_SORT).b().asByteArray());
final UUID messageUuid = convertLocalIndexMessageUuidSortKey(message.get(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT).b().asByteArray());
final int type = AttributeValues.getInt(message, KEY_TYPE, 0);
final String relay = AttributeValues.getString(message, KEY_RELAY, null);
final long timestamp = AttributeValues.getLong(message, KEY_TIMESTAMP, 0L);
final String source = AttributeValues.getString(message, KEY_SOURCE, null);
final UUID sourceUuid = AttributeValues.getUUID(message, KEY_SOURCE_UUID, null);
final int sourceDevice = AttributeValues.getInt(message, KEY_SOURCE_DEVICE, 0);
final byte[] messageBytes = AttributeValues.getByteArray(message, KEY_MESSAGE, null);
final byte[] content = AttributeValues.getByteArray(message, KEY_CONTENT, null);
return new OutgoingMessageEntity(-1L, false, messageUuid, type, relay, timestamp, source, sourceUuid, sourceDevice, messageBytes, content, sortKey.getServerTimestamp());
}
private void deleteRowsMatchingQuery(byte[] partitionKey, QuerySpec querySpec) {
final Table table = getDynamoDb().getTable(tableName);
writeInBatches(table.query(querySpec), (itemBatch) -> deleteItems(partitionKey, itemBatch));
private void deleteRowsMatchingQuery(AttributeValue partitionKey, QueryRequest querySpec) {
writeInBatches(db().query(querySpec).items(), (itemBatch) -> deleteItems(partitionKey, itemBatch));
}
private void deleteItems(byte[] partitionKey, List<Item> items) {
final TableWriteItems tableWriteItems = new TableWriteItems(tableName);
items.stream().map(item -> new PrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, item.getBinary(KEY_SORT))).forEach(tableWriteItems::addPrimaryKeyToDelete);
executeTableWriteItemsUntilComplete(tableWriteItems);
private void deleteItems(AttributeValue partitionKey, List<Map<String, AttributeValue>> items) {
List<WriteRequest> deletes = items.stream()
.map(item -> WriteRequest.builder()
.deleteRequest(DeleteRequest.builder().key(Map.of(
KEY_PARTITION, partitionKey,
KEY_SORT, item.get(KEY_SORT))).build())
.build())
.collect(Collectors.toList());
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
}
private long getTtlForMessage(MessageProtos.Envelope message) {
return message.getServerTimestamp() / 1000 + timeToLive.getSeconds();
}
private static byte[] convertPartitionKey(final UUID destinationAccountUuid) {
return UUIDUtil.toBytes(destinationAccountUuid);
private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid) {
return AttributeValues.fromUUID(destinationAccountUuid);
}
private static byte[] convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) {
private static AttributeValue convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]);
byteBuffer.putLong(destinationDeviceId);
byteBuffer.putLong(serverTimestamp);
byteBuffer.putLong(messageUuid.getMostSignificantBits());
byteBuffer.putLong(messageUuid.getLeastSignificantBits());
return byteBuffer.array();
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}
private static byte[] convertDestinationDeviceIdToSortKeyPrefix(final long destinationDeviceId) {
private static AttributeValue convertDestinationDeviceIdToSortKeyPrefix(final long destinationDeviceId) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
byteBuffer.putLong(destinationDeviceId);
return byteBuffer.array();
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}
private static SortKey convertSortKey(final byte[] bytes) {
@ -273,8 +305,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return new SortKey(destinationDeviceId, serverTimestamp, new UUID(mostSigBits, leastSigBits));
}
private static byte[] convertLocalIndexMessageUuidSortKey(final UUID messageUuid) {
return UUIDUtil.toBytes(messageUuid);
private static AttributeValue convertLocalIndexMessageUuidSortKey(final UUID messageUuid) {
return AttributeValues.fromUUID(messageUuid);
}
private static UUID convertLocalIndexMessageUuidSortKey(final byte[] bytes) {

View File

@ -1,42 +1,52 @@
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class MigrationDeletedAccounts extends AbstractDynamoDbStore {
private final Table table;
private final String tableName;
static final String KEY_UUID = "U";
public MigrationDeletedAccounts(DynamoDB dynamoDb, String tableName) {
public MigrationDeletedAccounts(DynamoDbClient dynamoDb, String tableName) {
super(dynamoDb);
table = dynamoDb.getTable(tableName);
this.tableName = tableName;
}
public void put(UUID uuid) {
table.putItem(new Item()
.withPrimaryKey(primaryKey(uuid)));
db().putItem(PutItemRequest.builder()
.tableName(tableName)
.item(primaryKey(uuid))
.build());
}
public List<UUID> getRecentlyDeletedUuids() {
final List<UUID> uuids = new ArrayList<>();
Optional<ScanResponse> firstPage = db().scanPaginator(ScanRequest.builder()
.tableName(tableName)
.build()).stream().findAny(); // get the first available response
for (Item item : table.scan(new ScanSpec()).firstPage()) {
// only process one page each time. If we have a significant backlog at the end of the migration
// we can handle it separately
uuids.add(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_UUID)));
if (firstPage.isPresent()) {
for (Map<String, AttributeValue> item : firstPage.get().items()) {
// only process one page each time. If we have a significant backlog at the end of the migration
// we can handle it separately
uuids.add(AttributeValues.getUUID(item, KEY_UUID, null));
}
}
return uuids;
@ -45,20 +55,17 @@ public class MigrationDeletedAccounts extends AbstractDynamoDbStore {
public void delete(List<UUID> uuids) {
writeInBatches(uuids, (batch) -> {
List<WriteRequest> deletes = batch.stream().map((uuid) -> WriteRequest.builder().deleteRequest(DeleteRequest.builder()
.key(primaryKey(uuid))
.build()).build()).collect(Collectors.toList());
final TableWriteItems deleteItems = new TableWriteItems(table.getTableName());
for (UUID uuid : batch) {
deleteItems.addPrimaryKeyToDelete(primaryKey(uuid));
}
executeTableWriteItemsUntilComplete(deleteItems);
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
});
}
@VisibleForTesting
public static PrimaryKey primaryKey(UUID uuid) {
return new PrimaryKey(KEY_UUID, UUIDUtil.toBytes(uuid));
public static Map<String, AttributeValue> primaryKey(UUID uuid) {
return Map.of(KEY_UUID, AttributeValues.fromUUID(uuid));
}
}

View File

@ -1,43 +1,44 @@
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Page;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
public class MigrationRetryAccounts extends AbstractDynamoDbStore {
private final Table table;
private final String tableName;
static final String KEY_UUID = "U";
public MigrationRetryAccounts(DynamoDB dynamoDb, String tableName) {
public MigrationRetryAccounts(DynamoDbClient dynamoDb, String tableName) {
super(dynamoDb);
table = dynamoDb.getTable(tableName);
this.tableName = tableName;
}
public void put(UUID uuid) {
table.putItem(new Item()
.withPrimaryKey(primaryKey(uuid)));
db().putItem(PutItemRequest.builder()
.tableName(tableName)
.item(primaryKey(uuid))
.build());
}
public List<UUID> getUuids(int max) {
final List<UUID> uuids = new ArrayList<>();
for (Page<Item, ScanOutcome> page : table.scan(new ScanSpec()).pages()) {
for (ScanResponse response : db().scanPaginator(ScanRequest.builder().tableName(tableName).build())) {
for (Item item : page) {
uuids.add(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_UUID)));
for (Map<String, AttributeValue> item : response.items()) {
uuids.add(AttributeValues.getUUID(item, KEY_UUID, null));
if (uuids.size() >= max) {
break;
@ -53,8 +54,8 @@ public class MigrationRetryAccounts extends AbstractDynamoDbStore {
}
@VisibleForTesting
public static PrimaryKey primaryKey(UUID uuid) {
return new PrimaryKey(KEY_UUID, UUIDUtil.toBytes(uuid));
public static Map<String, AttributeValue> primaryKey(UUID uuid) {
return Map.of(KEY_UUID, AttributeValues.fromUUID(uuid));
}
}

View File

@ -5,25 +5,23 @@
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.PutItemSpec;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import java.time.Clock;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
/**
* Stores push challenge tokens. Users may have at most one outstanding push challenge token at a time.
*/
public class PushChallengeDynamoDb extends AbstractDynamoDbStore {
private final Table table;
private final String tableName;
private final Clock clock;
static final String KEY_ACCOUNT_UUID = "U";
@ -33,15 +31,15 @@ public class PushChallengeDynamoDb extends AbstractDynamoDbStore {
private static final Map<String, String> UUID_NAME_MAP = Map.of("#uuid", KEY_ACCOUNT_UUID);
private static final Map<String, String> CHALLENGE_TOKEN_NAME_MAP = Map.of("#challenge", ATTR_CHALLENGE_TOKEN);
public PushChallengeDynamoDb(final DynamoDB dynamoDB, final String tableName) {
public PushChallengeDynamoDb(final DynamoDbClient dynamoDB, final String tableName) {
this(dynamoDB, tableName, Clock.systemUTC());
}
@VisibleForTesting
PushChallengeDynamoDb(final DynamoDB dynamoDB, final String tableName, final Clock clock) {
PushChallengeDynamoDb(final DynamoDbClient dynamoDB, final String tableName, final Clock clock) {
super(dynamoDB);
this.table = dynamoDB.getTable(tableName);
this.tableName = tableName;
this.clock = clock;
}
@ -57,13 +55,15 @@ public class PushChallengeDynamoDb extends AbstractDynamoDbStore {
*/
public boolean add(final UUID accountUuid, final byte[] challengeToken, final Duration ttl) {
try {
table.putItem( new PutItemSpec()
.withItem(new Item()
.withBinary(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(accountUuid))
.withBinary(ATTR_CHALLENGE_TOKEN, challengeToken)
.withNumber(ATTR_TTL, getExpirationTimestamp(ttl)))
.withConditionExpression("attribute_not_exists(#uuid)")
.withNameMap(UUID_NAME_MAP));
db().putItem(PutItemRequest.builder()
.tableName(tableName)
.item(Map.of(
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountUuid),
ATTR_CHALLENGE_TOKEN, AttributeValues.fromByteArray(challengeToken),
ATTR_TTL, AttributeValues.fromLong(getExpirationTimestamp(ttl))))
.conditionExpression("attribute_not_exists(#uuid)")
.expressionAttributeNames(UUID_NAME_MAP)
.build());
return true;
} catch (final ConditionalCheckFailedException e) {
return false;
@ -84,11 +84,13 @@ public class PushChallengeDynamoDb extends AbstractDynamoDbStore {
*/
public boolean remove(final UUID accountUuid, final byte[] challengeToken) {
try {
table.deleteItem(new DeleteItemSpec()
.withPrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(accountUuid))
.withConditionExpression("#challenge = :challenge")
.withNameMap(CHALLENGE_TOKEN_NAME_MAP)
.withValueMap(Map.of(":challenge", challengeToken)));
db().deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountUuid)))
.conditionExpression("#challenge = :challenge")
.expressionAttributeNames(CHALLENGE_TOKEN_NAME_MAP)
.expressionAttributeValues(Map.of(":challenge", AttributeValues.fromByteArray(challengeToken)))
.build());
return true;
} catch (final ConditionalCheckFailedException e) {
return false;

View File

@ -1,13 +1,14 @@
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
public class ReportMessageDynamoDb {
@ -16,33 +17,30 @@ public class ReportMessageDynamoDb {
static final Duration TIME_TO_LIVE = Duration.ofDays(7);
private final Table table;
private final DynamoDbClient db;
private final String tableName;
public ReportMessageDynamoDb(final DynamoDB dynamoDB, final String tableName) {
this.table = dynamoDB.getTable(tableName);
public ReportMessageDynamoDb(final DynamoDbClient dynamoDB, final String tableName) {
this.db = dynamoDB;
this.tableName = tableName;
}
public void store(byte[] hash) {
table.putItem(buildItemForHash(hash));
}
private Item buildItemForHash(byte[] hash) {
return new Item()
.withBinary(KEY_HASH, hash)
.withLong(ATTR_TTL, Instant.now().plus(TIME_TO_LIVE).getEpochSecond());
db.putItem(PutItemRequest.builder()
.tableName(tableName)
.item(Map.of(
KEY_HASH, AttributeValues.fromByteArray(hash),
ATTR_TTL, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond())
))
.build());
}
public boolean remove(byte[] hash) {
final DeleteItemSpec deleteItemSpec = new DeleteItemSpec()
.withPrimaryKey(KEY_HASH, hash)
.withReturnValues(ReturnValue.ALL_OLD);
final DeleteItemOutcome outcome = table.deleteItem(deleteItemSpec);
return outcome.getItem() != null;
final DeleteItemResponse deleteItemResponse = db.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(KEY_HASH, AttributeValues.fromByteArray(hash)))
.returnValues(ReturnValue.ALL_OLD)
.build());
return !deleteItemResponse.attributes().isEmpty();
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.util;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
/** AwsAV provides static helper methods for working with AWS AttributeValues. */
public class AttributeValues {
public static AttributeValue fromString(String value) {
return AttributeValue.builder().s(value).build();
}
public static AttributeValue fromLong(long value) {
return AttributeValue.builder().n(Long.toString(value)).build();
}
public static AttributeValue fromInt(int value) {
return AttributeValue.builder().n(Integer.toString(value)).build();
}
public static AttributeValue fromByteArray(byte[] value) {
return AttributeValues.fromSdkBytes(SdkBytes.fromByteArray(value));
}
public static AttributeValue fromByteBuffer(ByteBuffer value) {
return AttributeValues.fromSdkBytes(SdkBytes.fromByteBuffer(value));
}
public static AttributeValue fromUUID(UUID uuid) {
return AttributeValues.fromSdkBytes(SdkBytes.fromByteArrayUnsafe(UUIDUtil.toBytes(uuid)));
}
public static AttributeValue fromSdkBytes(SdkBytes value) {
return AttributeValue.builder().b(value).build();
}
private static int toInt(AttributeValue av) {
return Integer.parseInt(av.n());
}
private static long toLong(AttributeValue av) {
return Long.parseLong(av.n());
}
private static UUID toUUID(AttributeValue av) {
return UUIDUtil.fromBytes(av.b().asByteArrayUnsafe()); // We're guaranteed not to modify the byte array
}
private static byte[] toByteArray(AttributeValue av) {
return av.b().asByteArray();
}
private static String toString(AttributeValue av) {
return av.s();
}
public static Optional<AttributeValue> get(Map<String, AttributeValue> item, String key) {
return Optional.ofNullable(item.get(key));
}
public static int getInt(Map<String, AttributeValue> item, String key, int defaultValue) {
return AttributeValues.get(item, key).map(AttributeValues::toInt).orElse(defaultValue);
}
public static String getString(Map<String, AttributeValue> item, String key, String defaultValue) {
return AttributeValues.get(item, key).map(AttributeValues::toString).orElse(defaultValue);
}
public static long getLong(Map<String, AttributeValue> item, String key, long defaultValue) {
return AttributeValues.get(item, key).map(AttributeValues::toLong).orElse(defaultValue);
}
public static byte[] getByteArray(Map<String, AttributeValue> item, String key, byte[] defaultValue) {
return AttributeValues.get(item, key).map(AttributeValues::toByteArray).orElse(defaultValue);
}
public static UUID getUUID(Map<String, AttributeValue> item, String key, UUID defaultValue) {
return AttributeValues.get(item, key).map(AttributeValues::toUUID).orElse(defaultValue);
}
}

View File

@ -0,0 +1,41 @@
package org.whispersystems.textsecuregcm.util;
import org.whispersystems.textsecuregcm.configuration.DynamoDbConfiguration;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import java.util.concurrent.Executor;
public class DynamoDbFromConfig {
private static ClientOverrideConfiguration clientOverrideConfiguration(DynamoDbConfiguration config) {
return ClientOverrideConfiguration.builder()
.apiCallTimeout(config.getClientExecutionTimeout())
.apiCallAttemptTimeout(config.getClientRequestTimeout())
.build();
}
public static DynamoDbClient client(DynamoDbConfiguration config, AwsCredentialsProvider credentialsProvider) {
return DynamoDbClient.builder()
.region(Region.of(config.getRegion()))
.credentialsProvider(credentialsProvider)
.overrideConfiguration(clientOverrideConfiguration(config))
.build();
}
public static DynamoDbAsyncClient asyncClient(DynamoDbConfiguration config, AwsCredentialsProvider credentialsProvider, Executor executor) {
DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder()
.region(Region.of(config.getRegion()))
.credentialsProvider(credentialsProvider)
.overrideConfiguration(clientOverrideConfiguration(config));
if (executor != null) {
builder.asyncConfiguration(ClientAsyncConfiguration.builder()
.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
executor)
.build());
}
return builder.build();
}
}

View File

@ -5,6 +5,7 @@
package org.whispersystems.textsecuregcm.util;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.UUID;
@ -26,12 +27,15 @@ public class UUIDUtil {
}
public static UUID fromByteBuffer(final ByteBuffer byteBuffer) {
if (byteBuffer.array().length != 16) {
throw new IllegalArgumentException("unexpected byte array length; was " + byteBuffer.array().length + " but expected 16");
try {
final long mostSigBits = byteBuffer.getLong();
final long leastSigBits = byteBuffer.getLong();
if (byteBuffer.hasRemaining()) {
throw new IllegalArgumentException("unexpected byte array length; was greater than 16");
}
return new UUID(mostSigBits, leastSigBits);
} catch (BufferUnderflowException e) {
throw new IllegalArgumentException("unexpected byte array length; was less than 16");
}
final long mostSigBits = byteBuffer.getLong();
final long leastSigBits = byteBuffer.getLong();
return new UUID(mostSigBits, leastSigBits);
}
}

View File

@ -9,11 +9,6 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
@ -59,6 +54,9 @@ import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.storage.ReservedUsernames;
import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfiguration> {
@ -100,64 +98,20 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
ClientResources redisClusterClientResources = ClientResources.builder().build();
AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getMessageDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
AmazonDynamoDBClientBuilder keysDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getKeysDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getKeysDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getKeysDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
AmazonDynamoDBClientBuilder accountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getAccountsDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
ThreadPoolExecutor accountsDynamoDbMigrationThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder
.standard()
.withRegion(accountsDynamoDbClientBuilder.getRegion())
.withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration())
.withCredentials(accountsDynamoDbClientBuilder.getCredentials())
.withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
AmazonDynamoDBClientBuilder migrationDeletedAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
AmazonDynamoDBClientBuilder migrationRetryAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getMigrationRetryAccountsDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMigrationRetryAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getMigrationRetryAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
AmazonDynamoDBClientBuilder reportMessageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getReportMessageDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getReportMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getReportMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build());
DynamoDB preKeysDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
DynamoDB reportMessagesDynamoDb = new DynamoDB(reportMessageDynamoDbClientBuilder.build());
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
AmazonDynamoDBAsync accountsDynamoDbAsyncClient = accountsDynamoDbAsyncClientBuilder.build();
DynamoDbClient reportMessagesDynamoDb = DynamoDbFromConfig.client(configuration.getReportMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient messageDynamoDb = DynamoDbFromConfig.client(configuration.getMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient preKeysDynamoDb = DynamoDbFromConfig.client(configuration.getKeysDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient accountsDynamoDbClient = DynamoDbFromConfig.client(configuration.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(configuration.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(),
accountsDynamoDbMigrationThreadPool);
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources);
@ -173,14 +127,16 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
DynamoDB migrationDeletedAccountsDynamoDb = new DynamoDB(migrationDeletedAccountsDynamoDbClientBuilder.build());
DynamoDB migrationRetryAccountsDynamoDb = new DynamoDB(migrationRetryAccountsDynamoDbClientBuilder.build());
DynamoDbClient migrationDeletedAccountsDynamoDb = DynamoDbFromConfig.client(configuration.getMigrationDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(configuration.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(migrationDeletedAccountsDynamoDb, configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, configuration.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), configuration.getAccountsDynamoDbConfiguration().getTableName(), configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, configuration.getAccountsDynamoDbConfiguration().getTableName(), configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);

View File

@ -11,26 +11,12 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Page;
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
@ -46,7 +32,22 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.entities.SignedPreKey;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
class AccountsDynamoDbTest {
@ -59,49 +60,75 @@ class AccountsDynamoDbTest {
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
.tableName(ACCOUNTS_TABLE_NAME)
.hashKey(AccountsDynamoDb.KEY_ACCOUNT_UUID)
.attributeDefinition(new AttributeDefinition(AccountsDynamoDb.KEY_ACCOUNT_UUID, ScalarAttributeType.B))
.attributeDefinition(AttributeDefinition.builder()
.attributeName(AccountsDynamoDb.KEY_ACCOUNT_UUID)
.attributeType(ScalarAttributeType.B)
.build())
.build();
private AccountsDynamoDb accountsDynamoDb;
private Table migrationDeletedAccountsTable;
private Table migrationRetryAccountsTable;
@BeforeEach
void setupAccountsDao() {
CreateTableRequest createNumbersTableRequest = CreateTableRequest.builder()
.tableName(NUMBERS_TABLE_NAME)
.keySchema(KeySchemaElement.builder()
.attributeName(AccountsDynamoDb.ATTR_ACCOUNT_E164)
.keyType(KeyType.HASH)
.build())
.attributeDefinitions(AttributeDefinition.builder()
.attributeName(AccountsDynamoDb.ATTR_ACCOUNT_E164)
.attributeType(ScalarAttributeType.S)
.build())
.provisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT)
.build();
CreateTableRequest createNumbersTableRequest = new CreateTableRequest()
.withTableName(NUMBERS_TABLE_NAME)
.withKeySchema(new KeySchemaElement(AccountsDynamoDb.ATTR_ACCOUNT_E164, KeyType.HASH))
.withAttributeDefinitions(new AttributeDefinition(AccountsDynamoDb.ATTR_ACCOUNT_E164, ScalarAttributeType.S))
.withProvisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT);
dynamoDbExtension.getDynamoDbClient().createTable(createNumbersTableRequest);
final Table numbersTable = dynamoDbExtension.getDynamoDB().createTable(createNumbersTableRequest);
final CreateTableRequest createMigrationDeletedAccountsTableRequest = CreateTableRequest.builder()
.tableName(MIGRATION_DELETED_ACCOUNTS_TABLE_NAME)
.keySchema(KeySchemaElement.builder()
.attributeName(MigrationDeletedAccounts.KEY_UUID)
.keyType(KeyType.HASH)
.build())
.attributeDefinitions(AttributeDefinition.builder()
.attributeName(MigrationDeletedAccounts.KEY_UUID)
.attributeType(ScalarAttributeType.B)
.build())
.provisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT)
.build();
final CreateTableRequest createMigrationDeletedAccountsTableRequest = new CreateTableRequest()
.withTableName(MIGRATION_DELETED_ACCOUNTS_TABLE_NAME)
.withKeySchema(new KeySchemaElement(MigrationDeletedAccounts.KEY_UUID, KeyType.HASH))
.withAttributeDefinitions(new AttributeDefinition(MigrationDeletedAccounts.KEY_UUID, ScalarAttributeType.B))
.withProvisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT);
dynamoDbExtension.getDynamoDbClient().createTable(createMigrationDeletedAccountsTableRequest);
migrationDeletedAccountsTable = dynamoDbExtension.getDynamoDB().createTable(createMigrationDeletedAccountsTableRequest);
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(
dynamoDbExtension.getDynamoDbClient(), MIGRATION_DELETED_ACCOUNTS_TABLE_NAME);
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(dynamoDbExtension.getDynamoDB(),
migrationDeletedAccountsTable.getTableName());
final CreateTableRequest createMigrationRetryAccountsTableRequest = CreateTableRequest.builder()
.tableName(MIGRATION_RETRY_ACCOUNTS_TABLE_NAME)
.keySchema(KeySchemaElement.builder()
.attributeName(MigrationRetryAccounts.KEY_UUID)
.keyType(KeyType.HASH)
.build())
.attributeDefinitions(AttributeDefinition.builder()
.attributeName(MigrationRetryAccounts.KEY_UUID)
.attributeType(ScalarAttributeType.B)
.build())
.provisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT)
.build();
final CreateTableRequest createMigrationRetryAccountsTableRequest = new CreateTableRequest()
.withTableName(MIGRATION_RETRY_ACCOUNTS_TABLE_NAME)
.withKeySchema(new KeySchemaElement(MigrationRetryAccounts.KEY_UUID, KeyType.HASH))
.withAttributeDefinitions(new AttributeDefinition(MigrationRetryAccounts.KEY_UUID, ScalarAttributeType.B))
.withProvisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT);
dynamoDbExtension.getDynamoDbClient().createTable(createMigrationRetryAccountsTableRequest);
migrationRetryAccountsTable = dynamoDbExtension.getDynamoDB().createTable(createMigrationRetryAccountsTableRequest);
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts((dynamoDbExtension.getDynamoDbClient()),
MIGRATION_RETRY_ACCOUNTS_TABLE_NAME);
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts((dynamoDbExtension.getDynamoDB()),
migrationRetryAccountsTable.getTableName());
this.accountsDynamoDb = new AccountsDynamoDb(dynamoDbExtension.getClient(), dynamoDbExtension.getAsyncClient(), new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()), dynamoDbExtension.getDynamoDB(), dynamoDbExtension.getTableName(), numbersTable.getTableName(),
migrationDeletedAccounts, migrationRetryAccounts);
this.accountsDynamoDb = new AccountsDynamoDb(
dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getDynamoDbAsyncClient(),
new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()),
dynamoDbExtension.getTableName(),
NUMBERS_TABLE_NAME,
migrationDeletedAccounts,
migrationRetryAccounts);
}
@Test
@ -262,8 +289,12 @@ class AccountsDynamoDbTest {
verifyRecentlyDeletedAccountsTableItemCount(1);
assertThat(migrationDeletedAccountsTable
.getItem(MigrationDeletedAccounts.primaryKey(deletedAccount.getUuid()))).isNotNull();
Map<String, AttributeValue> primaryKey = MigrationDeletedAccounts.primaryKey(deletedAccount.getUuid());
assertThat(dynamoDbExtension.getDynamoDbClient().getItem(GetItemRequest.builder()
.tableName(MIGRATION_DELETED_ACCOUNTS_TABLE_NAME)
.key(Map.of(MigrationDeletedAccounts.KEY_UUID, primaryKey.get(MigrationDeletedAccounts.KEY_UUID)))
.build()))
.isNotNull();
accountsDynamoDb.deleteRecentlyDeletedUuids();
@ -273,8 +304,10 @@ class AccountsDynamoDbTest {
private void verifyRecentlyDeletedAccountsTableItemCount(int expectedItemCount) {
int totalItems = 0;
for (Page<Item, ScanOutcome> page : migrationDeletedAccountsTable.scan(new ScanSpec()).pages()) {
for (Item ignored : page) {
for (ScanResponse page : dynamoDbExtension.getDynamoDbClient().scanPaginator(ScanRequest.builder()
.tableName(MIGRATION_DELETED_ACCOUNTS_TABLE_NAME)
.build())) {
for (Map<String, AttributeValue> item : page.items()) {
totalItems++;
}
}
@ -306,16 +339,15 @@ class AccountsDynamoDbTest {
configuration.setRingBufferSizeInClosedState(2);
configuration.setFailureRateThreshold(50);
final AmazonDynamoDB client = mock(AmazonDynamoDB.class);
final DynamoDB dynamoDB = new DynamoDB(client);
final DynamoDbClient client = mock(DynamoDbClient.class);
when(client.transactWriteItems(any()))
when(client.transactWriteItems(any(TransactWriteItemsRequest.class)))
.thenThrow(RuntimeException.class);
when(client.updateItem(any()))
when(client.updateItem(any(UpdateItemRequest.class)))
.thenThrow(RuntimeException.class);
AccountsDynamoDb accounts = new AccountsDynamoDb(client, mock(AmazonDynamoDBAsync.class), mock(ThreadPoolExecutor.class), dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME, mock(
AccountsDynamoDb accounts = new AccountsDynamoDb(client, mock(DynamoDbAsyncClient.class), mock(ThreadPoolExecutor.class), ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME, mock(
MigrationDeletedAccounts.class), mock(MigrationRetryAccounts.class));
Account account = generateAccount("+14151112222", UUID.randomUUID());
@ -408,20 +440,22 @@ class AccountsDynamoDbTest {
}
private void verifyStoredState(String number, UUID uuid, Account expecting) {
final Table accounts = dynamoDbExtension.getDynamoDB().getTable(dynamoDbExtension.getTableName());
final DynamoDbClient db = dynamoDbExtension.getDynamoDbClient();
Item item = accounts.getItem(new GetItemSpec()
.withPrimaryKey(AccountsDynamoDb.KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(uuid))
.withConsistentRead(true));
final GetItemResponse get = db.getItem(GetItemRequest.builder()
.tableName(dynamoDbExtension.getTableName())
.key(Map.of(AccountsDynamoDb.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)))
.consistentRead(true)
.build());
if (item != null) {
String data = new String(item.getBinary(AccountsDynamoDb.ATTR_ACCOUNT_DATA), StandardCharsets.UTF_8);
if (get.hasItem()) {
String data = new String(get.item().get(AccountsDynamoDb.ATTR_ACCOUNT_DATA).b().asByteArray(), StandardCharsets.UTF_8);
assertThat(data).isNotEmpty();
assertThat(item.getNumber(AccountsDynamoDb.ATTR_MIGRATION_VERSION).intValue())
assertThat(AttributeValues.getInt(get.item(), AccountsDynamoDb.ATTR_MIGRATION_VERSION, -1))
.isEqualTo(expecting.getDynamoDbMigrationVersion());
Account result = AccountsDynamoDb.fromItem(item);
Account result = AccountsDynamoDb.fromItem(get.item());
verifyStoredState(number, uuid, result, expecting);
} else {
throw new AssertionError("No data");

View File

@ -1,33 +1,35 @@
package org.whispersystems.textsecuregcm.storage;
import com.almworks.sqlite4java.SQLite;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import java.net.ServerSocket;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback {
static final String DEFAULT_TABLE_NAME = "test_table";
static final ProvisionedThroughput DEFAULT_PROVISIONED_THROUGHPUT = new ProvisionedThroughput(20L, 20L);
static final ProvisionedThroughput DEFAULT_PROVISIONED_THROUGHPUT = ProvisionedThroughput.builder()
.readCapacityUnits(20L)
.writeCapacityUnits(20L)
.build();
private DynamoDBProxyServer server;
private int port;
@ -42,9 +44,8 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
private final long readCapacityUnits;
private final long writeCapacityUnits;
private AmazonDynamoDB client;
private AmazonDynamoDBAsync asyncClient;
private DynamoDB dynamoDB;
private DynamoDbClient dynamoDB2;
private DynamoDbAsyncClient dynamoAsyncDB2;
private DynamoDbExtension(String tableName, String hashKey, String rangeKey, List<AttributeDefinition> attributeDefinitions, List<GlobalSecondaryIndex> globalSecondaryIndexes, long readCapacityUnits,
long writeCapacityUnits) {
@ -87,26 +88,33 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
KeySchemaElement[] keySchemaElements;
if (rangeKeyName == null) {
keySchemaElements = new KeySchemaElement[] {
new KeySchemaElement(hashKeyName, "HASH"),
KeySchemaElement.builder().attributeName(hashKeyName).keyType(KeyType.HASH).build(),
};
} else {
keySchemaElements = new KeySchemaElement[] {
new KeySchemaElement(hashKeyName, "HASH"),
new KeySchemaElement(rangeKeyName, "RANGE")
KeySchemaElement.builder().attributeName(hashKeyName).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(rangeKeyName).keyType(KeyType.RANGE).build(),
};
}
final CreateTableRequest createTableRequest = new CreateTableRequest()
.withTableName(tableName)
.withKeySchema(keySchemaElements)
.withAttributeDefinitions(attributeDefinitions.isEmpty() ? null : attributeDefinitions)
.withGlobalSecondaryIndexes(globalSecondaryIndexes.isEmpty() ? null : globalSecondaryIndexes)
.withProvisionedThroughput(new ProvisionedThroughput(readCapacityUnits, writeCapacityUnits));
final CreateTableRequest createTableRequest = CreateTableRequest.builder()
.tableName(tableName)
.keySchema(keySchemaElements)
.attributeDefinitions(attributeDefinitions.isEmpty() ? null : attributeDefinitions)
.globalSecondaryIndexes(globalSecondaryIndexes.isEmpty() ? null : globalSecondaryIndexes)
.provisionedThroughput(ProvisionedThroughput.builder()
.readCapacityUnits(readCapacityUnits)
.writeCapacityUnits(writeCapacityUnits)
.build())
.build();
getDynamoDB().createTable(createTableRequest);
getDynamoDbClient().createTable(createTableRequest);
}
private void startServer() throws Exception {
// Even though we're using AWS SDK v2, Dynamo's local implementation's canonical location
// is within v1 (https://github.com/aws/aws-sdk-java-v2/issues/982). This does support
// v2 clients, though.
SQLite.setLibraryPath("target/lib"); // if you see a library failed to load error, you need to run mvn test-compile at least once first
ServerSocket serverSocket = new ServerSocket(0);
serverSocket.setReuseAddress(false);
@ -117,18 +125,18 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
}
private void initializeClient() {
AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")));
client = clientBuilder.build();
asyncClient = AmazonDynamoDBAsyncClientBuilder.standard()
.withEndpointConfiguration(clientBuilder.getEndpoint())
.withCredentials(clientBuilder.getCredentials())
.build();
dynamoDB = new DynamoDB(client);
dynamoDB2 = DynamoDbClient.builder()
.endpointOverride(URI.create("http://localhost:" + port))
.region(Region.of("local-test-region"))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create("accessKey", "secretKey")))
.build();
dynamoAsyncDB2 = DynamoDbAsyncClient.builder()
.endpointOverride(URI.create("http://localhost:" + port))
.region(Region.of("local-test-region"))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create("accessKey", "secretKey")))
.build();
}
static class DynamoDbExtensionBuilder {
@ -140,8 +148,8 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
private List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
private List<GlobalSecondaryIndex> globalSecondaryIndexes = new ArrayList<>();
private long readCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.getReadCapacityUnits();
private long writeCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.getWriteCapacityUnits();
private long readCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.readCapacityUnits();
private long writeCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.writeCapacityUnits();
private DynamoDbExtensionBuilder() {
@ -178,16 +186,12 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
}
}
public AmazonDynamoDB getClient() {
return client;
public DynamoDbClient getDynamoDbClient() {
return dynamoDB2;
}
public AmazonDynamoDBAsync getAsyncClient() {
return asyncClient;
}
public DynamoDB getDynamoDB() {
return dynamoDB;
public DynamoDbAsyncClient getDynamoDbAsyncClient() {
return dynamoAsyncDB2;
}
public String getTableName() {

View File

@ -5,13 +5,13 @@
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import org.whispersystems.textsecuregcm.tests.util.LocalDynamoDbRule;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
public class KeysDynamoDbRule extends LocalDynamoDbRule {
public static final String TABLE_NAME = "Signal_Keys_Test";
@ -19,18 +19,22 @@ public class KeysDynamoDbRule extends LocalDynamoDbRule {
@Override
protected void before() throws Throwable {
super.before();
final DynamoDB dynamoDB = getDynamoDB();
final CreateTableRequest createTableRequest = new CreateTableRequest()
.withTableName(TABLE_NAME)
.withKeySchema(new KeySchemaElement(KeysDynamoDb.KEY_ACCOUNT_UUID, "HASH"),
new KeySchemaElement(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID, "RANGE"))
.withAttributeDefinitions(new AttributeDefinition(KeysDynamoDb.KEY_ACCOUNT_UUID, ScalarAttributeType.B),
new AttributeDefinition(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID, ScalarAttributeType.B))
.withProvisionedThroughput(new ProvisionedThroughput(20L, 20L));
dynamoDB.createTable(createTableRequest);
getDynamoDbClient().createTable(CreateTableRequest.builder()
.tableName(TABLE_NAME)
.keySchema(
KeySchemaElement.builder().attributeName(KeysDynamoDb.KEY_ACCOUNT_UUID).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID).keyType(KeyType.RANGE)
.build())
.attributeDefinitions(AttributeDefinition.builder()
.attributeName(KeysDynamoDb.KEY_ACCOUNT_UUID)
.attributeType(ScalarAttributeType.B)
.build(),
AttributeDefinition.builder()
.attributeName(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID)
.attributeType(ScalarAttributeType.B)
.build())
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(20L).writeCapacityUnits(20L).build())
.build());
}
@Override

View File

@ -9,6 +9,7 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.whispersystems.textsecuregcm.entities.PreKey;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.util.Collections;
import java.util.List;
@ -17,6 +18,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -34,7 +36,7 @@ public class KeysDynamoDbTest {
@Before
public void setup() {
keysDynamoDb = new KeysDynamoDb(dynamoDbRule.getDynamoDB(), KeysDynamoDbRule.TABLE_NAME);
keysDynamoDb = new KeysDynamoDb(dynamoDbRule.getDynamoDbClient(), KeysDynamoDbRule.TABLE_NAME);
account = mock(Account.class);
when(account.getNumber()).thenReturn(ACCOUNT_NUMBER);
@ -133,4 +135,10 @@ public class KeysDynamoDbTest {
assertEquals(0, keysDynamoDb.getCount(account, DEVICE_ID));
assertEquals(1, keysDynamoDb.getCount(account, DEVICE_ID + 1));
}
@Test
public void testSortKeyPrefix() {
AttributeValue got = KeysDynamoDb.getSortKeyPrefix(123);
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 123}, got.b().asByteArray());
}
}

View File

@ -9,12 +9,6 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.ItemCollection;
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
import com.google.protobuf.ByteString;
import io.lettuce.core.cluster.SlotHash;
import java.nio.ByteBuffer;
@ -22,6 +16,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@ -38,6 +33,10 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbRule;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
@ -62,7 +61,7 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz");
});
final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
final AccountsManager accountsManager = mock(AccountsManager.class);
final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
@ -142,17 +141,16 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
final List<MessageProtos.Envelope> persistedMessages = new ArrayList<>(messageCount);
DynamoDB dynamoDB = messagesDynamoDbRule.getDynamoDB();
Table table = dynamoDB.getTable(MessagesDynamoDbRule.TABLE_NAME);
final ItemCollection<ScanOutcome> scan = table.scan(new ScanSpec());
for (Item item : scan) {
persistedMessages.add(MessageProtos.Envelope.newBuilder()
.setServerGuid(convertBinaryToUuid(item.getBinary("U")).toString())
.setType(MessageProtos.Envelope.Type.valueOf(item.getInt("T")))
.setTimestamp(item.getLong("TS"))
.setServerTimestamp(extractServerTimestamp(item.getBinary("S")))
.setContent(ByteString.copyFrom(item.getBinary("C")))
.build());
DynamoDbClient dynamoDB = messagesDynamoDbRule.getDynamoDbClient();
for (Map<String, AttributeValue> item : dynamoDB
.scan(ScanRequest.builder().tableName(MessagesDynamoDbRule.TABLE_NAME).build()).items()) {
persistedMessages.add(MessageProtos.Envelope.newBuilder()
.setServerGuid(AttributeValues.getUUID(item, "U", null).toString())
.setType(MessageProtos.Envelope.Type.valueOf(AttributeValues.getInt(item, "T", -1)))
.setTimestamp(AttributeValues.getLong(item, "TS", -1))
.setServerTimestamp(extractServerTimestamp(AttributeValues.getByteArray(item, "S", null)))
.setContent(ByteString.copyFrom(AttributeValues.getByteArray(item, "C", null)))
.build());
}
assertEquals(expectedMessages, persistedMessages);

View File

@ -4,10 +4,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import java.util.UUID;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
class MigrationDeletedAccountsTest {
@ -15,13 +15,16 @@ class MigrationDeletedAccountsTest {
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
.tableName("deleted_accounts_test")
.hashKey(MigrationDeletedAccounts.KEY_UUID)
.attributeDefinition(new AttributeDefinition(MigrationDeletedAccounts.KEY_UUID, ScalarAttributeType.B))
.attributeDefinition(AttributeDefinition.builder()
.attributeName(MigrationDeletedAccounts.KEY_UUID)
.attributeType(ScalarAttributeType.B)
.build())
.build();
@Test
void test() {
final MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(dynamoDbExtension.getDynamoDB(),
final MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getTableName());
UUID firstUuid = UUID.randomUUID();

View File

@ -2,12 +2,12 @@ package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
class MigrationRetryAccountsTest {
@ -15,13 +15,16 @@ class MigrationRetryAccountsTest {
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
.tableName("account_migration_errors_test")
.hashKey(MigrationRetryAccounts.KEY_UUID)
.attributeDefinition(new AttributeDefinition(MigrationRetryAccounts.KEY_UUID, ScalarAttributeType.B))
.attributeDefinition(AttributeDefinition.builder()
.attributeName(MigrationRetryAccounts.KEY_UUID)
.attributeType(ScalarAttributeType.B)
.build())
.build();
@Test
void test() {
final MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(dynamoDbExtension.getDynamoDB(),
final MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getTableName());
UUID firstUuid = UUID.randomUUID();

View File

@ -9,8 +9,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
@ -20,6 +18,8 @@ import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
class PushChallengeDynamoDbTest {
@ -34,12 +34,15 @@ class PushChallengeDynamoDbTest {
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
.tableName(TABLE_NAME)
.hashKey(PushChallengeDynamoDb.KEY_ACCOUNT_UUID)
.attributeDefinition(new AttributeDefinition(PushChallengeDynamoDb.KEY_ACCOUNT_UUID, ScalarAttributeType.B))
.attributeDefinition(AttributeDefinition.builder()
.attributeName(PushChallengeDynamoDb.KEY_ACCOUNT_UUID)
.attributeType(ScalarAttributeType.B)
.build())
.build();
@BeforeEach
void setUp() {
this.pushChallengeDynamoDb = new PushChallengeDynamoDb(dynamoDbExtension.getDynamoDB(), TABLE_NAME, Clock.fixed(
this.pushChallengeDynamoDb = new PushChallengeDynamoDb(dynamoDbExtension.getDynamoDbClient(), TABLE_NAME, Clock.fixed(
Instant.ofEpochMilli(CURRENT_TIME_MILLIS), ZoneId.systemDefault()));
}

View File

@ -4,13 +4,13 @@ import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
class ReportMessageDynamoDbTest {
@ -22,13 +22,16 @@ class ReportMessageDynamoDbTest {
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
.tableName(TABLE_NAME)
.hashKey(ReportMessageDynamoDb.KEY_HASH)
.attributeDefinition(new AttributeDefinition(ReportMessageDynamoDb.KEY_HASH, ScalarAttributeType.B))
.attributeDefinition(AttributeDefinition.builder()
.attributeName(ReportMessageDynamoDb.KEY_HASH)
.attributeType(ScalarAttributeType.B)
.build())
.build();
@BeforeEach
void setUp() {
this.reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbExtension.getDynamoDB(), TABLE_NAME);
this.reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbExtension.getDynamoDbClient(), TABLE_NAME);
}
@Test

View File

@ -20,7 +20,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import java.util.HashSet;
@ -49,6 +48,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
class AccountsManagerTest {

View File

@ -67,7 +67,7 @@ public class MessagesDynamoDbTest {
@Before
public void setup() {
messagesDynamoDb = new MessagesDynamoDb(dynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
messagesDynamoDb = new MessagesDynamoDb(dynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
}
@Test

View File

@ -6,16 +6,16 @@
package org.whispersystems.textsecuregcm.tests.util;
import com.almworks.sqlite4java.SQLite;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;
import org.junit.rules.ExternalResource;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import java.net.ServerSocket;
import java.net.URI;
public class LocalDynamoDbRule extends ExternalResource {
private DynamoDBProxyServer server;
@ -43,11 +43,12 @@ public class LocalDynamoDbRule extends ExternalResource {
super.after();
}
public DynamoDB getDynamoDB() {
AmazonDynamoDBClientBuilder clientBuilder =
AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")));
return new DynamoDB(clientBuilder.build());
public DynamoDbClient getDynamoDbClient() {
return DynamoDbClient.builder()
.endpointOverride(URI.create("http://localhost:" + port))
.region(Region.of("local-test-region"))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create("accessKey", "secretKey")))
.build();
}
}

View File

@ -5,15 +5,15 @@
package org.whispersystems.textsecuregcm.tests.util;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndex;
import com.amazonaws.services.dynamodbv2.model.Projection;
import com.amazonaws.services.dynamodbv2.model.ProjectionType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.LocalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.Projection;
import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
public class MessagesDynamoDbRule extends LocalDynamoDbRule {
@ -22,20 +22,21 @@ public class MessagesDynamoDbRule extends LocalDynamoDbRule {
@Override
protected void before() throws Throwable {
super.before();
DynamoDB dynamoDB = getDynamoDB();
CreateTableRequest createTableRequest = new CreateTableRequest()
.withTableName(TABLE_NAME)
.withKeySchema(new KeySchemaElement("H", "HASH"),
new KeySchemaElement("S", "RANGE"))
.withAttributeDefinitions(new AttributeDefinition("H", ScalarAttributeType.B),
new AttributeDefinition("S", ScalarAttributeType.B),
new AttributeDefinition("U", ScalarAttributeType.B))
.withProvisionedThroughput(new ProvisionedThroughput(20L, 20L))
.withLocalSecondaryIndexes(new LocalSecondaryIndex().withIndexName("Message_UUID_Index")
.withKeySchema(new KeySchemaElement("H", "HASH"),
new KeySchemaElement("U", "RANGE"))
.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)));
dynamoDB.createTable(createTableRequest);
getDynamoDbClient().createTable(CreateTableRequest.builder()
.tableName(TABLE_NAME)
.keySchema(KeySchemaElement.builder().attributeName("H").keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName("S").keyType(KeyType.RANGE).build())
.attributeDefinitions(
AttributeDefinition.builder().attributeName("H").attributeType(ScalarAttributeType.B).build(),
AttributeDefinition.builder().attributeName("S").attributeType(ScalarAttributeType.B).build(),
AttributeDefinition.builder().attributeName("U").attributeType(ScalarAttributeType.B).build())
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(20L).writeCapacityUnits(20L).build())
.localSecondaryIndexes(LocalSecondaryIndex.builder().indexName("Message_UUID_Index")
.keySchema(KeySchemaElement.builder().attributeName("H").keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName("U").keyType(KeyType.RANGE).build())
.projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build())
.build())
.build());
}
@Override

View File

@ -12,7 +12,6 @@ import java.io.InputStreamReader;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.Optional;
import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.*;

View File

@ -0,0 +1,60 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.util;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.*;
public class AttributeValuesTest {
@Test
void testUUIDRoundTrip() {
UUID orig = UUID.randomUUID();
AttributeValue av = AttributeValues.fromUUID(orig);
UUID returned = AttributeValues.getUUID(Map.of("foo", av), "foo", null);
assertEquals(orig, returned);
}
@Test
void testLongRoundTrip() {
long orig = 12345;
AttributeValue av = AttributeValues.fromLong(orig);
long returned = AttributeValues.getLong(Map.of("foo", av), "foo", -1);
assertEquals(orig, returned);
}
@Test
void testIntRoundTrip() {
int orig = 12345;
AttributeValue av = AttributeValues.fromInt(orig);
int returned = AttributeValues.getInt(Map.of("foo", av), "foo", -1);
assertEquals(orig, returned);
}
@Test
void testByteBuffer() {
byte[] bytes = {1, 2, 3};
ByteBuffer bb = ByteBuffer.wrap(bytes);
AttributeValue av = AttributeValues.fromByteBuffer(bb);
byte[] returned = av.b().asByteArray();
assertArrayEquals(bytes, returned);
returned = AttributeValues.getByteArray(Map.of("foo", av), "foo", null);
assertArrayEquals(bytes, returned);
}
@Test
void testByteBuffer2() {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
byteBuffer.putLong(123);
assertEquals(byteBuffer.remaining(), 0);
AttributeValue av = AttributeValues.fromByteBuffer(byteBuffer.flip());
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 123}, AttributeValues.getByteArray(Map.of("foo", av), "foo", null));
}
}

View File

@ -79,7 +79,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
executorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), executorService);
messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDbClient(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
reportMessageManager = mock(ReportMessageManager.class);
account = mock(Account.class);
device = mock(Device.class);