diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index e764735b2..1f9388fe9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -443,6 +443,11 @@ public class WhisperServerService extends Application deleteAccount(@Auth DisabledPermittedAuthenticatedAccount auth) throws InterruptedException { + return accounts.delete(auth.getAccount(), AccountsManager.DeletionReason.USER_REQUEST); } private void clearUsernameLink(final Account account) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java index c8f38c3b4..96c3040cf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java @@ -12,8 +12,6 @@ import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,11 +23,9 @@ public class AccountCleaner extends AccountDatabaseCrawlerListener { private static final Counter DELETED_ACCOUNT_COUNTER = Metrics.counter(name(AccountCleaner.class, "deletedAccounts")); private final AccountsManager accountsManager; - private final Executor deletionExecutor; - public AccountCleaner(final AccountsManager accountsManager, final Executor deletionExecutor) { + public AccountCleaner(final AccountsManager accountsManager) { this.accountsManager = accountsManager; - this.deletionExecutor = deletionExecutor; } @Override @@ -44,13 +40,7 @@ public class AccountCleaner extends AccountDatabaseCrawlerListener { protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) { final List> deletionFutures = chunkAccounts.stream() .filter(AccountCleaner::isExpired) - .map(account -> CompletableFuture.runAsync(() -> { - try { - accountsManager.delete(account, AccountsManager.DeletionReason.EXPIRED); - } catch (final InterruptedException e) { - throw new CompletionException(e); - } - }, deletionExecutor) + .map(account -> accountsManager.delete(account, AccountsManager.DeletionReason.EXPIRED) .whenComplete((ignored, throwable) -> { if (throwable != null) { log.warn("Failed to delete account {}", account.getUuid(), throwable); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java index 3f5f01454..fd2c6c16f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java @@ -8,7 +8,12 @@ import com.amazonaws.services.dynamodbv2.ReleaseLockOptions; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.whispersystems.textsecuregcm.util.Util; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; public class AccountLockManager { @@ -65,4 +70,44 @@ public class AccountLockManager { .build())); } } + + /** + * Acquires a distributed, pessimistic lock for the accounts identified by the given phone numbers. By design, the + * accounts need not actually exist in order to acquire a lock; this allows lock acquisition for operations that span + * account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for + * all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means. + * + * @param e164s the phone numbers for which to acquire a distributed, pessimistic lock + * @param taskSupplier a supplier for the task to execute once locks have been acquired + * @param executor the executor on which to acquire and release locks + * + * @return a future that completes normally when the given task has executed successfully and all locks have been + * released; the returned future may fail with an {@link InterruptedException} if interrupted while acquiring a lock + */ public CompletableFuture withLockAsync(final List e164s, + final Supplier> taskSupplier, + final Executor executor) { + + if (e164s.isEmpty()) { + throw new IllegalArgumentException("List of e164s to lock must not be empty"); + } + + final List lockItems = new ArrayList<>(e164s.size()); + + return CompletableFuture.runAsync(() -> { + for (final String e164 : e164s) { + try { + lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(e164) + .withAcquireReleasedLocksConsistently(true) + .build())); + } catch (final InterruptedException e) { + throw new CompletionException(e); + } + } + }, executor) + .thenCompose(ignored -> taskSupplier.get()) + .whenCompleteAsync((ignored, throwable) -> lockItems.forEach(lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem) + .withBestEffort(true) + .build())), executor) + .thenRun(Util.NOOP); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index 5dfe76baa..bc3c8a42a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -40,6 +40,7 @@ import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.UUIDUtil; +import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; import reactor.core.publisher.ParallelFlux; import reactor.core.scheduler.Scheduler; @@ -786,23 +787,28 @@ public class Accounts extends AbstractDynamoDbStore { return Optional.ofNullable(response.items().get(0).get(DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s()); } - public void delete(final UUID uuid) { - DELETE_TIMER.record(() -> getByAccountIdentifier(uuid).ifPresent(account -> { + public CompletableFuture delete(final UUID uuid) { + final Timer.Sample sample = Timer.start(); - final List transactWriteItems = new ArrayList<>(List.of( - buildDelete(phoneNumberConstraintTableName, ATTR_ACCOUNT_E164, account.getNumber()), - buildDelete(accountsTableName, KEY_ACCOUNT_UUID, uuid), - buildDelete(phoneNumberIdentifierConstraintTableName, ATTR_PNI_UUID, account.getPhoneNumberIdentifier()), - buildPutDeletedAccount(uuid, account.getNumber()) - )); + return getByAccountIdentifierAsync(uuid) + .thenCompose(maybeAccount -> maybeAccount.map(account -> { + final List transactWriteItems = new ArrayList<>(List.of( + buildDelete(phoneNumberConstraintTableName, ATTR_ACCOUNT_E164, account.getNumber()), + buildDelete(accountsTableName, KEY_ACCOUNT_UUID, uuid), + buildDelete(phoneNumberIdentifierConstraintTableName, ATTR_PNI_UUID, account.getPhoneNumberIdentifier()), + buildPutDeletedAccount(uuid, account.getNumber()) + )); - account.getUsernameHash().ifPresent(usernameHash -> transactWriteItems.add( - buildDelete(usernamesConstraintTableName, ATTR_USERNAME_HASH, usernameHash))); + account.getUsernameHash().ifPresent(usernameHash -> transactWriteItems.add( + buildDelete(usernamesConstraintTableName, ATTR_USERNAME_HASH, usernameHash))); - final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder() - .transactItems(transactWriteItems).build(); - db().transactWriteItems(request); - })); + return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder() + .transactItems(transactWriteItems) + .build()) + .thenRun(Util.NOOP); + }) + .orElseGet(() -> CompletableFuture.completedFuture(null))) + .thenRun(() -> sample.stop(DELETE_TIMER)); } ParallelFlux getAll(final int segments, final Scheduler scheduler) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 4cc24c421..60e4f9e34 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -35,6 +35,7 @@ import java.util.OptionalInt; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -110,6 +111,7 @@ public class AccountsManager { private final ClientPresenceManager clientPresenceManager; private final ExperimentEnrollmentManager experimentEnrollmentManager; private final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager; + private final Executor accountLockExecutor; private final Clock clock; private static final ObjectWriter ACCOUNT_REDIS_JSON_WRITER = SystemMapper.jsonMapper() @@ -155,6 +157,7 @@ public class AccountsManager { final ClientPresenceManager clientPresenceManager, final ExperimentEnrollmentManager experimentEnrollmentManager, final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager, + final Executor accountLockExecutor, final Clock clock) { this.accounts = accounts; this.phoneNumberIdentifiers = phoneNumberIdentifiers; @@ -169,6 +172,7 @@ public class AccountsManager { this.clientPresenceManager = clientPresenceManager; this.experimentEnrollmentManager = experimentEnrollmentManager; this.registrationRecoveryPasswordsManager = requireNonNull(registrationRecoveryPasswordsManager); + this.accountLockExecutor = accountLockExecutor; this.clock = requireNonNull(clock); } @@ -234,7 +238,7 @@ public class AccountsManager { keysManager.delete(account.getPhoneNumberIdentifier())); messagesManager.clear(actualUuid).join(); - profilesManager.deleteAll(actualUuid); + profilesManager.deleteAll(actualUuid).join(); deleteKeysFuture.join(); @@ -301,7 +305,7 @@ public class AccountsManager { final Optional maybeDisplacedUuid; if (maybeExistingAccount.isPresent()) { - delete(maybeExistingAccount.get()); + delete(maybeExistingAccount.get()).join(); maybeDisplacedUuid = maybeExistingAccount.map(Account::getUuid); } else { maybeDisplacedUuid = recentlyDeletedAci; @@ -847,50 +851,39 @@ public class AccountsManager { return accounts.getAll(segments, scheduler); } - public void delete(final Account account, final DeletionReason deletionReason) throws InterruptedException { - try (final Timer.Context ignored = deleteTimer.time()) { - accountLockManager.withLock(List.of(account.getNumber()), () -> delete(account)); - } catch (final RuntimeException | InterruptedException e) { - logger.warn("Failed to delete account", e); - throw e; - } + public CompletableFuture delete(final Account account, final DeletionReason deletionReason) { + @SuppressWarnings("resource") final Timer.Context timerContext = deleteTimer.time(); - Metrics.counter(DELETE_COUNTER_NAME, - COUNTRY_CODE_TAG_NAME, Util.getCountryCode(account.getNumber()), - DELETION_REASON_TAG_NAME, deletionReason.tagValue) - .increment(); + return accountLockManager.withLockAsync(List.of(account.getNumber()), () -> delete(account), accountLockExecutor) + .whenComplete((ignored, throwable) -> { + timerContext.close(); + + if (throwable == null) { + Metrics.counter(DELETE_COUNTER_NAME, + COUNTRY_CODE_TAG_NAME, Util.getCountryCode(account.getNumber()), + DELETION_REASON_TAG_NAME, deletionReason.tagValue) + .increment(); + } else { + logger.warn("Failed to delete account", throwable); + } + }); } - private void delete(final Account account) { - final CompletableFuture deleteStorageServiceDataFuture = secureStorageClient.deleteStoredData( - account.getUuid()); - final CompletableFuture deleteBackupServiceDataFuture = secureBackupClient.deleteBackups(account.getUuid()); - final CompletableFuture deleteSecureValueRecoveryServiceDataFuture = secureValueRecovery2Client.deleteBackups( - account.getUuid()); - - final CompletableFuture deleteKeysFuture = CompletableFuture.allOf( - keysManager.delete(account.getUuid()), - keysManager.delete(account.getPhoneNumberIdentifier())); - - final CompletableFuture deleteMessagesFuture = CompletableFuture.allOf( - messagesManager.clear(account.getUuid()), - messagesManager.clear(account.getPhoneNumberIdentifier())); - - profilesManager.deleteAll(account.getUuid()); - registrationRecoveryPasswordsManager.removeForNumber(account.getNumber()); - - deleteKeysFuture.join(); - deleteMessagesFuture.join(); - deleteStorageServiceDataFuture.join(); - deleteBackupServiceDataFuture.join(); - deleteSecureValueRecoveryServiceDataFuture.join(); - - accounts.delete(account.getUuid()); - redisDelete(account); - - RedisOperation.unchecked(() -> - account.getDevices().forEach(device -> - clientPresenceManager.disconnectPresence(account.getUuid(), device.getId()))); + private CompletableFuture delete(final Account account) { + return CompletableFuture.allOf( + secureStorageClient.deleteStoredData(account.getUuid()), + secureBackupClient.deleteBackups(account.getUuid()), + secureValueRecovery2Client.deleteBackups(account.getUuid()), + keysManager.delete(account.getUuid()), + keysManager.delete(account.getPhoneNumberIdentifier()), + messagesManager.clear(account.getUuid()), + messagesManager.clear(account.getPhoneNumberIdentifier()), + profilesManager.deleteAll(account.getUuid()), + registrationRecoveryPasswordsManager.removeForNumber(account.getNumber())) + .thenCompose(ignored -> CompletableFuture.allOf(accounts.delete(account.getUuid()), redisDeleteAsync(account))) + .thenRun(() -> RedisOperation.unchecked(() -> + account.getDevices().forEach(device -> + clientPresenceManager.disconnectPresence(account.getUuid(), device.getId())))); } private String getUsernameHashAccountMapKey(byte[] usernameHash) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java index b62bdbff5..c82d62adf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java @@ -22,6 +22,8 @@ import org.apache.commons.lang3.StringUtils; import org.whispersystems.textsecuregcm.util.AsyncTimerUtil; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.Util; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -30,7 +32,6 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.QueryRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; -import software.amazon.awssdk.services.dynamodb.paginators.QueryIterable; public class Profiles { @@ -77,6 +78,8 @@ public class Profiles { private static final Timer DELETE_PROFILES_TIMER = Metrics.timer(name(Profiles.class, "delete")); private static final String PARSE_BYTE_ARRAY_COUNTER_NAME = name(Profiles.class, "parseByteArray"); + private static final int MAX_CONCURRENCY = 32; + public Profiles(final DynamoDbClient dynamoDbClient, final DynamoDbAsyncClient dynamoDbAsyncClient, final String tableName) { @@ -244,27 +247,28 @@ public class Profiles { return AttributeValues.extractByteArray(attributeValue, PARSE_BYTE_ARRAY_COUNTER_NAME); } - public void deleteAll(final UUID uuid) { - DELETE_PROFILES_TIMER.record(() -> { - final AttributeValue uuidAttributeValue = AttributeValues.fromUUID(uuid); + public CompletableFuture deleteAll(final UUID uuid) { + final Timer.Sample sample = Timer.start(); - final QueryIterable queryIterable = dynamoDbClient.queryPaginator(QueryRequest.builder() - .tableName(tableName) - .keyConditionExpression("#uuid = :uuid") - .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID)) - .expressionAttributeValues(Map.of(":uuid", uuidAttributeValue)) - .projectionExpression(ATTR_VERSION) - .consistentRead(true) - .build()); + final AttributeValue uuidAttributeValue = AttributeValues.fromUUID(uuid); - CompletableFuture.allOf(queryIterable.items().stream() - .map(item -> dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder() - .tableName(tableName) - .key(Map.of( - KEY_ACCOUNT_UUID, uuidAttributeValue, - ATTR_VERSION, item.get(ATTR_VERSION))) - .build())) - .toArray(CompletableFuture[]::new)).join(); - }); + return Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#uuid = :uuid") + .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID)) + .expressionAttributeValues(Map.of(":uuid", uuidAttributeValue)) + .projectionExpression(ATTR_VERSION) + .consistentRead(true) + .build()) + .items()) + .flatMap(item -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_ACCOUNT_UUID, uuidAttributeValue, + ATTR_VERSION, item.get(ATTR_VERSION))) + .build())), MAX_CONCURRENCY) + .doOnComplete(() -> sample.stop(DELETE_PROFILES_TIMER)) + .then() + .toFuture(); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java index 7274f0a53..f3464d237 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java @@ -47,9 +47,8 @@ public class ProfilesManager { .thenCompose(ignored -> redisSetAsync(uuid, versionedProfile)); } - public void deleteAll(UUID uuid) { - redisDelete(uuid); - profiles.deleteAll(uuid); + public CompletableFuture deleteAll(UUID uuid) { + return CompletableFuture.allOf(redisDelete(uuid), profiles.deleteAll(uuid)); } public Optional get(UUID uuid, String version) { @@ -132,8 +131,10 @@ public class ProfilesManager { } } - private void redisDelete(UUID uuid) { - cacheCluster.useCluster(connection -> connection.sync().del(getCacheKey(uuid))); + private CompletableFuture redisDelete(UUID uuid) { + return cacheCluster.withCluster(connection -> connection.async().del(getCacheKey(uuid))) + .toCompletableFuture() + .thenRun(Util.NOOP); } private String getCacheKey(UUID uuid) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index e2f9b5a79..66fd9337a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -114,6 +114,8 @@ public class AssignUsernameCommand extends EnvironmentCommand { - final ExecutorService accountDeletionExecutor = environment.lifecycle() - .executorService(name(getClass(), "accountCleaner-%d")).maxThreads(workers).minThreads(workers).build(); - final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache( cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX); yield new AccountDatabaseCrawler("Account cleaner crawler", accountsManager, accountDatabaseCrawlerCache, - List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)), + List.of(new AccountCleaner(accountsManager)), configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize() ); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index e7d63a5dd..6b58963df 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -58,7 +58,7 @@ public class DeleteUserCommand extends EnvironmentCommand account = accountsManager.getByE164(user); if (account.isPresent()) { - accountsManager.delete(account.get(), DeletionReason.ADMIN_DELETED); + accountsManager.delete(account.get(), DeletionReason.ADMIN_DELETED).join(); logger.warn("Removed " + account.get().getNumber()); } else { logger.warn("Account not found"); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/AccountControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/AccountControllerTest.java index 483befe66..73b09ed6f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/AccountControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/AccountControllerTest.java @@ -37,6 +37,7 @@ import java.util.HexFormat; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; @@ -815,7 +816,7 @@ class AccountControllerTest { } @Test - void testDeleteAccount() throws InterruptedException { + void testDeleteAccount() { Response response = resources.getJerseyTest() .target("/v1/accounts/me") @@ -828,18 +829,18 @@ class AccountControllerTest { } @Test - void testDeleteAccountInterrupted() throws InterruptedException { - doThrow(InterruptedException.class).when(accountsManager).delete(any(), any()); + void testDeleteAccountException() { + when(accountsManager.delete(any(), any())).thenReturn(CompletableFuture.failedFuture(new RuntimeException("OH NO"))); - Response response = - resources.getJerseyTest() - .target("/v1/accounts/me") - .request() - .header(HttpHeaders.AUTHORIZATION, AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) - .delete(); + try (final Response response = resources.getJerseyTest() + .target("/v1/accounts/me") + .request() + .header(HttpHeaders.AUTHORIZATION, AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) + .delete()) { - assertThat(response.getStatus()).isEqualTo(500); - verify(accountsManager).delete(AuthHelper.VALID_ACCOUNT, AccountsManager.DeletionReason.USER_REQUEST); + assertThat(response.getStatus()).isEqualTo(500); + verify(accountsManager).delete(AuthHelper.VALID_ACCOUNT, AccountsManager.DeletionReason.USER_REQUEST); + } } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java index 9b6cd0108..b79bf4f0a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java @@ -15,10 +15,8 @@ import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason; @@ -35,11 +33,10 @@ class AccountCleanerTest { private final Device undeletedDisabledDevice = mock(Device.class ); private final Device undeletedEnabledDevice = mock(Device.class ); - private ExecutorService deletionExecutor; - - @BeforeEach void setup() { + when(accountsManager.delete(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + when(deletedDisabledDevice.isEnabled()).thenReturn(false); when(deletedDisabledDevice.getGcmId()).thenReturn(null); when(deletedDisabledDevice.getApnId()).thenReturn(null); @@ -66,19 +63,11 @@ class AccountCleanerTest { when(undeletedEnabledAccount.getNumber()).thenReturn("+14153333333"); when(undeletedEnabledAccount.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(179)); when(undeletedEnabledAccount.getUuid()).thenReturn(UUID.randomUUID()); - - deletionExecutor = Executors.newFixedThreadPool(2); - } - - @AfterEach - void tearDown() throws InterruptedException { - deletionExecutor.shutdown(); - deletionExecutor.awaitTermination(2, TimeUnit.SECONDS); } @Test - void testAccounts() throws InterruptedException { - AccountCleaner accountCleaner = new AccountCleaner(accountsManager, deletionExecutor); + void testAccounts() { + AccountCleaner accountCleaner = new AccountCleaner(accountsManager); accountCleaner.onCrawlStart(); accountCleaner.timeAndProcessCrawlChunk(Optional.empty(), Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java index fecc32f0d..154a67590 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java @@ -12,12 +12,18 @@ import com.amazonaws.services.dynamodbv2.ReleaseLockOptions; import com.google.i18n.phonenumbers.PhoneNumberUtil; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class AccountLockManagerTest { private AmazonDynamoDBLockClient lockClient; + private ExecutorService executor; private AccountLockManager accountLockManager; @@ -30,10 +36,19 @@ class AccountLockManagerTest { @BeforeEach void setUp() { lockClient = mock(AmazonDynamoDBLockClient.class); + executor = Executors.newSingleThreadExecutor(); accountLockManager = new AccountLockManager(lockClient); } + @AfterEach + void tearDown() throws InterruptedException { + executor.shutdown(); + + //noinspection ResultOfMethodCallIgnored + executor.awaitTermination(1, TimeUnit.SECONDS); + } + @Test void withLock() throws InterruptedException { accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> {}); @@ -59,4 +74,34 @@ class AccountLockManagerTest { assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> {})); verify(task, never()).run(); } + + @Test + void withLockAsync() throws InterruptedException { + accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER), + () -> CompletableFuture.completedFuture(null), executor).join(); + + verify(lockClient, times(2)).acquireLock(any()); + verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); + } + + @Test + void withLockAsyncTaskThrowsException() throws InterruptedException { + assertThrows(RuntimeException.class, + () -> accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER), + () -> CompletableFuture.failedFuture(new RuntimeException()), executor).join()); + + verify(lockClient, times(2)).acquireLock(any()); + verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); + } + + @Test + void withLockAsyncEmptyList() { + final Runnable task = mock(Runnable.class); + + assertThrows(IllegalArgumentException.class, + () -> accountLockManager.withLockAsync(Collections.emptyList(), + () -> CompletableFuture.completedFuture(null), executor)); + + verify(task, never()).run(); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java index 39a7bc9d0..e5186371d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java @@ -20,6 +20,10 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -58,6 +62,7 @@ class AccountsManagerChangeNumberIntegrationTest { static final RedisClusterExtension CACHE_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); private ClientPresenceManager clientPresenceManager; + private ExecutorService accountLockExecutor; private AccountsManager accountsManager; @@ -81,6 +86,8 @@ class AccountsManagerChangeNumberIntegrationTest { Tables.DELETED_ACCOUNTS.tableName(), SCAN_PAGE_SIZE); + accountLockExecutor = Executors.newSingleThreadExecutor(); + final AccountLockManager accountLockManager = new AccountLockManager(DYNAMO_DB_EXTENSION.getDynamoDbClient(), Tables.DELETED_ACCOUNTS_LOCK.tableName()); @@ -104,6 +111,15 @@ class AccountsManagerChangeNumberIntegrationTest { final MessagesManager messagesManager = mock(MessagesManager.class); when(messagesManager.clear(any())).thenReturn(CompletableFuture.completedFuture(null)); + final ProfilesManager profilesManager = mock(ProfilesManager.class); + when(profilesManager.deleteAll(any())).thenReturn(CompletableFuture.completedFuture(null)); + + final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager = + mock(RegistrationRecoveryPasswordsManager.class); + + when(registrationRecoveryPasswordsManager.removeForNumber(any())) + .thenReturn(CompletableFuture.completedFuture(null)); + accountsManager = new AccountsManager( accounts, phoneNumberIdentifiers, @@ -111,17 +127,26 @@ class AccountsManagerChangeNumberIntegrationTest { accountLockManager, keysManager, messagesManager, - mock(ProfilesManager.class), + profilesManager, secureStorageClient, secureBackupClient, svr2Client, clientPresenceManager, mock(ExperimentEnrollmentManager.class), - mock(RegistrationRecoveryPasswordsManager.class), + registrationRecoveryPasswordsManager, + accountLockExecutor, mock(Clock.class)); } } + @AfterEach + void tearDown() throws InterruptedException { + accountLockExecutor.shutdown(); + + //noinspection ResultOfMethodCallIgnored + accountLockExecutor.awaitTermination(1, TimeUnit.SECONDS); + } + @Test void testChangeNumber() throws InterruptedException, MismatchedDevicesException { final String originalNumber = "+18005551111"; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java index 4780b86a0..9252b8ded 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -104,6 +105,13 @@ class AccountsManagerConcurrentModificationIntegrationTest { return null; }).when(accountLockManager).withLock(any(), any()); + when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> { + final Supplier> taskSupplier = invocation.getArgument(1); + taskSupplier.get().join(); + + return CompletableFuture.completedFuture(null); + }); + final PhoneNumberIdentifiers phoneNumberIdentifiers = mock(PhoneNumberIdentifiers.class); when(phoneNumberIdentifiers.getPhoneNumberIdentifier(anyString())) .thenAnswer((Answer) invocation -> UUID.randomUUID()); @@ -122,6 +130,7 @@ class AccountsManagerConcurrentModificationIntegrationTest { mock(ClientPresenceManager.class), mock(ExperimentEnrollmentManager.class), mock(RegistrationRecoveryPasswordsManager.class), + mock(Executor.class), mock(Clock.class) ); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java index 4abee6efb..70cf5c12d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java @@ -16,7 +16,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; @@ -46,7 +45,9 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; @@ -140,6 +141,7 @@ class AccountsManagerTest { when(asyncCommands.setex(any(), anyLong(), any())).thenReturn(MockRedisFuture.completedFuture("OK")); when(accounts.updateAsync(any())).thenReturn(CompletableFuture.completedFuture(null)); + when(accounts.delete(any())).thenReturn(CompletableFuture.completedFuture(null)); doAnswer((Answer) invocation -> { final Account account = invocation.getArgument(0, Account.class); @@ -188,8 +190,21 @@ class AccountsManagerTest { return null; }).when(accountLockManager).withLock(any(), any()); + when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> { + final Supplier> taskSupplier = invocation.getArgument(1); + taskSupplier.get().join(); + + return CompletableFuture.completedFuture(null); + }); + + final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager = + mock(RegistrationRecoveryPasswordsManager.class); + + when(registrationRecoveryPasswordsManager.removeForNumber(anyString())).thenReturn(CompletableFuture.completedFuture(null)); + when(keysManager.delete(any())).thenReturn(CompletableFuture.completedFuture(null)); when(messagesManager.clear(any())).thenReturn(CompletableFuture.completedFuture(null)); + when(profilesManager.deleteAll(any())).thenReturn(CompletableFuture.completedFuture(null)); accountsManager = new AccountsManager( accounts, @@ -207,7 +222,8 @@ class AccountsManagerTest { svr2Client, clientPresenceManager, enrollmentManager, - mock(RegistrationRecoveryPasswordsManager.class), + registrationRecoveryPasswordsManager, + mock(Executor.class), mock(Clock.class)); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java index 59694db81..7f0031547 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java @@ -29,6 +29,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -106,6 +109,13 @@ class AccountsManagerUsernameIntegrationTest { return null; }).when(accountLockManager).withLock(any(), any()); + when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> { + final Supplier> taskSupplier = invocation.getArgument(1); + taskSupplier.get().join(); + + return CompletableFuture.completedFuture(null); + }); + final PhoneNumberIdentifiers phoneNumberIdentifiers = new PhoneNumberIdentifiers(DYNAMO_DB_EXTENSION.getDynamoDbClient(), Tables.PNI.tableName()); @@ -126,6 +136,7 @@ class AccountsManagerUsernameIntegrationTest { mock(ClientPresenceManager.class), experimentEnrollmentManager, mock(RegistrationRecoveryPasswordsManager.class), + mock(Executor.class), mock(Clock.class)); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java index 3057a58c8..ca4bc4946 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java @@ -36,6 +36,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -180,6 +181,7 @@ class AccountsTest { mock(ClientPresenceManager.class), mock(ExperimentEnrollmentManager.class), mock(RegistrationRecoveryPasswordsManager.class), + mock(Executor.class), mock(Clock.class)); final Account account = nextRandomAccount(); @@ -246,7 +248,7 @@ class AccountsTest { assertPhoneNumberConstraintExists("+14151112222", account.getUuid()); assertPhoneNumberIdentifierConstraintExists(account.getPhoneNumberIdentifier(), account.getUuid()); - accounts.delete(originalUuid); + accounts.delete(originalUuid).join(); assertThat(accounts.findRecentlyDeletedAccountIdentifier(account.getNumber())).hasValue(originalUuid); freshUser = accounts.create(account); @@ -577,7 +579,7 @@ class AccountsTest { assertThat(accounts.getByAccountIdentifier(deletedAccount.getUuid())).isPresent(); assertThat(accounts.getByAccountIdentifier(retainedAccount.getUuid())).isPresent(); - accounts.delete(deletedAccount.getUuid()); + accounts.delete(deletedAccount.getUuid()).join(); assertThat(accounts.getByAccountIdentifier(deletedAccount.getUuid())).isNotPresent(); assertThat(accounts.findRecentlyDeletedAccountIdentifier(deletedAccount.getNumber())).hasValue(deletedAccount.getUuid()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesTest.java index d67229951..e18e3df5c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesTest.java @@ -84,7 +84,7 @@ public class ProfilesTest { void testDeleteReset() throws InvalidInputException { profiles.set(ACI, validProfile); - profiles.deleteAll(ACI); + profiles.deleteAll(ACI).join(); final String version = "someVersion"; final byte[] name = ProfileTestHelper.generateRandomByteArray(81); @@ -242,7 +242,7 @@ public class ProfilesTest { profiles.set(ACI, profileOne); profiles.set(ACI, profileTwo); - profiles.deleteAll(ACI); + profiles.deleteAll(ACI).join(); Optional retrieved = profiles.get(ACI, versionOne);