Offload account lock updates to accountLockExecutor
This commit is contained in:
parent
b924dea045
commit
480abebf7e
|
@ -45,10 +45,12 @@ public class AccountLockManager {
|
|||
*
|
||||
* @param e164s the phone numbers for which to acquire a distributed, pessimistic lock
|
||||
* @param task the task to execute once locks have been acquired
|
||||
* @param lockAcquisitionExecutor the executor on which to run blocking lock acquire/release tasks. this executor
|
||||
* should not use virtual threads.
|
||||
*
|
||||
* @throws InterruptedException if interrupted while acquiring a lock
|
||||
*/
|
||||
public void withLock(final List<String> e164s, final Runnable task) throws InterruptedException {
|
||||
public void withLock(final List<String> e164s, final Runnable task, final Executor lockAcquisitionExecutor) {
|
||||
if (e164s.isEmpty()) {
|
||||
throw new IllegalArgumentException("List of e164s to lock must not be empty");
|
||||
}
|
||||
|
@ -56,17 +58,30 @@ public class AccountLockManager {
|
|||
final List<LockItem> lockItems = new ArrayList<>(e164s.size());
|
||||
|
||||
try {
|
||||
for (final String e164 : e164s) {
|
||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(e164)
|
||||
.withAcquireReleasedLocksConsistently(true)
|
||||
.build()));
|
||||
}
|
||||
// Offload the acquire/release tasks to the dedicated lock acquisition executor. The lock client performs blocking
|
||||
// operations while holding locks which forces thread pinning when this method runs on a virtual thread.
|
||||
// https://github.com/awslabs/amazon-dynamodb-lock-client/issues/97
|
||||
CompletableFuture.runAsync(() -> {
|
||||
for (final String e164 : e164s) {
|
||||
try {
|
||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(e164)
|
||||
.withAcquireReleasedLocksConsistently(true)
|
||||
.build()));
|
||||
} catch (final InterruptedException e) {
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
}
|
||||
}, lockAcquisitionExecutor).join();
|
||||
|
||||
task.run();
|
||||
} finally {
|
||||
lockItems.forEach(lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem)
|
||||
.withBestEffort(true)
|
||||
.build()));
|
||||
CompletableFuture.runAsync(() -> {
|
||||
for (final LockItem lockItem : lockItems) {
|
||||
lockClient.releaseLock(ReleaseLockOptions.builder(lockItem)
|
||||
.withBestEffort(true)
|
||||
.build());
|
||||
}
|
||||
}, lockAcquisitionExecutor).join();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -263,7 +263,7 @@ public class AccountsManager {
|
|||
|
||||
accountAttributes.recoveryPassword().ifPresent(registrationRecoveryPassword ->
|
||||
registrationRecoveryPasswordsManager.storeForCurrentNumber(account.getNumber(), registrationRecoveryPassword));
|
||||
});
|
||||
}, accountLockExecutor);
|
||||
|
||||
return account;
|
||||
}
|
||||
|
@ -423,7 +423,7 @@ public class AccountsManager {
|
|||
AccountChangeValidator.NUMBER_CHANGE_VALIDATOR);
|
||||
|
||||
updatedAccount.set(numberChangedAccount);
|
||||
});
|
||||
}, accountLockExecutor);
|
||||
|
||||
return updatedAccount.get();
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ class AccountLockManagerTest {
|
|||
|
||||
@Test
|
||||
void withLock() throws InterruptedException {
|
||||
accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> {});
|
||||
accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> {}, executor);
|
||||
|
||||
verify(lockClient, times(2)).acquireLock(any());
|
||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
||||
|
@ -61,7 +61,7 @@ class AccountLockManagerTest {
|
|||
void withLockTaskThrowsException() throws InterruptedException {
|
||||
assertThrows(RuntimeException.class, () -> accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> {
|
||||
throw new RuntimeException();
|
||||
}));
|
||||
}, executor));
|
||||
|
||||
verify(lockClient, times(2)).acquireLock(any());
|
||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
||||
|
@ -71,7 +71,7 @@ class AccountLockManagerTest {
|
|||
void withLockEmptyList() {
|
||||
final Runnable task = mock(Runnable.class);
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> {}));
|
||||
assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> {}, executor));
|
||||
verify(task, never()).run();
|
||||
}
|
||||
|
||||
|
|
|
@ -107,7 +107,7 @@ class AccountsManagerConcurrentModificationIntegrationTest {
|
|||
task.run();
|
||||
|
||||
return null;
|
||||
}).when(accountLockManager).withLock(any(), any());
|
||||
}).when(accountLockManager).withLock(any(), any(), any());
|
||||
|
||||
when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
||||
|
|
|
@ -203,7 +203,7 @@ class AccountsManagerTest {
|
|||
task.run();
|
||||
|
||||
return null;
|
||||
}).when(accountLockManager).withLock(any(), any());
|
||||
}).when(accountLockManager).withLock(any(), any(), any());
|
||||
|
||||
when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
||||
|
|
|
@ -116,7 +116,7 @@ class AccountsManagerUsernameIntegrationTest {
|
|||
task.run();
|
||||
|
||||
return null;
|
||||
}).when(accountLockManager).withLock(any(), any());
|
||||
}).when(accountLockManager).withLock(any(), any(), any());
|
||||
|
||||
when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
||||
|
|
Loading…
Reference in New Issue