diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java index f74bb41bd..0c6a1589b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java @@ -48,10 +48,6 @@ public class DynamicConfiguration { @Valid private DynamicPushLatencyConfiguration pushLatency = new DynamicPushLatencyConfiguration(Collections.emptyMap()); - @JsonProperty - @Valid - private DynamicUakMigrationConfiguration uakMigrationConfiguration = new DynamicUakMigrationConfiguration(); - @JsonProperty @Valid private DynamicTurnConfiguration turn = new DynamicTurnConfiguration(); @@ -115,10 +111,6 @@ public class DynamicConfiguration { return pushLatency; } - public DynamicUakMigrationConfiguration getUakMigrationConfiguration() { - return uakMigrationConfiguration; - } - public DynamicTurnConfiguration getTurnConfiguration() { return turn; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicUakMigrationConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicUakMigrationConfiguration.java deleted file mode 100644 index 302ba980a..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicUakMigrationConfiguration.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.whispersystems.textsecuregcm.configuration.dynamic; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class DynamicUakMigrationConfiguration { - @JsonProperty - private boolean enabled = true; - - @JsonProperty - private int maxOutstandingNormalizes = 25; - - public boolean isEnabled() { - return enabled; - } - - public int getMaxOutstandingNormalizes() { - return maxOutstandingNormalizes; - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index d2ed0deac..c902f39d0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -9,24 +9,19 @@ import static com.codahale.metrics.MetricRegistry.name; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -35,7 +30,6 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicUakMigrationConfiguration; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.UUIDUtil; @@ -47,7 +41,6 @@ import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedExce import software.amazon.awssdk.services.dynamodb.model.Delete; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; -import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; import software.amazon.awssdk.services.dynamodb.model.Put; import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; @@ -78,7 +71,6 @@ public class Accounts extends AbstractDynamoDbStore { // unidentified access key; byte[] or null static final String ATTR_UAK = "UAK"; - private final DynamicConfigurationManager dynamicConfigurationManager; private final DynamoDbClient client; private final DynamoDbAsyncClient asyncClient; @@ -101,11 +93,6 @@ public class Accounts extends AbstractDynamoDbStore { private static final Timer GET_ALL_FROM_START_TIMER = Metrics.timer(name(Accounts.class, "getAllFrom")); private static final Timer GET_ALL_FROM_OFFSET_TIMER = Metrics.timer(name(Accounts.class, "getAllFromOffset")); private static final Timer DELETE_TIMER = Metrics.timer(name(Accounts.class, "delete")); - private static final Timer NORMALIZE_ITEM_TIMER = Metrics.timer(name(Accounts.class, "normalizeItem")); - - private static final Counter UAK_NORMALIZE_SUCCESS_COUNT = Metrics.counter(name(Accounts.class, "normalizeUakSuccess")); - private static final String UAK_NORMALIZE_ERROR_NAME = name(Accounts.class, "normalizeUakError"); - private static final String UAK_NORMALIZE_FAILURE_REASON_TAG_NAME = "reason"; private static final Logger log = LoggerFactory.getLogger(Accounts.class); @@ -116,7 +103,6 @@ public class Accounts extends AbstractDynamoDbStore { final int scanPageSize) { super(client); - this.dynamicConfigurationManager = dynamicConfigurationManager; this.client = client; this.asyncClient = asyncClient; this.phoneNumberConstraintTableName = phoneNumberConstraintTableName; @@ -697,82 +683,10 @@ public class Accounts extends AbstractDynamoDbStore { return toRecord.get().whenComplete((ignoreT, ignoreE) -> timer.record(Duration.between(start, Instant.now()))); } - private List normalizeIfRequired(final List> items) { - - // The UAK top-level attribute may not exist on older records, - // if it is absent and there is a UAK in the account blob we'll - // add the UAK as a top-level attribute - // TODO: Can eliminate this once all uaks exist as top-level attributes - final List allAccounts = new ArrayList<>(); - final List accountsToNormalize = new ArrayList<>(); - for (Map item : items) { - final Account account = fromItem(item); - allAccounts.add(account); - - boolean hasAttrUak = item.containsKey(ATTR_UAK); - if (!hasAttrUak && account.getUnidentifiedAccessKey().isPresent()) { - // the top level uak attribute doesn't exist, but there's a uak in the account - accountsToNormalize.add(account); - } else if (hasAttrUak && account.getUnidentifiedAccessKey().isPresent()) { - final AttributeValue attr = item.get(ATTR_UAK); - final byte[] nestedUak = account.getUnidentifiedAccessKey().get(); - if (!Arrays.equals(attr.b().asByteArray(), nestedUak)) { - log.warn("Discovered mismatch between attribute UAK data UAK, normalizing"); - accountsToNormalize.add(account); - } - } - } - - final DynamicUakMigrationConfiguration currentConfig = this.dynamicConfigurationManager.getConfiguration().getUakMigrationConfiguration(); - if (!currentConfig.isEnabled()) { - log.debug("Account normalization is disabled, skipping normalization for {} accounts", accountsToNormalize.size()); - return allAccounts; - } - - for (List accounts : Lists.partition(accountsToNormalize, currentConfig.getMaxOutstandingNormalizes())) { - try { - final CompletableFuture[] accountFutures = accounts.stream() - .map(account -> record(NORMALIZE_ITEM_TIMER, - () -> this.updateAsync(account).whenComplete((result, throwable) -> { - if (throwable == null) { - UAK_NORMALIZE_SUCCESS_COUNT.increment(); - return; - } - - throwable = unwrap(throwable); - if (throwable instanceof ContestedOptimisticLockException) { - // Could succeed on retry, but just backoff since this is a housekeeping operation - Metrics.counter(UAK_NORMALIZE_ERROR_NAME, - Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, "ContestedOptimisticLock")).increment(); - } else if (throwable instanceof ProvisionedThroughputExceededException) { - Metrics.counter(UAK_NORMALIZE_ERROR_NAME, - Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, "ProvisionedThroughPutExceeded")) - .increment(); - } else { - log.warn("Failed to normalize account, skipping", throwable); - Metrics.counter(UAK_NORMALIZE_ERROR_NAME, - Tags.of(UAK_NORMALIZE_FAILURE_REASON_TAG_NAME, "unknown")) - .increment(); - } - })).toCompletableFuture()).toArray(CompletableFuture[]::new); - - // wait for a futures in batch to complete - CompletableFuture - .allOf(accountFutures) - // exceptions handled in individual futures - .exceptionally(e -> null) - .join(); - } catch (Exception e) { - log.warn("Failed to update batch of {} accounts, skipping", accounts.size(), e); - } - } - return allAccounts; - } - private AccountCrawlChunk scanForChunk(final ScanRequest.Builder scanRequestBuilder, final int maxCount, final Timer timer) { scanRequestBuilder.tableName(accountsTableName); final List> items = timer.record(() -> scan(scanRequestBuilder.build(), maxCount)); - final List accounts = normalizeIfRequired(items); + final List accounts = items.stream().map(Accounts::fromItem).toList(); return new AccountCrawlChunk(accounts, accounts.size() > 0 ? accounts.get(accounts.size() - 1).getUuid() : null); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java index 35e93bdc1..f1e2a8c83 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java @@ -20,7 +20,6 @@ import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -28,8 +27,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.jdbi.v3.core.transaction.TransactionException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -46,7 +43,6 @@ import org.whispersystems.textsecuregcm.util.SystemMapper; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; -import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; @@ -781,164 +777,6 @@ class AccountsTest { assertThat(account.getUsername()).hasValueSatisfying(u -> assertThat(u).isEqualTo(username)); } - @Test - void testAddUakMissingInJson() { - // If there's no uak in the json, we shouldn't add an attribute on crawl - final UUID accountIdentifier = UUID.randomUUID(); - - final Account account = generateAccount("+18005551234", accountIdentifier, UUID.randomUUID()); - account.setUnidentifiedAccessKey(null); - accounts.create(account); - - // there should be no top level uak - Map item = dynamoDbExtension.getDynamoDbClient() - .getItem(GetItemRequest.builder() - .tableName(ACCOUNTS_TABLE_NAME) - .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier))) - .consistentRead(true) - .build()).item(); - assertThat(item).doesNotContainKey(Accounts.ATTR_UAK); - - // crawling should return 1 account - final AccountCrawlChunk allFromStart = accounts.getAllFromStart(1); - assertThat(allFromStart.getAccounts()).hasSize(1); - assertThat(allFromStart.getAccounts().get(0).getUuid()).isEqualTo(accountIdentifier); - - // there should still be no top level uak - item = dynamoDbExtension.getDynamoDbClient() - .getItem(GetItemRequest.builder() - .tableName(ACCOUNTS_TABLE_NAME) - .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier))) - .consistentRead(true) - .build()).item(); - assertThat(item).doesNotContainKey(Accounts.ATTR_UAK); - } - - @Test - void testUakMismatch() { - // If there's a UAK mismatch, we should correct it - final UUID accountIdentifier = UUID.randomUUID(); - - final Account account = generateAccount("+18005551234", accountIdentifier, UUID.randomUUID()); - accounts.create(account); - - // set the uak to garbage in the attributes - dynamoDbExtension.getDynamoDbClient().updateItem(UpdateItemRequest.builder() - .tableName(ACCOUNTS_TABLE_NAME) - .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier))) - .expressionAttributeNames(Map.of("#uak", Accounts.ATTR_UAK)) - .expressionAttributeValues(Map.of(":uak", AttributeValues.fromByteArray("bad-uak".getBytes()))) - .updateExpression("SET #uak = :uak").build()); - - // crawling should return 1 account and fix the uak mismatch - final AccountCrawlChunk allFromStart = accounts.getAllFromStart(1); - assertThat(allFromStart.getAccounts()).hasSize(1); - assertThat(allFromStart.getAccounts().get(0).getUuid()).isEqualTo(accountIdentifier); - assertThat(allFromStart.getAccounts().get(0).getUnidentifiedAccessKey().get()).isEqualTo(account.getUnidentifiedAccessKey().get()); - - // the top level uak should be the original - final Map item = dynamoDbExtension.getDynamoDbClient() - .getItem(GetItemRequest.builder() - .tableName(ACCOUNTS_TABLE_NAME) - .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier))) - .consistentRead(true) - .build()).item(); - assertThat(item).containsEntry( - Accounts.ATTR_UAK, - AttributeValues.fromByteArray(account.getUnidentifiedAccessKey().get())); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testAddMissingUakAttribute(boolean normalizeDisabled) throws JsonProcessingException { - final UUID accountIdentifier = UUID.randomUUID(); - - if (normalizeDisabled) { - final DynamicConfiguration config = DynamicConfigurationManager.parseConfiguration(""" - captcha: - scoreFloor: 1.0 - uakMigrationConfiguration: - enabled: false - """, DynamicConfiguration.class).orElseThrow(); - when(mockDynamicConfigManager.getConfiguration()).thenReturn(config); - } - - final Account account = generateAccount("+18005551234", accountIdentifier, UUID.randomUUID()); - accounts.create(account); - - // remove the top level uak (simulates old format) - dynamoDbExtension.getDynamoDbClient().updateItem(UpdateItemRequest.builder() - .tableName(ACCOUNTS_TABLE_NAME) - .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier))) - .expressionAttributeNames(Map.of("#uak", Accounts.ATTR_UAK)) - .updateExpression("REMOVE #uak").build()); - - // crawling should return 1 account, and fix the discrepancy between - // the json blob and the top level attributes if normalization is enabled - final AccountCrawlChunk allFromStart = accounts.getAllFromStart(1); - assertThat(allFromStart.getAccounts()).hasSize(1); - assertThat(allFromStart.getAccounts().get(0).getUuid()).isEqualTo(accountIdentifier); - - // check whether normalization happened - final Map item = dynamoDbExtension.getDynamoDbClient() - .getItem(GetItemRequest.builder() - .tableName(ACCOUNTS_TABLE_NAME) - .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier))) - .consistentRead(true) - .build()).item(); - if (normalizeDisabled) { - assertThat(item).doesNotContainKey(Accounts.ATTR_UAK); - } else { - assertThat(item).containsEntry(Accounts.ATTR_UAK, - AttributeValues.fromByteArray(account.getUnidentifiedAccessKey().get())); - } - } - - @ParameterizedTest - @ValueSource(ints = {24, 25, 26, 101}) - void testAddMissingUakAttributeBatched(int n) { - // generate N + 5 accounts - List allAccounts = IntStream.range(0, n + 5) - .mapToObj(i -> generateAccount(String.format("+1800555%04d", i), UUID.randomUUID(), UUID.randomUUID())) - .collect(Collectors.toList()); - allAccounts.forEach(accounts::create); - - // delete the UAK on n of them - Collections.shuffle(allAccounts); - allAccounts.stream().limit(n).forEach(account -> - dynamoDbExtension.getDynamoDbClient().updateItem(UpdateItemRequest.builder() - .tableName(ACCOUNTS_TABLE_NAME) - .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(account.getUuid()))) - .expressionAttributeNames(Map.of("#uak", Accounts.ATTR_UAK)) - .updateExpression("REMOVE #uak") - .build())); - - // crawling should fix the discrepancy between - // the json blob and the top level attributes - AccountCrawlChunk chunk = accounts.getAllFromStart(7); - long verifiedCount = 0; - while (true) { - for (Account account : chunk.getAccounts()) { - // check that the attribute now exists at top level - final Map item = dynamoDbExtension.getDynamoDbClient() - .getItem(GetItemRequest.builder() - .tableName(ACCOUNTS_TABLE_NAME) - .key(Map.of(Accounts.KEY_ACCOUNT_UUID, AttributeValues.fromUUID(account.getUuid()))) - .consistentRead(true) - .build()).item(); - assertThat(item).containsEntry(Accounts.ATTR_UAK, - AttributeValues.fromByteArray(account.getUnidentifiedAccessKey().get())); - verifiedCount++; - } - if (chunk.getLastUuid().isPresent()) { - chunk = accounts.getAllFrom(chunk.getLastUuid().get(), 7); - } else { - break; - } - } - assertThat(verifiedCount).isEqualTo(n + 5); - } - private Device generateDevice(long id) { return DevicesHelper.createDevice(id); }