diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java index bf75e0f2d..7347a62de 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java @@ -14,19 +14,18 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import java.util.stream.Stream; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; public class AccountLockManager { private final AmazonDynamoDBLockClient lockClient; - static final String KEY_ACCOUNT_E164 = "P"; + static final String KEY_ACCOUNT_PNI = "P"; public AccountLockManager(final DynamoDbClient lockDynamoDb, final String lockTableName) { this(new AmazonDynamoDBLockClient( AmazonDynamoDBLockClientOptions.builder(lockDynamoDb, lockTableName) - .withPartitionKeyName(KEY_ACCOUNT_E164) + .withPartitionKeyName(KEY_ACCOUNT_PNI) .withLeaseDuration(15L) .withHeartbeatPeriod(2L) .withTimeUnit(TimeUnit.SECONDS) @@ -40,40 +39,34 @@ public class AccountLockManager { } /** - * Acquires a distributed, pessimistic lock for the accounts identified by the given phone numbers. By design, the - * accounts need not actually exist in order to acquire a lock; this allows lock acquisition for operations that span - * account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for - * all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means. + * Acquires a distributed, pessimistic lock for the accounts identified by the given phone number identifiers. By + * design, the accounts need not actually exist in order to acquire a lock; this allows lock acquisition for + * operations that span account lifecycle changes (like deleting an account or changing a phone number). The given + * task runs once locks for all given identifiers have been acquired, and the locks are released as soon as the task + * completes by any means. * - * @param e164s the phone numbers for which to acquire a distributed, pessimistic lock * @param phoneNumberIdentifiers the phone number identifiers for which to acquire a distributed, pessimistic lock * @param task the task to execute once locks have been acquired * @param lockAcquisitionExecutor the executor on which to run blocking lock acquire/release tasks. this executor * should not use virtual threads. * @throws InterruptedException if interrupted while acquiring a lock */ - public void withLock(final List e164s, final List phoneNumberIdentifiers, final Runnable task, + public void withLock(final List phoneNumberIdentifiers, final Runnable task, final Executor lockAcquisitionExecutor) { - if (e164s.isEmpty()) { - throw new IllegalArgumentException("List of e164s to lock must not be empty"); - } if (phoneNumberIdentifiers.isEmpty()) { throw new IllegalArgumentException("List of PNIs to lock must not be empty"); } - final List allIdentifiers = Stream.concat(e164s.stream(), - phoneNumberIdentifiers.stream().map(UUID::toString)) - .toList(); - final List lockItems = new ArrayList<>(allIdentifiers.size()); + final List lockItems = new ArrayList<>(phoneNumberIdentifiers.size()); try { // Offload the acquire/release tasks to the dedicated lock acquisition executor. The lock client performs blocking // operations while holding locks which forces thread pinning when this method runs on a virtual thread. // https://github.com/awslabs/amazon-dynamodb-lock-client/issues/97 CompletableFuture.runAsync(() -> { - for (final String identifier : allIdentifiers) { + for (final UUID pni : phoneNumberIdentifiers) { try { - lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(identifier) + lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pni.toString()) .withAcquireReleasedLocksConsistently(true) .build())); } catch (final InterruptedException e) { @@ -95,37 +88,31 @@ public class AccountLockManager { } /** - * Acquires a distributed, pessimistic lock for the accounts identified by the given phone numbers. By design, the - * accounts need not actually exist in order to acquire a lock; this allows lock acquisition for operations that span - * account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for - * all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means. + * Acquires a distributed, pessimistic lock for the accounts identified by the given phone number identifiers. By + * design, the accounts need not actually exist in order to acquire a lock; this allows lock acquisition for + * operations that span account lifecycle changes (like deleting an account or changing a phone number). The given + * task runs once locks for all given identifiers have been acquired, and the locks are released as soon as the task + * completes by any means. * - * @param e164s the phone numbers for which to acquire a distributed, pessimistic lock * @param phoneNumberIdentifiers the phone number identifiers for which to acquire a distributed, pessimistic lock * @param taskSupplier a supplier for the task to execute once locks have been acquired * @param executor the executor on which to acquire and release locks * @return a future that completes normally when the given task has executed successfully and all locks have been * released; the returned future may fail with an {@link InterruptedException} if interrupted while acquiring a lock */ - public CompletableFuture withLockAsync(final List e164s, final List phoneNumberIdentifiers, + public CompletableFuture withLockAsync(final List phoneNumberIdentifiers, final Supplier> taskSupplier, final Executor executor) { - if (e164s.isEmpty()) { - throw new IllegalArgumentException("List of e164s to lock must not be empty"); - } if (phoneNumberIdentifiers.isEmpty()) { throw new IllegalArgumentException("List of PNIs to lock must not be empty"); } - final List allIdentifiers = Stream.concat(e164s.stream(), - phoneNumberIdentifiers.stream().map(UUID::toString)) - .toList(); - final List lockItems = new ArrayList<>(allIdentifiers.size()); + final List lockItems = new ArrayList<>(phoneNumberIdentifiers.size()); return CompletableFuture.runAsync(() -> { - for (final String identifier : allIdentifiers) { + for (final UUID pni : phoneNumberIdentifiers) { try { - lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(identifier) + lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pni.toString()) .withAcquireReleasedLocksConsistently(true) .build())); } catch (final InterruptedException e) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 7ad620395..d2de43848 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -274,7 +274,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(number).join(); return createTimer.record(() -> { - accountLockManager.withLock(List.of(number), List.of(phoneNumberIdentifier), () -> { + accountLockManager.withLock(List.of(phoneNumberIdentifier), () -> { final Optional maybeRecentlyDeletedAccountIdentifier = accounts.findRecentlyDeletedAccountIdentifier(number); @@ -396,8 +396,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen } public CompletableFuture> addDevice(final Account account, final DeviceSpec deviceSpec, final String linkDeviceToken) { - return accountLockManager.withLockAsync(List.of(account.getNumber()), - List.of(account.getPhoneNumberIdentifier()), + return accountLockManager.withLockAsync(List.of(account.getPhoneNumberIdentifier()), () -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, linkDeviceToken, MAX_UPDATE_ATTEMPTS), accountLockExecutor); } @@ -585,7 +584,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen throw new IllegalArgumentException("Cannot remove primary device"); } - return accountLockManager.withLockAsync(List.of(account.getNumber()), List.of(account.getPhoneNumberIdentifier()), + return accountLockManager.withLockAsync(List.of(account.getPhoneNumberIdentifier()), () -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS), accountLockExecutor); } @@ -653,8 +652,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen final AtomicReference updatedAccount = new AtomicReference<>(); final UUID targetPhoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(targetNumber).join(); - accountLockManager.withLock(List.of(account.getNumber(), targetNumber), - List.of(account.getPhoneNumberIdentifier(), targetPhoneNumberIdentifier), () -> { + accountLockManager.withLock(List.of(account.getPhoneNumberIdentifier(), targetPhoneNumberIdentifier), () -> { redisDelete(account); // There are three possible states for accounts associated with the target phone number: @@ -1225,8 +1223,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen public CompletableFuture delete(final Account account, final DeletionReason deletionReason) { final Timer.Sample sample = Timer.start(); - return accountLockManager.withLockAsync(List.of(account.getNumber()), List.of(account.getPhoneNumberIdentifier()), - () -> delete(account), + return accountLockManager.withLockAsync(List.of(account.getPhoneNumberIdentifier()), () -> delete(account), accountLockExecutor) .whenComplete((ignored, throwable) -> { sample.stop(deleteTimer); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ClientPublicKeysManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ClientPublicKeysManager.java index bb7881ab9..41a6b627d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ClientPublicKeysManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ClientPublicKeysManager.java @@ -41,8 +41,7 @@ public class ClientPublicKeysManager { * @return a future that completes when the given key has been stored */ public CompletableFuture setPublicKey(final Account account, final byte deviceId, final ECPublicKey publicKey) { - return accountLockManager.withLockAsync(List.of(account.getNumber()), - List.of(account.getPhoneNumberIdentifier()), + return accountLockManager.withLockAsync(List.of(account.getPhoneNumberIdentifier()), () -> clientPublicKeys.setPublicKey(account.getIdentifier(IdentityType.ACI), deviceId, publicKey), accountLockExecutor); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java index b8866fb21..faf018ff2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java @@ -9,7 +9,6 @@ import static org.mockito.Mockito.verify; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; import com.amazonaws.services.dynamodbv2.ReleaseLockOptions; -import com.google.i18n.phonenumbers.PhoneNumberUtil; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -28,12 +27,6 @@ class AccountLockManagerTest { private AccountLockManager accountLockManager; - private static final String FIRST_NUMBER = PhoneNumberUtil.getInstance().format( - PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164); - - private static final String SECOND_NUMBER = PhoneNumberUtil.getInstance().format( - PhoneNumberUtil.getInstance().getExampleNumber("JP"), PhoneNumberUtil.PhoneNumberFormat.E164); - private static final UUID FIRST_PNI = UUID.randomUUID(); private static final UUID SECOND_PNI = UUID.randomUUID(); @@ -55,53 +48,51 @@ class AccountLockManagerTest { @Test void withLock() throws InterruptedException { - accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), List.of(FIRST_PNI, SECOND_PNI), () -> { + accountLockManager.withLock(List.of(FIRST_PNI, SECOND_PNI), () -> { }, executor); - verify(lockClient, times(4)).acquireLock(any()); - verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class)); + verify(lockClient, times(2)).acquireLock(any()); + verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); } @Test void withLockTaskThrowsException() throws InterruptedException { - assertThrows(RuntimeException.class, - () -> accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), List.of(FIRST_PNI, SECOND_PNI), () -> { - throw new RuntimeException(); + assertThrows(RuntimeException.class, () -> accountLockManager.withLock(List.of(FIRST_PNI, SECOND_PNI), () -> { + throw new RuntimeException(); }, executor)); - verify(lockClient, times(4)).acquireLock(any()); - verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class)); + verify(lockClient, times(2)).acquireLock(any()); + verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); } @Test void withLockEmptyList() { final Runnable task = mock(Runnable.class); - assertThrows(IllegalArgumentException.class, - () -> accountLockManager.withLock(Collections.emptyList(), Collections.emptyList(), () -> { - }, - executor)); + assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> { + }, + executor)); verify(task, never()).run(); } @Test void withLockAsync() throws InterruptedException { - accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER), + accountLockManager.withLockAsync( List.of(FIRST_PNI, SECOND_PNI), () -> CompletableFuture.completedFuture(null), executor).join(); - verify(lockClient, times(4)).acquireLock(any()); - verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class)); + verify(lockClient, times(2)).acquireLock(any()); + verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); } @Test void withLockAsyncTaskThrowsException() throws InterruptedException { assertThrows(RuntimeException.class, - () -> accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER), + () -> accountLockManager.withLockAsync( List.of(FIRST_PNI, SECOND_PNI), () -> CompletableFuture.failedFuture(new RuntimeException()), executor) .join()); - verify(lockClient, times(4)).acquireLock(any()); - verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class)); + verify(lockClient, times(2)).acquireLock(any()); + verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); } @Test @@ -109,8 +100,8 @@ class AccountLockManagerTest { final Runnable task = mock(Runnable.class); assertThrows(IllegalArgumentException.class, - () -> accountLockManager.withLockAsync(Collections.emptyList(), - Collections.emptyList(), () -> CompletableFuture.completedFuture(null), executor)); + () -> accountLockManager.withLockAsync(Collections.emptyList(), () -> CompletableFuture.completedFuture(null), + executor)); verify(task, never()).run(); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java index 6d9706a04..3968d53ff 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java @@ -107,14 +107,14 @@ class AccountsManagerConcurrentModificationIntegrationTest { final AccountLockManager accountLockManager = mock(AccountLockManager.class); doAnswer(invocation -> { - final Runnable task = invocation.getArgument(2); + final Runnable task = invocation.getArgument(1); task.run(); return null; - }).when(accountLockManager).withLock(any(), anyList(), any(), any()); + }).when(accountLockManager).withLock(anyList(), any(), any()); - when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> { - final Supplier> taskSupplier = invocation.getArgument(2); + when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> { + final Supplier> taskSupplier = invocation.getArgument(1); taskSupplier.get().join(); return CompletableFuture.completedFuture(null); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java index 7e43a6860..226ecd74e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java @@ -209,14 +209,14 @@ class AccountsManagerTest { final AccountLockManager accountLockManager = mock(AccountLockManager.class); doAnswer(invocation -> { - final Runnable task = invocation.getArgument(2); + final Runnable task = invocation.getArgument(1); task.run(); return null; - }).when(accountLockManager).withLock(any(), anyList(), any(), any()); + }).when(accountLockManager).withLock(anyList(), any(), any()); - when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> { - final Supplier> taskSupplier = invocation.getArgument(2); + when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> { + final Supplier> taskSupplier = invocation.getArgument(1); return taskSupplier.get(); }); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java index 0b25deb32..0959cc130 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java @@ -114,14 +114,14 @@ class AccountsManagerUsernameIntegrationTest { final AccountLockManager accountLockManager = mock(AccountLockManager.class); doAnswer(invocation -> { - final Runnable task = invocation.getArgument(2); + final Runnable task = invocation.getArgument(1); task.run(); return null; - }).when(accountLockManager).withLock(any(), anyList(), any(), any()); + }).when(accountLockManager).withLock(anyList(), any(), any()); - when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> { - final Supplier> taskSupplier = invocation.getArgument(2); + when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> { + final Supplier> taskSupplier = invocation.getArgument(1); taskSupplier.get().join(); return CompletableFuture.completedFuture(null); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java index 6fc7c44fd..1d9fd4527 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java @@ -98,10 +98,10 @@ public final class DynamoDbExtensionSchema { ), DELETED_ACCOUNTS_LOCK("deleted_accounts_lock_test", - AccountLockManager.KEY_ACCOUNT_E164, + AccountLockManager.KEY_ACCOUNT_PNI, null, List.of(AttributeDefinition.builder() - .attributeName(AccountLockManager.KEY_ACCOUNT_E164) + .attributeName(AccountLockManager.KEY_ACCOUNT_PNI) .attributeType(ScalarAttributeType.S).build()), List.of(), List.of()),