From 480abebf7e8e3196ddfd04133122bab32afe5c49 Mon Sep 17 00:00:00 2001 From: ravi-signal <99042880+ravi-signal@users.noreply.github.com> Date: Wed, 31 Jan 2024 14:27:16 -0600 Subject: [PATCH] Offload account lock updates to accountLockExecutor --- .../storage/AccountLockManager.java | 33 ++++++++++++++----- .../storage/AccountsManager.java | 4 +-- .../storage/AccountLockManagerTest.java | 6 ++-- ...ConcurrentModificationIntegrationTest.java | 2 +- .../storage/AccountsManagerTest.java | 2 +- ...ccountsManagerUsernameIntegrationTest.java | 2 +- 6 files changed, 32 insertions(+), 17 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java index c2f99bf16..d8a8c9868 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountLockManager.java @@ -45,10 +45,12 @@ public class AccountLockManager { * * @param e164s the phone numbers for which to acquire a distributed, pessimistic lock * @param task the task to execute once locks have been acquired + * @param lockAcquisitionExecutor the executor on which to run blocking lock acquire/release tasks. this executor + * should not use virtual threads. * * @throws InterruptedException if interrupted while acquiring a lock */ - public void withLock(final List e164s, final Runnable task) throws InterruptedException { + public void withLock(final List e164s, final Runnable task, final Executor lockAcquisitionExecutor) { if (e164s.isEmpty()) { throw new IllegalArgumentException("List of e164s to lock must not be empty"); } @@ -56,17 +58,30 @@ public class AccountLockManager { final List lockItems = new ArrayList<>(e164s.size()); try { - for (final String e164 : e164s) { - lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(e164) - .withAcquireReleasedLocksConsistently(true) - .build())); - } + // Offload the acquire/release tasks to the dedicated lock acquisition executor. The lock client performs blocking + // operations while holding locks which forces thread pinning when this method runs on a virtual thread. + // https://github.com/awslabs/amazon-dynamodb-lock-client/issues/97 + CompletableFuture.runAsync(() -> { + for (final String e164 : e164s) { + try { + lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(e164) + .withAcquireReleasedLocksConsistently(true) + .build())); + } catch (final InterruptedException e) { + throw new CompletionException(e); + } + } + }, lockAcquisitionExecutor).join(); task.run(); } finally { - lockItems.forEach(lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem) - .withBestEffort(true) - .build())); + CompletableFuture.runAsync(() -> { + for (final LockItem lockItem : lockItems) { + lockClient.releaseLock(ReleaseLockOptions.builder(lockItem) + .withBestEffort(true) + .build()); + } + }, lockAcquisitionExecutor).join(); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index e68f96138..eaefac30b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -263,7 +263,7 @@ public class AccountsManager { accountAttributes.recoveryPassword().ifPresent(registrationRecoveryPassword -> registrationRecoveryPasswordsManager.storeForCurrentNumber(account.getNumber(), registrationRecoveryPassword)); - }); + }, accountLockExecutor); return account; } @@ -423,7 +423,7 @@ public class AccountsManager { AccountChangeValidator.NUMBER_CHANGE_VALIDATOR); updatedAccount.set(numberChangedAccount); - }); + }, accountLockExecutor); return updatedAccount.get(); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java index 154a67590..93655f1e2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountLockManagerTest.java @@ -51,7 +51,7 @@ class AccountLockManagerTest { @Test void withLock() throws InterruptedException { - accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> {}); + accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> {}, executor); verify(lockClient, times(2)).acquireLock(any()); verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); @@ -61,7 +61,7 @@ class AccountLockManagerTest { void withLockTaskThrowsException() throws InterruptedException { assertThrows(RuntimeException.class, () -> accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> { throw new RuntimeException(); - })); + }, executor)); verify(lockClient, times(2)).acquireLock(any()); verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class)); @@ -71,7 +71,7 @@ class AccountLockManagerTest { void withLockEmptyList() { final Runnable task = mock(Runnable.class); - assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> {})); + assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> {}, executor)); verify(task, never()).run(); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java index efe890848..359825852 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java @@ -107,7 +107,7 @@ class AccountsManagerConcurrentModificationIntegrationTest { task.run(); return null; - }).when(accountLockManager).withLock(any(), any()); + }).when(accountLockManager).withLock(any(), any(), any()); when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> { final Supplier> taskSupplier = invocation.getArgument(1); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java index 1f2674e6a..a5bc86d5a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java @@ -203,7 +203,7 @@ class AccountsManagerTest { task.run(); return null; - }).when(accountLockManager).withLock(any(), any()); + }).when(accountLockManager).withLock(any(), any(), any()); when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> { final Supplier> taskSupplier = invocation.getArgument(1); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java index 363d329fa..2da7ea0e0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java @@ -116,7 +116,7 @@ class AccountsManagerUsernameIntegrationTest { task.run(); return null; - }).when(accountLockManager).withLock(any(), any()); + }).when(accountLockManager).withLock(any(), any(), any()); when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> { final Supplier> taskSupplier = invocation.getArgument(1);