Remove `String e164` from `AccountLockManager`
This commit is contained in:
parent
6421438f64
commit
1c3cf39b8a
|
@ -14,19 +14,18 @@ import java.util.concurrent.CompletionException;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
|
||||
public class AccountLockManager {
|
||||
|
||||
private final AmazonDynamoDBLockClient lockClient;
|
||||
|
||||
static final String KEY_ACCOUNT_E164 = "P";
|
||||
static final String KEY_ACCOUNT_PNI = "P";
|
||||
|
||||
public AccountLockManager(final DynamoDbClient lockDynamoDb, final String lockTableName) {
|
||||
this(new AmazonDynamoDBLockClient(
|
||||
AmazonDynamoDBLockClientOptions.builder(lockDynamoDb, lockTableName)
|
||||
.withPartitionKeyName(KEY_ACCOUNT_E164)
|
||||
.withPartitionKeyName(KEY_ACCOUNT_PNI)
|
||||
.withLeaseDuration(15L)
|
||||
.withHeartbeatPeriod(2L)
|
||||
.withTimeUnit(TimeUnit.SECONDS)
|
||||
|
@ -40,40 +39,34 @@ public class AccountLockManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Acquires a distributed, pessimistic lock for the accounts identified by the given phone numbers. By design, the
|
||||
* accounts need not actually exist in order to acquire a lock; this allows lock acquisition for operations that span
|
||||
* account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for
|
||||
* all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means.
|
||||
* Acquires a distributed, pessimistic lock for the accounts identified by the given phone number identifiers. By
|
||||
* design, the accounts need not actually exist in order to acquire a lock; this allows lock acquisition for
|
||||
* operations that span account lifecycle changes (like deleting an account or changing a phone number). The given
|
||||
* task runs once locks for all given identifiers have been acquired, and the locks are released as soon as the task
|
||||
* completes by any means.
|
||||
*
|
||||
* @param e164s the phone numbers for which to acquire a distributed, pessimistic lock
|
||||
* @param phoneNumberIdentifiers the phone number identifiers for which to acquire a distributed, pessimistic lock
|
||||
* @param task the task to execute once locks have been acquired
|
||||
* @param lockAcquisitionExecutor the executor on which to run blocking lock acquire/release tasks. this executor
|
||||
* should not use virtual threads.
|
||||
* @throws InterruptedException if interrupted while acquiring a lock
|
||||
*/
|
||||
public void withLock(final List<String> e164s, final List<UUID> phoneNumberIdentifiers, final Runnable task,
|
||||
public void withLock(final List<UUID> phoneNumberIdentifiers, final Runnable task,
|
||||
final Executor lockAcquisitionExecutor) {
|
||||
if (e164s.isEmpty()) {
|
||||
throw new IllegalArgumentException("List of e164s to lock must not be empty");
|
||||
}
|
||||
if (phoneNumberIdentifiers.isEmpty()) {
|
||||
throw new IllegalArgumentException("List of PNIs to lock must not be empty");
|
||||
}
|
||||
|
||||
final List<String> allIdentifiers = Stream.concat(e164s.stream(),
|
||||
phoneNumberIdentifiers.stream().map(UUID::toString))
|
||||
.toList();
|
||||
final List<LockItem> lockItems = new ArrayList<>(allIdentifiers.size());
|
||||
final List<LockItem> lockItems = new ArrayList<>(phoneNumberIdentifiers.size());
|
||||
|
||||
try {
|
||||
// Offload the acquire/release tasks to the dedicated lock acquisition executor. The lock client performs blocking
|
||||
// operations while holding locks which forces thread pinning when this method runs on a virtual thread.
|
||||
// https://github.com/awslabs/amazon-dynamodb-lock-client/issues/97
|
||||
CompletableFuture.runAsync(() -> {
|
||||
for (final String identifier : allIdentifiers) {
|
||||
for (final UUID pni : phoneNumberIdentifiers) {
|
||||
try {
|
||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(identifier)
|
||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pni.toString())
|
||||
.withAcquireReleasedLocksConsistently(true)
|
||||
.build()));
|
||||
} catch (final InterruptedException e) {
|
||||
|
@ -95,37 +88,31 @@ public class AccountLockManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Acquires a distributed, pessimistic lock for the accounts identified by the given phone numbers. By design, the
|
||||
* accounts need not actually exist in order to acquire a lock; this allows lock acquisition for operations that span
|
||||
* account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for
|
||||
* all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means.
|
||||
* Acquires a distributed, pessimistic lock for the accounts identified by the given phone number identifiers. By
|
||||
* design, the accounts need not actually exist in order to acquire a lock; this allows lock acquisition for
|
||||
* operations that span account lifecycle changes (like deleting an account or changing a phone number). The given
|
||||
* task runs once locks for all given identifiers have been acquired, and the locks are released as soon as the task
|
||||
* completes by any means.
|
||||
*
|
||||
* @param e164s the phone numbers for which to acquire a distributed, pessimistic lock
|
||||
* @param phoneNumberIdentifiers the phone number identifiers for which to acquire a distributed, pessimistic lock
|
||||
* @param taskSupplier a supplier for the task to execute once locks have been acquired
|
||||
* @param executor the executor on which to acquire and release locks
|
||||
* @return a future that completes normally when the given task has executed successfully and all locks have been
|
||||
* released; the returned future may fail with an {@link InterruptedException} if interrupted while acquiring a lock
|
||||
*/
|
||||
public <T> CompletableFuture<T> withLockAsync(final List<String> e164s, final List<UUID> phoneNumberIdentifiers,
|
||||
public <T> CompletableFuture<T> withLockAsync(final List<UUID> phoneNumberIdentifiers,
|
||||
final Supplier<CompletableFuture<T>> taskSupplier, final Executor executor) {
|
||||
|
||||
if (e164s.isEmpty()) {
|
||||
throw new IllegalArgumentException("List of e164s to lock must not be empty");
|
||||
}
|
||||
if (phoneNumberIdentifiers.isEmpty()) {
|
||||
throw new IllegalArgumentException("List of PNIs to lock must not be empty");
|
||||
}
|
||||
|
||||
final List<String> allIdentifiers = Stream.concat(e164s.stream(),
|
||||
phoneNumberIdentifiers.stream().map(UUID::toString))
|
||||
.toList();
|
||||
final List<LockItem> lockItems = new ArrayList<>(allIdentifiers.size());
|
||||
final List<LockItem> lockItems = new ArrayList<>(phoneNumberIdentifiers.size());
|
||||
|
||||
return CompletableFuture.runAsync(() -> {
|
||||
for (final String identifier : allIdentifiers) {
|
||||
for (final UUID pni : phoneNumberIdentifiers) {
|
||||
try {
|
||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(identifier)
|
||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pni.toString())
|
||||
.withAcquireReleasedLocksConsistently(true)
|
||||
.build()));
|
||||
} catch (final InterruptedException e) {
|
||||
|
|
|
@ -274,7 +274,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
|||
final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(number).join();
|
||||
|
||||
return createTimer.record(() -> {
|
||||
accountLockManager.withLock(List.of(number), List.of(phoneNumberIdentifier), () -> {
|
||||
accountLockManager.withLock(List.of(phoneNumberIdentifier), () -> {
|
||||
final Optional<UUID> maybeRecentlyDeletedAccountIdentifier =
|
||||
accounts.findRecentlyDeletedAccountIdentifier(number);
|
||||
|
||||
|
@ -396,8 +396,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
|||
}
|
||||
|
||||
public CompletableFuture<Pair<Account, Device>> addDevice(final Account account, final DeviceSpec deviceSpec, final String linkDeviceToken) {
|
||||
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
||||
List.of(account.getPhoneNumberIdentifier()),
|
||||
return accountLockManager.withLockAsync(List.of(account.getPhoneNumberIdentifier()),
|
||||
() -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, linkDeviceToken, MAX_UPDATE_ATTEMPTS),
|
||||
accountLockExecutor);
|
||||
}
|
||||
|
@ -585,7 +584,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
|||
throw new IllegalArgumentException("Cannot remove primary device");
|
||||
}
|
||||
|
||||
return accountLockManager.withLockAsync(List.of(account.getNumber()), List.of(account.getPhoneNumberIdentifier()),
|
||||
return accountLockManager.withLockAsync(List.of(account.getPhoneNumberIdentifier()),
|
||||
() -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS),
|
||||
accountLockExecutor);
|
||||
}
|
||||
|
@ -653,8 +652,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
|||
final AtomicReference<Account> updatedAccount = new AtomicReference<>();
|
||||
final UUID targetPhoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(targetNumber).join();
|
||||
|
||||
accountLockManager.withLock(List.of(account.getNumber(), targetNumber),
|
||||
List.of(account.getPhoneNumberIdentifier(), targetPhoneNumberIdentifier), () -> {
|
||||
accountLockManager.withLock(List.of(account.getPhoneNumberIdentifier(), targetPhoneNumberIdentifier), () -> {
|
||||
redisDelete(account);
|
||||
|
||||
// There are three possible states for accounts associated with the target phone number:
|
||||
|
@ -1225,8 +1223,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
|||
public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return accountLockManager.withLockAsync(List.of(account.getNumber()), List.of(account.getPhoneNumberIdentifier()),
|
||||
() -> delete(account),
|
||||
return accountLockManager.withLockAsync(List.of(account.getPhoneNumberIdentifier()), () -> delete(account),
|
||||
accountLockExecutor)
|
||||
.whenComplete((ignored, throwable) -> {
|
||||
sample.stop(deleteTimer);
|
||||
|
|
|
@ -41,8 +41,7 @@ public class ClientPublicKeysManager {
|
|||
* @return a future that completes when the given key has been stored
|
||||
*/
|
||||
public CompletableFuture<Void> setPublicKey(final Account account, final byte deviceId, final ECPublicKey publicKey) {
|
||||
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
||||
List.of(account.getPhoneNumberIdentifier()),
|
||||
return accountLockManager.withLockAsync(List.of(account.getPhoneNumberIdentifier()),
|
||||
() -> clientPublicKeys.setPublicKey(account.getIdentifier(IdentityType.ACI), deviceId, publicKey),
|
||||
accountLockExecutor);
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ import static org.mockito.Mockito.verify;
|
|||
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
|
||||
import com.amazonaws.services.dynamodbv2.ReleaseLockOptions;
|
||||
import com.google.i18n.phonenumbers.PhoneNumberUtil;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
@ -28,12 +27,6 @@ class AccountLockManagerTest {
|
|||
|
||||
private AccountLockManager accountLockManager;
|
||||
|
||||
private static final String FIRST_NUMBER = PhoneNumberUtil.getInstance().format(
|
||||
PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164);
|
||||
|
||||
private static final String SECOND_NUMBER = PhoneNumberUtil.getInstance().format(
|
||||
PhoneNumberUtil.getInstance().getExampleNumber("JP"), PhoneNumberUtil.PhoneNumberFormat.E164);
|
||||
|
||||
private static final UUID FIRST_PNI = UUID.randomUUID();
|
||||
private static final UUID SECOND_PNI = UUID.randomUUID();
|
||||
|
||||
|
@ -55,53 +48,51 @@ class AccountLockManagerTest {
|
|||
|
||||
@Test
|
||||
void withLock() throws InterruptedException {
|
||||
accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), List.of(FIRST_PNI, SECOND_PNI), () -> {
|
||||
accountLockManager.withLock(List.of(FIRST_PNI, SECOND_PNI), () -> {
|
||||
}, executor);
|
||||
|
||||
verify(lockClient, times(4)).acquireLock(any());
|
||||
verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class));
|
||||
verify(lockClient, times(2)).acquireLock(any());
|
||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void withLockTaskThrowsException() throws InterruptedException {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), List.of(FIRST_PNI, SECOND_PNI), () -> {
|
||||
throw new RuntimeException();
|
||||
assertThrows(RuntimeException.class, () -> accountLockManager.withLock(List.of(FIRST_PNI, SECOND_PNI), () -> {
|
||||
throw new RuntimeException();
|
||||
}, executor));
|
||||
|
||||
verify(lockClient, times(4)).acquireLock(any());
|
||||
verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class));
|
||||
verify(lockClient, times(2)).acquireLock(any());
|
||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void withLockEmptyList() {
|
||||
final Runnable task = mock(Runnable.class);
|
||||
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> accountLockManager.withLock(Collections.emptyList(), Collections.emptyList(), () -> {
|
||||
},
|
||||
executor));
|
||||
assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> {
|
||||
},
|
||||
executor));
|
||||
verify(task, never()).run();
|
||||
}
|
||||
|
||||
@Test
|
||||
void withLockAsync() throws InterruptedException {
|
||||
accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER),
|
||||
accountLockManager.withLockAsync(
|
||||
List.of(FIRST_PNI, SECOND_PNI), () -> CompletableFuture.completedFuture(null), executor).join();
|
||||
|
||||
verify(lockClient, times(4)).acquireLock(any());
|
||||
verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class));
|
||||
verify(lockClient, times(2)).acquireLock(any());
|
||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void withLockAsyncTaskThrowsException() throws InterruptedException {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER),
|
||||
() -> accountLockManager.withLockAsync(
|
||||
List.of(FIRST_PNI, SECOND_PNI), () -> CompletableFuture.failedFuture(new RuntimeException()), executor)
|
||||
.join());
|
||||
|
||||
verify(lockClient, times(4)).acquireLock(any());
|
||||
verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class));
|
||||
verify(lockClient, times(2)).acquireLock(any());
|
||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -109,8 +100,8 @@ class AccountLockManagerTest {
|
|||
final Runnable task = mock(Runnable.class);
|
||||
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> accountLockManager.withLockAsync(Collections.emptyList(),
|
||||
Collections.emptyList(), () -> CompletableFuture.completedFuture(null), executor));
|
||||
() -> accountLockManager.withLockAsync(Collections.emptyList(), () -> CompletableFuture.completedFuture(null),
|
||||
executor));
|
||||
|
||||
verify(task, never()).run();
|
||||
}
|
||||
|
|
|
@ -107,14 +107,14 @@ class AccountsManagerConcurrentModificationIntegrationTest {
|
|||
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
final Runnable task = invocation.getArgument(2);
|
||||
final Runnable task = invocation.getArgument(1);
|
||||
task.run();
|
||||
|
||||
return null;
|
||||
}).when(accountLockManager).withLock(any(), anyList(), any(), any());
|
||||
}).when(accountLockManager).withLock(anyList(), any(), any());
|
||||
|
||||
when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(2);
|
||||
when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
||||
taskSupplier.get().join();
|
||||
|
||||
return CompletableFuture.completedFuture(null);
|
||||
|
|
|
@ -209,14 +209,14 @@ class AccountsManagerTest {
|
|||
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
final Runnable task = invocation.getArgument(2);
|
||||
final Runnable task = invocation.getArgument(1);
|
||||
task.run();
|
||||
|
||||
return null;
|
||||
}).when(accountLockManager).withLock(any(), anyList(), any(), any());
|
||||
}).when(accountLockManager).withLock(anyList(), any(), any());
|
||||
|
||||
when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(2);
|
||||
when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
||||
return taskSupplier.get();
|
||||
});
|
||||
|
||||
|
|
|
@ -114,14 +114,14 @@ class AccountsManagerUsernameIntegrationTest {
|
|||
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
final Runnable task = invocation.getArgument(2);
|
||||
final Runnable task = invocation.getArgument(1);
|
||||
task.run();
|
||||
|
||||
return null;
|
||||
}).when(accountLockManager).withLock(any(), anyList(), any(), any());
|
||||
}).when(accountLockManager).withLock(anyList(), any(), any());
|
||||
|
||||
when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(2);
|
||||
when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> {
|
||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
||||
taskSupplier.get().join();
|
||||
|
||||
return CompletableFuture.completedFuture(null);
|
||||
|
|
|
@ -98,10 +98,10 @@ public final class DynamoDbExtensionSchema {
|
|||
),
|
||||
|
||||
DELETED_ACCOUNTS_LOCK("deleted_accounts_lock_test",
|
||||
AccountLockManager.KEY_ACCOUNT_E164,
|
||||
AccountLockManager.KEY_ACCOUNT_PNI,
|
||||
null,
|
||||
List.of(AttributeDefinition.builder()
|
||||
.attributeName(AccountLockManager.KEY_ACCOUNT_E164)
|
||||
.attributeName(AccountLockManager.KEY_ACCOUNT_PNI)
|
||||
.attributeType(ScalarAttributeType.S).build()),
|
||||
List.of(), List.of()),
|
||||
|
||||
|
|
Loading…
Reference in New Issue