diff --git a/service/pom.xml b/service/pom.xml index 936c91dc1..b584a1394 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -283,6 +283,17 @@ com.amazonaws aws-java-sdk-s3 + + com.amazonaws + dynamodb-lock-client + 1.1.0 + + + commons-logging + commons-logging + + + redis.clients diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index de63cbb94..4f5a671c7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -163,6 +163,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private DeletedAccountsDynamoDbConfiguration deletedAccountsDynamoDb; + @Valid + @NotNull + @JsonProperty + private DynamoDbConfiguration deletedAccountsLockDynamoDb; + @Valid @NotNull @JsonProperty @@ -391,6 +396,10 @@ public class WhisperServerConfiguration extends Configuration { return deletedAccountsDynamoDb; } + public DynamoDbConfiguration getDeletedAccountsLockDynamoDbConfiguration() { + return deletedAccountsLockDynamoDb; + } + public DatabaseConfiguration getAbuseDatabaseConfiguration() { return abuseDatabase; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 93292ce03..5a59c2bf0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -6,6 +6,10 @@ package org.whispersystems.textsecuregcm; import static com.codahale.metrics.MetricRegistry.name; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -153,6 +157,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; import org.whispersystems.textsecuregcm.storage.DeletedAccounts; import org.whispersystems.textsecuregcm.storage.DeletedAccountsDirectoryReconciler; +import org.whispersystems.textsecuregcm.storage.DeletedAccountsManager; import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler; import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; @@ -347,6 +352,13 @@ public class WhisperServerService extends Application deleteStorageServiceDataFuture = secureStorageClient.deleteStoredData(account.getUuid()); final CompletableFuture deleteBackupServiceDataFuture = secureBackupClient.deleteBackups(account.getUuid()); @@ -340,9 +340,9 @@ public class AccountsManager { } } - deletedAccounts.put(account.getUuid(), account.getNumber()); + deletedAccountsManager.put(account.getUuid(), account.getNumber()); - } catch (final Exception e) { + } catch (final RuntimeException | InterruptedException e) { logger.warn("Failed to delete account", e); Metrics.counter(DELETE_ERROR_COUNTER_NAME, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccounts.java index 8980e25f7..a0a9cfc69 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccounts.java @@ -6,13 +6,24 @@ package org.whispersystems.textsecuregcm.storage; import java.time.Duration; import java.time.Instant; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.Pair; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; @@ -27,6 +38,9 @@ public class DeletedAccounts extends AbstractDynamoDbStore { static final Duration TIME_TO_LIVE = Duration.ofDays(30); + // Note that this limit is imposed by DynamoDB itself; going above 100 will result in errors + static final int GET_BATCH_SIZE = 100; + private final String tableName; private final String needsReconciliationIndexName; @@ -37,7 +51,7 @@ public class DeletedAccounts extends AbstractDynamoDbStore { this.needsReconciliationIndexName = needsReconciliationIndexName; } - public void put(UUID uuid, String e164) { + void put(UUID uuid, String e164) { db().putItem(PutItemRequest.builder() .tableName(tableName) .item(Map.of( @@ -48,7 +62,7 @@ public class DeletedAccounts extends AbstractDynamoDbStore { .build()); } - public List> listAccountsToReconcile(final int max) { + List> listAccountsToReconcile(final int max) { final ScanRequest scanRequest = ScanRequest.builder() .tableName(tableName) @@ -64,7 +78,42 @@ public class DeletedAccounts extends AbstractDynamoDbStore { .collect(Collectors.toList()); } - public void markReconciled(final List phoneNumbersReconciled) { + Set getAccountsNeedingReconciliation(final Collection e164s) { + final Queue> pendingKeys = e164s.stream() + .map(e164 -> Map.of(KEY_ACCOUNT_E164, AttributeValues.fromString(e164))) + .collect(Collectors.toCollection(() -> new ArrayDeque<>(e164s.size()))); + + final Set accountsNeedingReconciliation = new HashSet<>(e164s.size()); + final List> batchKeys = new ArrayList<>(GET_BATCH_SIZE); + + while (!pendingKeys.isEmpty()) { + batchKeys.clear(); + + for (int i = 0; i < GET_BATCH_SIZE && !pendingKeys.isEmpty(); i++) { + batchKeys.add(pendingKeys.remove()); + } + + final BatchGetItemResponse response = db().batchGetItem(BatchGetItemRequest.builder() + .requestItems(Map.of(tableName, KeysAndAttributes.builder() + .consistentRead(true) + .keys(batchKeys) + .build())) + .build()); + + response.responses().getOrDefault(tableName, Collections.emptyList()).stream() + .filter(attributes -> AttributeValues.getInt(attributes, ATTR_NEEDS_CDS_RECONCILIATION, 0) == 1) + .map(attributes -> AttributeValues.getString(attributes, KEY_ACCOUNT_E164, null)) + .forEach(accountsNeedingReconciliation::add); + + if (response.hasUnprocessedKeys() && response.unprocessedKeys().containsKey(tableName)) { + pendingKeys.addAll(response.unprocessedKeys().get(tableName).keys()); + } + } + + return accountsNeedingReconciliation; + } + + void markReconciled(final Collection phoneNumbersReconciled) { phoneNumbersReconciled.forEach(number -> db().updateItem( UpdateItemRequest.builder() diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManager.java new file mode 100644 index 000000000..0ed64cf6b --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManager.java @@ -0,0 +1,120 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import com.amazonaws.services.dynamodbv2.AcquireLockOptions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions; +import com.amazonaws.services.dynamodbv2.LockItem; +import com.amazonaws.services.dynamodbv2.ReleaseLockOptions; +import com.amazonaws.services.dynamodbv2.model.LockCurrentlyUnavailableException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.Pair; + +public class DeletedAccountsManager { + + private final DeletedAccounts deletedAccounts; + + private final AmazonDynamoDBLockClient lockClient; + + private static final Logger log = LoggerFactory.getLogger(DeletedAccountsManager.class); + + @FunctionalInterface + public interface DeletedAccountReconciliationConsumer { + + /** + * Reconcile a list of deleted account records. + * + * @param deletedAccounts the account records to reconcile + * @return a list of account records that were successfully reconciled; accounts that were not successfully + * reconciled may be retried later + * @throws ChunkProcessingFailedException in the event of an error while processing the batch of account records + */ + Collection reconcile(List> deletedAccounts) throws ChunkProcessingFailedException; + } + + public DeletedAccountsManager(final DeletedAccounts deletedAccounts, final AmazonDynamoDB lockDynamoDb, final String lockTableName) { + this.deletedAccounts = deletedAccounts; + + lockClient = new AmazonDynamoDBLockClient( + AmazonDynamoDBLockClientOptions.builder(lockDynamoDb, lockTableName) + .withPartitionKeyName(DeletedAccounts.KEY_ACCOUNT_E164) + .withLeaseDuration(15L) + .withHeartbeatPeriod(2L) + .withTimeUnit(TimeUnit.SECONDS) + .withCreateHeartbeatBackgroundThread(true) + .build()); + } + + public void put(final UUID uuid, final String e164) throws InterruptedException { + withLock(e164, () -> deletedAccounts.put(uuid, e164)); + } + + private void withLock(final String e164, final Runnable task) throws InterruptedException { + final LockItem lockItem = lockClient.acquireLock(AcquireLockOptions.builder(e164) + .withAcquireReleasedLocksConsistently(true) + .build()); + + try { + task.run(); + } finally { + lockClient.releaseLock(ReleaseLockOptions.builder(lockItem) + .withBestEffort(true) + .build()); + } + } + + public void lockAndReconcileAccounts(final int max, final DeletedAccountReconciliationConsumer consumer) throws ChunkProcessingFailedException { + final List lockItems = new ArrayList<>(); + final List> reconciliationCandidates = deletedAccounts.listAccountsToReconcile(max).stream() + .filter(pair -> { + boolean lockAcquired = false; + + try { + lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pair.second()) + .withAcquireReleasedLocksConsistently(true) + .withShouldSkipBlockingWait(true) + .build())); + + lockAcquired = true; + } catch (final InterruptedException e) { + log.warn("Interrupted while acquiring lock for reconciliation", e); + } catch (final LockCurrentlyUnavailableException ignored) { + } + + return lockAcquired; + }) + .collect(Collectors.toList()); + + assert lockItems.size() == reconciliationCandidates.size(); + + // A deleted account's status may have changed in the time between getting a list of candidates and acquiring a lock + // on the candidate records. Now that we hold the lock, check which of the candidates still need to be reconciled. + final Set numbersNeedingReconciliationAfterLock = + deletedAccounts.getAccountsNeedingReconciliation(reconciliationCandidates.stream() + .map(Pair::second) + .collect(Collectors.toList())); + + final List> accountsToReconcile = reconciliationCandidates.stream() + .filter(candidate -> numbersNeedingReconciliationAfterLock.contains(candidate.second())) + .collect(Collectors.toList()); + + try { + deletedAccounts.markReconciled(consumer.reconcile(accountsToReconcile)); + } finally { + lockItems.forEach(lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem).withBestEffort(true).build())); + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawler.java index 5794dc53b..6e905ba3f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawler.java @@ -6,7 +6,6 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; -import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import java.io.IOException; import java.time.Duration; @@ -27,42 +26,41 @@ public class DeletedAccountsTableCrawler extends ManagedPeriodicWork { private static final int MAX_BATCH_SIZE = 5_000; private static final String BATCH_SIZE_DISTRIBUTION_NAME = name(DeletedAccountsTableCrawler.class, "batchSize"); - private final DeletedAccounts deletedAccounts; + private final DeletedAccountsManager deletedAccountsManager; private final List reconcilers; public DeletedAccountsTableCrawler( - final DeletedAccounts deletedAccounts, + final DeletedAccountsManager deletedAccountsManager, final List reconcilers, final FaultTolerantRedisCluster cluster, final ScheduledExecutorService executorService) throws IOException { super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService); - this.deletedAccounts = deletedAccounts; + this.deletedAccountsManager = deletedAccountsManager; this.reconcilers = reconcilers; } @Override public void doPeriodicWork() throws Exception { - final List> deletedAccounts = this.deletedAccounts.listAccountsToReconcile(MAX_BATCH_SIZE); + deletedAccountsManager.lockAndReconcileAccounts(MAX_BATCH_SIZE, deletedAccounts -> { + final List deletedUsers = deletedAccounts.stream() + .map(pair -> new User(pair.first(), pair.second())) + .collect(Collectors.toList()); - final List deletedUsers = deletedAccounts.stream() - .map(pair -> new User(pair.first(), pair.second())) - .collect(Collectors.toList()); + for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) { + reconciler.onCrawlChunk(deletedUsers); + } - for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) { - reconciler.onCrawlChunk(deletedUsers); - } + final List reconciledPhoneNumbers = deletedAccounts.stream() + .map(Pair::second) + .collect(Collectors.toList()); - final List reconciledPhoneNumbers = deletedAccounts.stream() - .map(Pair::second) - .collect(Collectors.toList()); + Metrics.summary(BATCH_SIZE_DISTRIBUTION_NAME).record(reconciledPhoneNumbers.size()); - this.deletedAccounts.markReconciled(reconciledPhoneNumbers); - - Metrics.summary(BATCH_SIZE_DISTRIBUTION_NAME) - .record(reconciledPhoneNumbers.size()); + return reconciledPhoneNumbers; + }); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index c31e5b4b5..a2c21a4d5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -7,6 +7,10 @@ package org.whispersystems.textsecuregcm.workers; import static com.codahale.metrics.MetricRegistry.name; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.fasterxml.jackson.databind.DeserializationFeature; import io.dropwizard.Application; import io.dropwizard.cli.EnvironmentCommand; @@ -38,6 +42,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason; import org.whispersystems.textsecuregcm.storage.DeletedAccounts; +import org.whispersystems.textsecuregcm.storage.DeletedAccountsManager; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.KeysDynamoDb; @@ -133,6 +138,13 @@ public class DeleteUserCommand extends EnvironmentCommand account = accountsManager.get(user); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java index c381a60fc..2ff5ef98e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java @@ -148,7 +148,7 @@ class AccountsManagerConcurrentModificationIntegrationTest { accounts, accountsDynamoDb, RedisClusterHelper.buildMockRedisCluster(commands), - mock(DeletedAccounts.class), + mock(DeletedAccountsManager.class), mock(DirectoryQueue.class), mock(KeysDynamoDb.class), mock(MessagesManager.class), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManagerTest.java new file mode 100644 index 000000000..22f7e2d84 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManagerTest.java @@ -0,0 +1,142 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.function.Executable; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.Projection; +import software.amazon.awssdk.services.dynamodb.model.ProjectionType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import java.lang.Thread.State; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class DeletedAccountsManagerTest { + + private static final String NEEDS_RECONCILIATION_INDEX_NAME = "needs_reconciliation_test"; + + @RegisterExtension + static final DynamoDbExtension DELETED_ACCOUNTS_DYNAMODB_EXTENSION = DynamoDbExtension.builder() + .tableName("deleted_accounts_test") + .hashKey(DeletedAccounts.KEY_ACCOUNT_E164) + .attributeDefinition(AttributeDefinition.builder() + .attributeName(DeletedAccounts.KEY_ACCOUNT_E164) + .attributeType(ScalarAttributeType.S).build()) + .attributeDefinition(AttributeDefinition.builder() + .attributeName(DeletedAccounts.ATTR_NEEDS_CDS_RECONCILIATION) + .attributeType(ScalarAttributeType.N) + .build()) + .globalSecondaryIndex(GlobalSecondaryIndex.builder() + .indexName(NEEDS_RECONCILIATION_INDEX_NAME) + .keySchema(KeySchemaElement.builder().attributeName(DeletedAccounts.KEY_ACCOUNT_E164).keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName(DeletedAccounts.ATTR_NEEDS_CDS_RECONCILIATION).keyType(KeyType.RANGE).build()) + .projection(Projection.builder().projectionType(ProjectionType.INCLUDE).nonKeyAttributes(DeletedAccounts.ATTR_ACCOUNT_UUID).build()) + .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(10L).writeCapacityUnits(10L).build()) + .build()) + .build(); + + @RegisterExtension + static DynamoDbExtension DELETED_ACCOUNTS_LOCK_DYNAMODB_EXTENSION = DynamoDbExtension.builder() + .tableName("deleted_accounts_lock_test") + .hashKey(DeletedAccounts.KEY_ACCOUNT_E164) + .attributeDefinition(AttributeDefinition.builder() + .attributeName(DeletedAccounts.KEY_ACCOUNT_E164) + .attributeType(ScalarAttributeType.S).build()) + .build(); + + private DeletedAccountsManager deletedAccountsManager; + + @BeforeEach + void setUp() { + final DeletedAccounts deletedAccounts = new DeletedAccounts(DELETED_ACCOUNTS_DYNAMODB_EXTENSION.getDynamoDbClient(), + DELETED_ACCOUNTS_DYNAMODB_EXTENSION.getTableName(), + NEEDS_RECONCILIATION_INDEX_NAME); + + deletedAccountsManager = new DeletedAccountsManager(deletedAccounts, + DELETED_ACCOUNTS_LOCK_DYNAMODB_EXTENSION.getLegacyDynamoClient(), + DELETED_ACCOUNTS_LOCK_DYNAMODB_EXTENSION.getTableName()); + } + + @Test + void testReconciliationLockContention() throws ChunkProcessingFailedException, InterruptedException { + + final UUID[] uuids = new UUID[3]; + final String[] e164s = new String[uuids.length]; + + for (int i = 0; i < uuids.length; i++) { + uuids[i] = UUID.randomUUID(); + e164s[i] = String.format("+1800555%04d", i); + } + + final Map expectedReconciledAccounts = new HashMap<>(); + + for (int i = 0; i < uuids.length; i++) { + deletedAccountsManager.put(uuids[i], e164s[i]); + expectedReconciledAccounts.put(e164s[i], uuids[i]); + } + + final UUID replacedUUID = UUID.randomUUID(); + final Map reconciledAccounts = new HashMap<>(); + + final Thread putThread = new Thread(() -> { + try { + deletedAccountsManager.put(replacedUUID, e164s[0]); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + getClass().getSimpleName() + "-put"); + + final Thread reconcileThread = new Thread(() -> { + try { + deletedAccountsManager.lockAndReconcileAccounts(uuids.length, deletedAccounts -> { + // We hold the lock for the first account, so a thread trying to operate on that first count should block + // waiting for the lock. + putThread.start(); + + // Make sure the other thread really does actually block at some point + while (putThread.getState() != State.TIMED_WAITING) { + Thread.yield(); + } + + deletedAccounts.forEach(pair -> reconciledAccounts.put(pair.second(), pair.first())); + return reconciledAccounts.keySet(); + }); + } catch (ChunkProcessingFailedException e) { + throw new AssertionError(e); + } + }, getClass().getSimpleName() + "-reconcile"); + + reconcileThread.start(); + + assertDoesNotThrow((Executable) reconcileThread::join); + assertDoesNotThrow((Executable) putThread::join); + + assertEquals(expectedReconciledAccounts, reconciledAccounts); + + // The "put" thread should have completed after the reconciliation thread wrapped up. We can verify that's true by + // reconciling again; the updated account (and only that account) should appear in the "needs reconciliation" list. + deletedAccountsManager.lockAndReconcileAccounts(uuids.length, deletedAccounts -> { + assertEquals(1, deletedAccounts.size()); + assertEquals(replacedUUID, deletedAccounts.get(0).first()); + assertEquals(e164s[0], deletedAccounts.get(0).second()); + + return List.of(deletedAccounts.get(0).second()); + }); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTest.java index 9a22d8d3f..e51629408 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTest.java @@ -7,8 +7,12 @@ package org.whispersystems.textsecuregcm.storage; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.util.Pair; @@ -45,13 +49,17 @@ class DeletedAccountsTest { .build()) .build(); - @Test - void test() { + private DeletedAccounts deletedAccounts; - final DeletedAccounts deletedAccounts = new DeletedAccounts(dynamoDbExtension.getDynamoDbClient(), + @BeforeEach + void setUp() { + deletedAccounts = new DeletedAccounts(dynamoDbExtension.getDynamoDbClient(), dynamoDbExtension.getTableName(), NEEDS_RECONCILIATION_INDEX_NAME); + } + @Test + void testPutList() { UUID firstUuid = UUID.randomUUID(); UUID secondUuid = UUID.randomUUID(); UUID thirdUuid = UUID.randomUUID(); @@ -81,4 +89,42 @@ class DeletedAccountsTest { assertTrue(deletedAccounts.listAccountsToReconcile(1).isEmpty()); } + + @Test + void testGetAccountsNeedingReconciliation() { + final UUID firstUuid = UUID.randomUUID(); + final UUID secondUuid = UUID.randomUUID(); + + final String firstNumber = "+14152221234"; + final String secondNumber = "+14152225678"; + final String thirdNumber = "+14159998765"; + + assertEquals(Collections.emptySet(), + deletedAccounts.getAccountsNeedingReconciliation(List.of(firstNumber, secondNumber, thirdNumber))); + + deletedAccounts.put(firstUuid, firstNumber); + deletedAccounts.put(secondUuid, secondNumber); + + assertEquals(Set.of(firstNumber, secondNumber), + deletedAccounts.getAccountsNeedingReconciliation(List.of(firstNumber, secondNumber, thirdNumber))); + } + + @Test + void testGetAccountsNeedingReconciliationLargeBatch() { + final int itemCount = (DeletedAccounts.GET_BATCH_SIZE * 3) + 1; + + final Set expectedAccountsNeedingReconciliation = new HashSet<>(itemCount); + + for (int i = 0; i < itemCount; i++) { + final String e164 = String.format("+18000555%04d", i); + + deletedAccounts.put(UUID.randomUUID(), e164); + expectedAccountsNeedingReconciliation.add(e164); + } + + final Set accountsNeedingReconciliation = + deletedAccounts.getAccountsNeedingReconciliation(expectedAccountsNeedingReconciliation); + + assertEquals(expectedAccountsNeedingReconciliation, accountsNeedingReconciliation); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java index 9dd005cf1..b6f2f4a3f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java @@ -1,6 +1,13 @@ package org.whispersystems.textsecuregcm.storage; import com.almworks.sqlite4java.SQLite; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.local.main.ServerRunner; import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer; import java.net.ServerSocket; @@ -46,6 +53,7 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback private DynamoDbClient dynamoDB2; private DynamoDbAsyncClient dynamoAsyncDB2; + private AmazonDynamoDB legacyDynamoClient; private DynamoDbExtension(String tableName, String hashKey, String rangeKey, List attributeDefinitions, List globalSecondaryIndexes, long readCapacityUnits, long writeCapacityUnits) { @@ -137,6 +145,11 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback .credentialsProvider(StaticCredentialsProvider.create( AwsBasicCredentials.create("accessKey", "secretKey"))) .build(); + legacyDynamoClient = AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) + .build(); } static class DynamoDbExtensionBuilder { @@ -194,6 +207,10 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback return dynamoAsyncDB2; } + public AmazonDynamoDB getLegacyDynamoClient() { + return legacyDynamoClient; + } + public String getTableName() { return tableName; } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/AccountControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/AccountControllerTest.java index ee0064c6d..50c969793 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/AccountControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/AccountControllerTest.java @@ -1565,7 +1565,7 @@ class AccountControllerTest { } @Test - void testDeleteAccount() { + void testDeleteAccount() throws InterruptedException { Response response = resources.getJerseyTest() .target("/v1/accounts/me") @@ -1577,6 +1577,21 @@ class AccountControllerTest { verify(accountsManager).delete(AuthHelper.VALID_ACCOUNT, AccountsManager.DeletionReason.USER_REQUEST); } + @Test + void testDeleteAccountInterrupted() throws InterruptedException { + doThrow(InterruptedException.class).when(accountsManager).delete(any(), any()); + + Response response = + resources.getJerseyTest() + .target("/v1/accounts/me") + .request() + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD)) + .delete(); + + assertThat(response.getStatus()).isEqualTo(500); + verify(accountsManager).delete(AuthHelper.VALID_ACCOUNT, AccountsManager.DeletionReason.USER_REQUEST); + } + @ParameterizedTest @MethodSource void testSignupCaptcha(final String message, final boolean enforced, final Set countryCodes, final int expectedResponseStatusCode) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountCleanerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountCleanerTest.java index 571482fcd..a3d807186 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountCleanerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountCleanerTest.java @@ -72,7 +72,7 @@ public class AccountCleanerTest { } @Test - public void testAccounts() throws AccountDatabaseCrawlerRestartException { + public void testAccounts() throws AccountDatabaseCrawlerRestartException, InterruptedException { AccountCleaner accountCleaner = new AccountCleaner(accountsManager); accountCleaner.onCrawlStart(); accountCleaner.timeAndProcessCrawlChunk(Optional.empty(), Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount)); @@ -86,7 +86,7 @@ public class AccountCleanerTest { } @Test - public void testMaxAccountUpdates() throws AccountDatabaseCrawlerRestartException { + public void testMaxAccountUpdates() throws AccountDatabaseCrawlerRestartException, InterruptedException { List accounts = new LinkedList<>(); accounts.add(undeletedEnabledAccount); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java index 5aa0d41ed..2be2d2295 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java @@ -50,6 +50,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.ContestedOptimisticLockException; import org.whispersystems.textsecuregcm.storage.DeletedAccounts; +import org.whispersystems.textsecuregcm.storage.DeletedAccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.KeysDynamoDb; @@ -87,7 +88,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -139,7 +140,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -178,7 +179,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -221,7 +222,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -263,7 +264,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -305,7 +306,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -347,7 +348,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccountsManager = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -366,7 +367,7 @@ class AccountsManagerTest { when(accountsDynamoDb.get(uuid)).thenReturn(Optional.of(new Account("+14152222222", uuid, new HashSet<>(), new byte[16]))); doAnswer(ACCOUNT_UPDATE_ANSWER).when(accounts).update(any(Account.class)); - AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccounts, + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); Account updatedAccount = accountsManager.update(account, a -> a.setProfileName("name")); @@ -408,7 +409,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -447,7 +448,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -495,7 +496,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccountsManager = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -513,7 +514,7 @@ class AccountsManagerTest { .thenReturn(Optional.of(account)); when(accountsDynamoDb.create(any())).thenThrow(ContestedOptimisticLockException.class); - AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccounts, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); accountsManager.update(account, a -> {}); @@ -530,7 +531,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccountsManager = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -539,7 +540,7 @@ class AccountsManagerTest { SecureBackupClient secureBackupClient = mock(SecureBackupClient.class); SecureStorageClient secureStorageClient = mock(SecureStorageClient.class); - AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccounts, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); assertEquals(Optional.empty(), accountsManager.compareAccounts(Optional.empty(), Optional.empty())); @@ -580,7 +581,7 @@ class AccountsManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); - DeletedAccounts deletedAccounts = mock(DeletedAccounts.class); + DeletedAccountsManager deletedAccounts = mock(DeletedAccountsManager.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class);