|
|
|
@ -80,7 +80,7 @@ import software.amazon.awssdk.utils.CompletableFutureUtils;
|
|
|
|
|
* make sure the field is stored in a DDB attribute and then put back into the account object in {@link Accounts#fromItem(Map)}.
|
|
|
|
|
*/
|
|
|
|
|
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
|
|
|
|
public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
public class Accounts {
|
|
|
|
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(Accounts.class);
|
|
|
|
|
|
|
|
|
@ -163,7 +163,8 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
|
|
|
|
|
private final Clock clock;
|
|
|
|
|
|
|
|
|
|
private final DynamoDbAsyncClient asyncClient;
|
|
|
|
|
private final DynamoDbClient dynamoDbClient;
|
|
|
|
|
private final DynamoDbAsyncClient dynamoDbAsyncClient;
|
|
|
|
|
|
|
|
|
|
private final String phoneNumberConstraintTableName;
|
|
|
|
|
private final String phoneNumberIdentifierConstraintTableName;
|
|
|
|
@ -172,11 +173,10 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
private final String usedLinkDeviceTokenTableName;
|
|
|
|
|
private final String accountsTableName;
|
|
|
|
|
|
|
|
|
|
@VisibleForTesting
|
|
|
|
|
public Accounts(
|
|
|
|
|
final Clock clock,
|
|
|
|
|
final DynamoDbClient client,
|
|
|
|
|
final DynamoDbAsyncClient asyncClient,
|
|
|
|
|
final DynamoDbClient dynamoDbClient,
|
|
|
|
|
final DynamoDbAsyncClient dynamoDbAsyncClient,
|
|
|
|
|
final String accountsTableName,
|
|
|
|
|
final String phoneNumberConstraintTableName,
|
|
|
|
|
final String phoneNumberIdentifierConstraintTableName,
|
|
|
|
@ -184,9 +184,9 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
final String deletedAccountsTableName,
|
|
|
|
|
final String usedLinkDeviceTokenTableName) {
|
|
|
|
|
|
|
|
|
|
super(client);
|
|
|
|
|
this.clock = clock;
|
|
|
|
|
this.asyncClient = asyncClient;
|
|
|
|
|
this.dynamoDbClient = dynamoDbClient;
|
|
|
|
|
this.dynamoDbAsyncClient = dynamoDbAsyncClient;
|
|
|
|
|
this.phoneNumberConstraintTableName = phoneNumberConstraintTableName;
|
|
|
|
|
this.phoneNumberIdentifierConstraintTableName = phoneNumberIdentifierConstraintTableName;
|
|
|
|
|
this.accountsTableName = accountsTableName;
|
|
|
|
@ -195,21 +195,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
this.usedLinkDeviceTokenTableName = usedLinkDeviceTokenTableName;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Accounts(
|
|
|
|
|
final DynamoDbClient client,
|
|
|
|
|
final DynamoDbAsyncClient asyncClient,
|
|
|
|
|
final String accountsTableName,
|
|
|
|
|
final String phoneNumberConstraintTableName,
|
|
|
|
|
final String phoneNumberIdentifierConstraintTableName,
|
|
|
|
|
final String usernamesConstraintTableName,
|
|
|
|
|
final String deletedAccountsTableName,
|
|
|
|
|
final String usedLinkDeviceTokenTableName) {
|
|
|
|
|
|
|
|
|
|
this(Clock.systemUTC(), client, asyncClient, accountsTableName,
|
|
|
|
|
phoneNumberConstraintTableName, phoneNumberIdentifierConstraintTableName, usernamesConstraintTableName,
|
|
|
|
|
deletedAccountsTableName, usedLinkDeviceTokenTableName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static class UsernameTable {
|
|
|
|
|
// usernameHash; bytes.
|
|
|
|
|
static final String KEY_USERNAME_HASH = Accounts.ATTR_USERNAME_HASH;
|
|
|
|
@ -255,7 +240,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
db().transactWriteItems(request);
|
|
|
|
|
dynamoDbClient.transactWriteItems(request);
|
|
|
|
|
} catch (final TransactionCanceledException e) {
|
|
|
|
|
|
|
|
|
|
final CancellationReason accountCancellationReason = e.cancellationReasons().get(2);
|
|
|
|
@ -384,7 +369,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
writeItems.add(UpdateAccountSpec.forAccount(accountsTableName, accountToCreate).transactItem());
|
|
|
|
|
writeItems.addAll(additionalWriteItems);
|
|
|
|
|
|
|
|
|
|
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(writeItems).build())
|
|
|
|
|
return dynamoDbAsyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(writeItems).build())
|
|
|
|
|
.thenApply(response -> {
|
|
|
|
|
accountToCreate.setVersion(accountToCreate.getVersion() + 1);
|
|
|
|
|
return (Void) null;
|
|
|
|
@ -478,7 +463,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
.transactItems(writeItems)
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
db().transactWriteItems(request);
|
|
|
|
|
dynamoDbClient.transactWriteItems(request);
|
|
|
|
|
|
|
|
|
|
account.setVersion(account.getVersion() + 1);
|
|
|
|
|
succeeded = true;
|
|
|
|
@ -603,7 +588,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
|
|
|
|
|
writeItems.add(UpdateAccountSpec.forAccount(accountsTableName, updatedAccount).transactItem());
|
|
|
|
|
|
|
|
|
|
return asyncClient
|
|
|
|
|
return dynamoDbAsyncClient
|
|
|
|
|
.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(writeItems).build())
|
|
|
|
|
.thenRun(Util.NOOP)
|
|
|
|
|
.exceptionally(ExceptionUtils.exceptionallyHandler(TransactionCanceledException.class, e -> {
|
|
|
|
@ -788,7 +773,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
holdToRemove.ifPresent(oldHold ->
|
|
|
|
|
writeItems.add(releaseHoldIfAllowedTransactItem(updatedAccount.getUuid(), oldHold, now)));
|
|
|
|
|
|
|
|
|
|
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(writeItems).build())
|
|
|
|
|
return dynamoDbAsyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(writeItems).build())
|
|
|
|
|
.thenApply(ignored -> updatedAccount);
|
|
|
|
|
})
|
|
|
|
|
.thenApply(updatedAccount -> {
|
|
|
|
@ -829,7 +814,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
|
|
|
|
|
// Otherwise, there's an existing link handle. If this is the result of an account being re-registered, we should
|
|
|
|
|
// preserve the link handle.
|
|
|
|
|
return asyncClient.getItem(GetItemRequest.builder()
|
|
|
|
|
return dynamoDbAsyncClient.getItem(GetItemRequest.builder()
|
|
|
|
|
.tableName(usernamesConstraintTableName)
|
|
|
|
|
.key(Map.of(UsernameTable.KEY_USERNAME_HASH, AttributeValues.b(usernameHash)))
|
|
|
|
|
.projectionExpression(UsernameTable.ATTR_RECLAIMABLE).build())
|
|
|
|
@ -878,7 +863,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
// 2?: Adding that hold may have caused our account to exceed our maximum holds. Release an old hold
|
|
|
|
|
holdToRemove.ifPresent(oldHold -> items.add(releaseHoldIfAllowedTransactItem(updatedAccount.getUuid(), oldHold, now)));
|
|
|
|
|
|
|
|
|
|
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(items).build())
|
|
|
|
|
return dynamoDbAsyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(items).build())
|
|
|
|
|
.thenAccept(ignored -> {
|
|
|
|
|
account.setUsernameHash(null);
|
|
|
|
|
account.setUsernameLinkDetails(null, null);
|
|
|
|
@ -1009,7 +994,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
.forAccount(accountsTableName, account)
|
|
|
|
|
.updateItemRequest();
|
|
|
|
|
|
|
|
|
|
return asyncClient.updateItem(updateItemRequest)
|
|
|
|
|
return dynamoDbAsyncClient.updateItem(updateItemRequest)
|
|
|
|
|
.thenApply(response -> {
|
|
|
|
|
account.setVersion(AttributeValues.getInt(response.attributes(), "V", account.getVersion() + 1));
|
|
|
|
|
return (Void) null;
|
|
|
|
@ -1059,7 +1044,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
writeItems.add(UpdateAccountSpec.forAccount(accountsTableName, account).transactItem());
|
|
|
|
|
writeItems.addAll(additionalWriteItems);
|
|
|
|
|
|
|
|
|
|
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
|
|
|
|
|
return dynamoDbAsyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
|
|
|
|
|
.transactItems(writeItems)
|
|
|
|
|
.build())
|
|
|
|
|
.thenApply(response -> {
|
|
|
|
@ -1187,7 +1172,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Optional<UUID> findRecentlyDeletedAccountIdentifier(final UUID phoneNumberIdentifier) {
|
|
|
|
|
final GetItemResponse response = db().getItem(GetItemRequest.builder()
|
|
|
|
|
final GetItemResponse response = dynamoDbClient.getItem(GetItemRequest.builder()
|
|
|
|
|
.tableName(deletedAccountsTableName)
|
|
|
|
|
.consistentRead(true)
|
|
|
|
|
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(phoneNumberIdentifier.toString())))
|
|
|
|
@ -1197,7 +1182,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Optional<UUID> findRecentlyDeletedPhoneNumberIdentifier(final UUID uuid) {
|
|
|
|
|
final QueryResponse response = db().query(QueryRequest.builder()
|
|
|
|
|
final QueryResponse response = dynamoDbClient.query(QueryRequest.builder()
|
|
|
|
|
.tableName(deletedAccountsTableName)
|
|
|
|
|
.indexName(DELETED_ACCOUNTS_UUID_TO_PNI_INDEX_NAME)
|
|
|
|
|
.keyConditionExpression("#uuid = :uuid")
|
|
|
|
@ -1234,7 +1219,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
|
|
|
|
|
transactWriteItems.addAll(additionalWriteItems);
|
|
|
|
|
|
|
|
|
|
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
|
|
|
|
|
return dynamoDbAsyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
|
|
|
|
|
.transactItems(transactWriteItems)
|
|
|
|
|
.build())
|
|
|
|
|
.thenRun(Util.NOOP);
|
|
|
|
@ -1251,7 +1236,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
return Flux.range(0, segments)
|
|
|
|
|
.parallel()
|
|
|
|
|
.runOn(scheduler)
|
|
|
|
|
.flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder()
|
|
|
|
|
.flatMap(segment -> dynamoDbAsyncClient.scanPaginator(ScanRequest.builder()
|
|
|
|
|
.tableName(accountsTableName)
|
|
|
|
|
.consistentRead(true)
|
|
|
|
|
.segment(segment)
|
|
|
|
@ -1278,7 +1263,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
return Flux.range(0, segments)
|
|
|
|
|
.parallel()
|
|
|
|
|
.runOn(scheduler)
|
|
|
|
|
.flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder()
|
|
|
|
|
.flatMap(segment -> dynamoDbAsyncClient.scanPaginator(ScanRequest.builder()
|
|
|
|
|
.tableName(deletedAccountsTableName)
|
|
|
|
|
.consistentRead(true)
|
|
|
|
|
.segment(segment)
|
|
|
|
@ -1346,7 +1331,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
|
|
|
|
|
@Nonnull
|
|
|
|
|
private Optional<Map<String, AttributeValue>> itemByKey(final String table, final String keyName, final AttributeValue keyValue) {
|
|
|
|
|
final GetItemResponse response = db().getItem(GetItemRequest.builder()
|
|
|
|
|
final GetItemResponse response = dynamoDbClient.getItem(GetItemRequest.builder()
|
|
|
|
|
.tableName(table)
|
|
|
|
|
.key(Map.of(keyName, keyValue))
|
|
|
|
|
.consistentRead(true)
|
|
|
|
@ -1356,7 +1341,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
|
|
|
|
|
@Nonnull
|
|
|
|
|
private CompletableFuture<Optional<Map<String, AttributeValue>>> itemByKeyAsync(final String table, final String keyName, final AttributeValue keyValue) {
|
|
|
|
|
return asyncClient.getItem(GetItemRequest.builder()
|
|
|
|
|
return dynamoDbAsyncClient.getItem(GetItemRequest.builder()
|
|
|
|
|
.tableName(table)
|
|
|
|
|
.key(Map.of(keyName, keyValue))
|
|
|
|
|
.consistentRead(true)
|
|
|
|
@ -1364,36 +1349,9 @@ public class Accounts extends AbstractDynamoDbStore {
|
|
|
|
|
.thenApply(response -> Optional.ofNullable(response.item()).filter(item -> !item.isEmpty()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Nonnull
|
|
|
|
|
private Optional<Map<String, AttributeValue>> itemByGsiKey(final String table, final String indexName, final String keyName, final AttributeValue keyValue) {
|
|
|
|
|
final QueryResponse response = db().query(QueryRequest.builder()
|
|
|
|
|
.tableName(table)
|
|
|
|
|
.indexName(indexName)
|
|
|
|
|
.keyConditionExpression("#gsiKey = :gsiValue")
|
|
|
|
|
.projectionExpression("#uuid")
|
|
|
|
|
.expressionAttributeNames(Map.of(
|
|
|
|
|
"#gsiKey", keyName,
|
|
|
|
|
"#uuid", KEY_ACCOUNT_UUID))
|
|
|
|
|
.expressionAttributeValues(Map.of(
|
|
|
|
|
":gsiValue", keyValue))
|
|
|
|
|
.build());
|
|
|
|
|
|
|
|
|
|
if (response.count() == 0) {
|
|
|
|
|
return Optional.empty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (response.count() > 1) {
|
|
|
|
|
throw new IllegalStateException("More than one row located for GSI [%s], key-value pair [%s, %s]"
|
|
|
|
|
.formatted(indexName, keyName, keyValue));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final AttributeValue primaryKeyValue = response.items().get(0).get(KEY_ACCOUNT_UUID);
|
|
|
|
|
return itemByKey(table, KEY_ACCOUNT_UUID, primaryKeyValue);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Nonnull
|
|
|
|
|
private CompletableFuture<Optional<Map<String, AttributeValue>>> itemByGsiKeyAsync(final String table, final String indexName, final String keyName, final AttributeValue keyValue) {
|
|
|
|
|
return asyncClient.query(QueryRequest.builder()
|
|
|
|
|
return dynamoDbAsyncClient.query(QueryRequest.builder()
|
|
|
|
|
.tableName(table)
|
|
|
|
|
.indexName(indexName)
|
|
|
|
|
.keyConditionExpression("#gsiKey = :gsiValue")
|
|
|
|
|