From 5a88ff0811fbb787a248d39f8082edee3f6c4e59 Mon Sep 17 00:00:00 2001 From: Ravi Khadiwala Date: Wed, 9 Mar 2022 23:28:55 -0600 Subject: [PATCH] Use the async dynamo client to batch uak updates --- .../textsecuregcm/WhisperServerService.java | 4 +- .../DynamicUakMigrationConfiguration.java | 7 + .../textsecuregcm/storage/Accounts.java | 174 ++++++++++++------ .../workers/AssignUsernameCommand.java | 4 +- .../workers/DeleteUserCommand.java | 4 +- .../SetUserDiscoverabilityCommand.java | 4 +- ...ntsManagerChangeNumberIntegrationTest.java | 1 + ...ConcurrentModificationIntegrationTest.java | 1 + .../textsecuregcm/storage/AccountsTest.java | 64 ++++++- 9 files changed, 190 insertions(+), 73 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 3bdf62676..9d8ad87b1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -339,7 +339,9 @@ public class WhisperServerService extends Application dynamicConfigurationManager; + private final DynamicConfigurationManager dynamicConfigurationManager; private final DynamoDbClient client; + private final DynamoDbAsyncClient asyncClient; private final String phoneNumberConstraintTableName; private final String phoneNumberIdentifierConstraintTableName; @@ -96,18 +105,21 @@ public class Accounts extends AbstractDynamoDbStore { private static final Timer NORMALIZE_ITEM_TIMER = Metrics.timer(name(Accounts.class, "normalizeItem")); private static final Counter UAK_NORMALIZE_SUCCESS_COUNT = Metrics.counter(name(Accounts.class, "normalizeUakSuccess")); - private static final Counter UAK_NORMALIZE_ERROR_COUNT = Metrics.counter(name(Accounts.class, "normalizeUakError")); + private static final String UAK_NORMALIZE_ERROR_NAME = name(Accounts.class, "normalizeUakError"); + 
private static final String UAK_NORMALIZE_FAILURE_REASON_TAG_NAME = "reason"; private static final Logger log = LoggerFactory.getLogger(Accounts.class); public Accounts(final DynamicConfigurationManager dynamicConfigurationManager, - DynamoDbClient client, String accountsTableName, String phoneNumberConstraintTableName, + DynamoDbClient client, DynamoDbAsyncClient asyncClient, + String accountsTableName, String phoneNumberConstraintTableName, String phoneNumberIdentifierConstraintTableName, final String usernamesConstraintTableName, final int scanPageSize) { super(client); this.dynamicConfigurationManager = dynamicConfigurationManager; this.client = client; + this.asyncClient = asyncClient; this.phoneNumberConstraintTableName = phoneNumberConstraintTableName; this.phoneNumberIdentifierConstraintTableName = phoneNumberIdentifierConstraintTableName; this.accountsTableName = accountsTableName; @@ -469,10 +481,19 @@ public class Accounts extends AbstractDynamoDbStore { }); } - public void update(Account account) throws ContestedOptimisticLockException { - UPDATE_TIMER.record(() -> { - final UpdateItemRequest updateItemRequest; + /** + * Extract the cause from a CompletionException + */ + private static Throwable unwrap(Throwable throwable) { + while (throwable instanceof CompletionException e && throwable.getCause() != null) { + throwable = e.getCause(); + } + return throwable; + } + public CompletionStage updateAsync(Account account) { + return record(UPDATE_TIMER, () -> { + final UpdateItemRequest updateItemRequest; try { // username, e164, and pni cannot be modified through this method Map attrNames = new HashMap<>(Map.of( @@ -508,21 +529,41 @@ public class Accounts extends AbstractDynamoDbStore { throw new IllegalArgumentException(e); } - try { - final UpdateItemResponse response = client.updateItem(updateItemRequest); - account.setVersion(AttributeValues.getInt(response.attributes(), "V", account.getVersion() + 1)); - } catch (final TransactionConflictException e) 
{ - - throw new ContestedOptimisticLockException(); - - } catch (final ConditionalCheckFailedException e) { - // the exception doesn't give details about which condition failed, - // but we can infer it was an optimistic locking failure if the UUID is known - throw getByAccountIdentifier(account.getUuid()).isPresent() ? new ContestedOptimisticLockException() : e; - } + return asyncClient.updateItem(updateItemRequest) + .thenApply(response -> { + account.setVersion(AttributeValues.getInt(response.attributes(), "V", account.getVersion() + 1)); + return (Void) null; + }) + .exceptionally(throwable -> { + final Throwable unwrapped = unwrap(throwable); + if (unwrapped instanceof TransactionConflictException) { + throw new ContestedOptimisticLockException(); + } else if (unwrapped instanceof ConditionalCheckFailedException e) { + // the exception doesn't give details about which condition failed, + // but we can infer it was an optimistic locking failure if the UUID is known + throw getByAccountIdentifier(account.getUuid()).isPresent() ? 
new ContestedOptimisticLockException() : e; + } else { + // rethrow + throw CompletableFutureUtils.errorAsCompletionException(throwable); + } + }); }); } + public void update(Account account) throws ContestedOptimisticLockException { + try { + this.updateAsync(account).toCompletableFuture().join(); + } catch (CompletionException e) { + // unwrap CompletionExceptions, throw as long as it's unchecked + Throwables.throwIfUnchecked(unwrap(e)); + + // if we otherwise somehow got a wrapped checked exception, + // rethrow the checked exception wrapped by the original CompletionException + log.error("Unexpected checked exception thrown from dynamo update", e); + throw e; + } + } + public Optional getByE164(String number) { return GET_BY_NUMBER_TIMER.record(() -> { @@ -641,6 +682,11 @@ public class Accounts extends AbstractDynamoDbStore { return scanForChunk(scanRequestBuilder, maxCount, GET_ALL_FROM_START_TIMER); } + private static CompletionStage record(final Timer timer, Supplier> toRecord) { + final Instant start = Instant.now(); + return toRecord.get().whenComplete((ignoreT, ignoreE) -> timer.record(Duration.between(start, Instant.now()))); + } + private List normalizeIfRequired(final List> items) { // The UAK top-level attribute may not exist on older records, @@ -653,52 +699,62 @@ public class Accounts extends AbstractDynamoDbStore { final Account account = fromItem(item); allAccounts.add(account); - if (!item.containsKey(ATTR_UAK) && account.getUnidentifiedAccessKey().isPresent()) { + boolean hasAttrUak = item.containsKey(ATTR_UAK); + if (!hasAttrUak && account.getUnidentifiedAccessKey().isPresent()) { // the top level uak attribute doesn't exist, but there's a uak in the account accountsToNormalize.add(account); + } else if (hasAttrUak && account.getUnidentifiedAccessKey().isPresent()) { + final AttributeValue attr = item.get(ATTR_UAK); + final byte[] nestedUak = account.getUnidentifiedAccessKey().get(); + if (!Arrays.equals(attr.b().asByteArray(), nestedUak)) { + 
log.warn("Discovered mismatch between attribute UAK data UAK, normalizing"); + accountsToNormalize.add(account); + } } } - if (!this.dynamicConfigurationManager.getConfiguration().getUakMigrationConfiguration().isEnabled()) { + final DynamicUakMigrationConfiguration currentConfig = this.dynamicConfigurationManager.getConfiguration().getUakMigrationConfiguration(); + if (!currentConfig.isEnabled()) { log.debug("Account normalization is disabled, skipping normalization for {} accounts", accountsToNormalize.size()); return allAccounts; } - final int BATCH_SIZE = 25; // dynamodb max batch size - final String updateUakStatement = String.format("UPDATE %s SET %s = ? WHERE %s = ?", accountsTableName, ATTR_UAK, KEY_ACCOUNT_UUID); - for (List toNormalize : Lists.partition(accountsToNormalize, BATCH_SIZE)) { - NORMALIZE_ITEM_TIMER.record(() -> { - try { - final List updateStatements = toNormalize.stream() - .map(account -> BatchStatementRequest.builder() - .statement(updateUakStatement) - .parameters( - AttributeValues.fromByteArray(account.getUnidentifiedAccessKey().get()), - AttributeValues.fromUUID(account.getUuid())) - .build()) - .toList(); + for (List accounts : Lists.partition(accountsToNormalize, currentConfig.getMaxOutstandingNormalizes())) { + try { + final CompletableFuture[] accountFutures = accounts.stream() + .map(account -> record(NORMALIZE_ITEM_TIMER, + () -> this.updateAsync(account).whenComplete((result, throwable) -> { + if (throwable == null) { + UAK_NORMALIZE_SUCCESS_COUNT.increment(); + return; + } - final BatchExecuteStatementResponse result = client.batchExecuteStatement(BatchExecuteStatementRequest - .builder() - .statements(updateStatements) - .build()); + throwable = unwrap(throwable); + if (throwable instanceof ContestedOptimisticLockException) { + // Could succeed on retry, but just backoff since this is a housekeeping operation + Metrics.counter(UAK_NORMALIZE_ERROR_NAME, + Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, 
"ContestedOptimisticLock")).increment(); + } else if (throwable instanceof ProvisionedThroughputExceededException) { + Metrics.counter(UAK_NORMALIZE_ERROR_NAME, + Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, "ProvisionedThroughPutExceeded")) + .increment(); + } else { + log.warn("Failed to normalize account, skipping", throwable); + Metrics.counter(UAK_NORMALIZE_ERROR_NAME, + Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, "unknown")) + .increment(); + } + })).toCompletableFuture()).toArray(CompletableFuture[]::new); - final Map errors = result.responses().stream() - .map(BatchStatementResponse::error) - .filter(e -> e != null) - .collect(Collectors.groupingBy(BatchStatementError::codeAsString, Collectors.counting())); - - final long errorCount = errors.values().stream().mapToLong(Long::longValue).sum(); - UAK_NORMALIZE_SUCCESS_COUNT.increment(toNormalize.size() - errorCount); - UAK_NORMALIZE_ERROR_COUNT.increment(errorCount); - if (!errors.isEmpty()) { - log.warn("Failed to normalize account uaks in batch of {}, error codes: {}", toNormalize.size(), errors); - } - } catch (final Exception e) { - UAK_NORMALIZE_ERROR_COUNT.increment(toNormalize.size()); - log.warn("Failed to normalize accounts in a batch of {}", toNormalize.size(), e); - } - }); + // wait for a futures in batch to complete + CompletableFuture + .allOf(accountFutures) + // exceptions handled in individual futures + .exceptionally(e -> null) + .join(); + } catch (Exception e) { + log.warn("Failed to update batch of {} accounts, skipping", accounts.size(), e); + } } return allAccounts; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index b2d1a6768..c06f34fee 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java 
@@ -135,7 +135,9 @@ public class AssignUsernameCommand extends EnvironmentCommand item = dynamoDbExtension.getDynamoDbClient() + .getItem(GetItemRequest.builder() + .tableName(ACCOUNTS_TABLE_NAME) + .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier))) + .consistentRead(true) + .build()).item(); + assertThat(item).containsEntry( + Accounts.ATTR_UAK, + AttributeValues.fromByteArray(account.getUnidentifiedAccessKey().get())); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testAddMissingUakAttribute(boolean normalizeDisabled) throws JsonProcessingException {