Add PNI support to AccountLockManager
This commit is contained in:
parent
73812b06be
commit
eb55b80bdc
|
@ -8,11 +8,13 @@ import com.amazonaws.services.dynamodbv2.ReleaseLockOptions;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionException;
|
import java.util.concurrent.CompletionException;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||||
|
|
||||||
public class AccountLockManager {
|
public class AccountLockManager {
|
||||||
|
@ -43,28 +45,35 @@ public class AccountLockManager {
|
||||||
* account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for
|
* account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for
|
||||||
* all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means.
|
* all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means.
|
||||||
*
|
*
|
||||||
* @param e164s the phone numbers for which to acquire a distributed, pessimistic lock
|
* @param e164s the phone numbers for which to acquire a distributed, pessimistic lock
|
||||||
* @param task the task to execute once locks have been acquired
|
* @param phoneNumberIdentifiers the phone number identifiers for which to acquire a distributed, pessimistic lock
|
||||||
|
* @param task the task to execute once locks have been acquired
|
||||||
* @param lockAcquisitionExecutor the executor on which to run blocking lock acquire/release tasks. this executor
|
* @param lockAcquisitionExecutor the executor on which to run blocking lock acquire/release tasks. this executor
|
||||||
* should not use virtual threads.
|
* should not use virtual threads.
|
||||||
*
|
|
||||||
* @throws InterruptedException if interrupted while acquiring a lock
|
* @throws InterruptedException if interrupted while acquiring a lock
|
||||||
*/
|
*/
|
||||||
public void withLock(final List<String> e164s, final Runnable task, final Executor lockAcquisitionExecutor) {
|
public void withLock(final List<String> e164s, final List<UUID> phoneNumberIdentifiers, final Runnable task,
|
||||||
|
final Executor lockAcquisitionExecutor) {
|
||||||
if (e164s.isEmpty()) {
|
if (e164s.isEmpty()) {
|
||||||
throw new IllegalArgumentException("List of e164s to lock must not be empty");
|
throw new IllegalArgumentException("List of e164s to lock must not be empty");
|
||||||
}
|
}
|
||||||
|
if (phoneNumberIdentifiers.isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("List of PNIs to lock must not be empty");
|
||||||
|
}
|
||||||
|
|
||||||
final List<LockItem> lockItems = new ArrayList<>(e164s.size());
|
final List<String> allIdentifiers = Stream.concat(e164s.stream(),
|
||||||
|
phoneNumberIdentifiers.stream().map(UUID::toString))
|
||||||
|
.toList();
|
||||||
|
final List<LockItem> lockItems = new ArrayList<>(allIdentifiers.size());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Offload the acquire/release tasks to the dedicated lock acquisition executor. The lock client performs blocking
|
// Offload the acquire/release tasks to the dedicated lock acquisition executor. The lock client performs blocking
|
||||||
// operations while holding locks which forces thread pinning when this method runs on a virtual thread.
|
// operations while holding locks which forces thread pinning when this method runs on a virtual thread.
|
||||||
// https://github.com/awslabs/amazon-dynamodb-lock-client/issues/97
|
// https://github.com/awslabs/amazon-dynamodb-lock-client/issues/97
|
||||||
CompletableFuture.runAsync(() -> {
|
CompletableFuture.runAsync(() -> {
|
||||||
for (final String e164 : e164s) {
|
for (final String identifier : allIdentifiers) {
|
||||||
try {
|
try {
|
||||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(e164)
|
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(identifier)
|
||||||
.withAcquireReleasedLocksConsistently(true)
|
.withAcquireReleasedLocksConsistently(true)
|
||||||
.build()));
|
.build()));
|
||||||
} catch (final InterruptedException e) {
|
} catch (final InterruptedException e) {
|
||||||
|
@ -91,27 +100,32 @@ public class AccountLockManager {
|
||||||
* account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for
|
* account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for
|
||||||
* all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means.
|
* all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means.
|
||||||
*
|
*
|
||||||
* @param e164s the phone numbers for which to acquire a distributed, pessimistic lock
|
* @param e164s the phone numbers for which to acquire a distributed, pessimistic lock
|
||||||
* @param taskSupplier a supplier for the task to execute once locks have been acquired
|
* @param phoneNumberIdentifiers the phone number identifiers for which to acquire a distributed, pessimistic lock
|
||||||
* @param executor the executor on which to acquire and release locks
|
* @param taskSupplier a supplier for the task to execute once locks have been acquired
|
||||||
*
|
* @param executor the executor on which to acquire and release locks
|
||||||
* @return a future that completes normally when the given task has executed successfully and all locks have been
|
* @return a future that completes normally when the given task has executed successfully and all locks have been
|
||||||
* released; the returned future may fail with an {@link InterruptedException} if interrupted while acquiring a lock
|
* released; the returned future may fail with an {@link InterruptedException} if interrupted while acquiring a lock
|
||||||
*/
|
*/
|
||||||
public <T> CompletableFuture<T> withLockAsync(final List<String> e164s,
|
public <T> CompletableFuture<T> withLockAsync(final List<String> e164s, final List<UUID> phoneNumberIdentifiers,
|
||||||
final Supplier<CompletableFuture<T>> taskSupplier,
|
final Supplier<CompletableFuture<T>> taskSupplier, final Executor executor) {
|
||||||
final Executor executor) {
|
|
||||||
|
|
||||||
if (e164s.isEmpty()) {
|
if (e164s.isEmpty()) {
|
||||||
throw new IllegalArgumentException("List of e164s to lock must not be empty");
|
throw new IllegalArgumentException("List of e164s to lock must not be empty");
|
||||||
}
|
}
|
||||||
|
if (phoneNumberIdentifiers.isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("List of PNIs to lock must not be empty");
|
||||||
|
}
|
||||||
|
|
||||||
final List<LockItem> lockItems = new ArrayList<>(e164s.size());
|
final List<String> allIdentifiers = Stream.concat(e164s.stream(),
|
||||||
|
phoneNumberIdentifiers.stream().map(UUID::toString))
|
||||||
|
.toList();
|
||||||
|
final List<LockItem> lockItems = new ArrayList<>(allIdentifiers.size());
|
||||||
|
|
||||||
return CompletableFuture.runAsync(() -> {
|
return CompletableFuture.runAsync(() -> {
|
||||||
for (final String e164 : e164s) {
|
for (final String identifier : allIdentifiers) {
|
||||||
try {
|
try {
|
||||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(e164)
|
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(identifier)
|
||||||
.withAcquireReleasedLocksConsistently(true)
|
.withAcquireReleasedLocksConsistently(true)
|
||||||
.build()));
|
.build()));
|
||||||
} catch (final InterruptedException e) {
|
} catch (final InterruptedException e) {
|
||||||
|
|
|
@ -271,15 +271,16 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
@Nullable final String userAgent) throws InterruptedException {
|
@Nullable final String userAgent) throws InterruptedException {
|
||||||
|
|
||||||
final Account account = new Account();
|
final Account account = new Account();
|
||||||
|
final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(number);
|
||||||
|
|
||||||
return createTimer.record(() -> {
|
return createTimer.record(() -> {
|
||||||
accountLockManager.withLock(List.of(number), () -> {
|
accountLockManager.withLock(List.of(number), List.of(phoneNumberIdentifier), () -> {
|
||||||
final Optional<UUID> maybeRecentlyDeletedAccountIdentifier =
|
final Optional<UUID> maybeRecentlyDeletedAccountIdentifier =
|
||||||
accounts.findRecentlyDeletedAccountIdentifier(number);
|
accounts.findRecentlyDeletedAccountIdentifier(number);
|
||||||
|
|
||||||
// Reuse the ACI from any recently-deleted account with this number to cover cases where somebody is
|
// Reuse the ACI from any recently-deleted account with this number to cover cases where somebody is
|
||||||
// re-registering.
|
// re-registering.
|
||||||
account.setNumber(number, phoneNumberIdentifiers.getPhoneNumberIdentifier(number));
|
account.setNumber(number, phoneNumberIdentifier);
|
||||||
account.setUuid(maybeRecentlyDeletedAccountIdentifier.orElseGet(UUID::randomUUID));
|
account.setUuid(maybeRecentlyDeletedAccountIdentifier.orElseGet(UUID::randomUUID));
|
||||||
account.setIdentityKey(aciIdentityKey);
|
account.setIdentityKey(aciIdentityKey);
|
||||||
account.setPhoneNumberIdentityKey(pniIdentityKey);
|
account.setPhoneNumberIdentityKey(pniIdentityKey);
|
||||||
|
@ -363,9 +364,9 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
// We exclude the primary device's repeated-use keys from deletion because new keys were provided as
|
// We exclude the primary device's repeated-use keys from deletion because new keys were provided as
|
||||||
// part of the account creation process, and we don't want to delete the keys that just got added.
|
// part of the account creation process, and we don't want to delete the keys that just got added.
|
||||||
return CompletableFuture.allOf(keysManager.deleteSingleUsePreKeys(aci),
|
return CompletableFuture.allOf(keysManager.deleteSingleUsePreKeys(aci),
|
||||||
keysManager.deleteSingleUsePreKeys(pni),
|
keysManager.deleteSingleUsePreKeys(pni),
|
||||||
messagesManager.clear(aci),
|
messagesManager.clear(aci),
|
||||||
profilesManager.deleteAll(aci));
|
profilesManager.deleteAll(aci));
|
||||||
})
|
})
|
||||||
.join();
|
.join();
|
||||||
}
|
}
|
||||||
|
@ -375,7 +376,8 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
Tags tags = Tags.of(UserAgentTagUtil.getPlatformTag(userAgent),
|
Tags tags = Tags.of(UserAgentTagUtil.getPlatformTag(userAgent),
|
||||||
Tag.of("type", accountCreationType),
|
Tag.of("type", accountCreationType),
|
||||||
Tag.of("hasPushToken", String.valueOf(
|
Tag.of("hasPushToken", String.valueOf(
|
||||||
primaryDeviceSpec.apnRegistrationId().isPresent() || primaryDeviceSpec.gcmRegistrationId().isPresent())),
|
primaryDeviceSpec.apnRegistrationId().isPresent() || primaryDeviceSpec.gcmRegistrationId()
|
||||||
|
.isPresent())),
|
||||||
Tag.of("pushTokenType", pushTokenType));
|
Tag.of("pushTokenType", pushTokenType));
|
||||||
|
|
||||||
if (StringUtils.isNotBlank(previousPushTokenType)) {
|
if (StringUtils.isNotBlank(previousPushTokenType)) {
|
||||||
|
@ -385,7 +387,8 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
Metrics.counter(CREATE_COUNTER_NAME, tags).increment();
|
Metrics.counter(CREATE_COUNTER_NAME, tags).increment();
|
||||||
|
|
||||||
accountAttributes.recoveryPassword().ifPresent(registrationRecoveryPassword ->
|
accountAttributes.recoveryPassword().ifPresent(registrationRecoveryPassword ->
|
||||||
registrationRecoveryPasswordsManager.storeForCurrentNumber(account.getNumber(), registrationRecoveryPassword));
|
registrationRecoveryPasswordsManager.storeForCurrentNumber(account.getNumber(),
|
||||||
|
registrationRecoveryPassword));
|
||||||
}, accountLockExecutor);
|
}, accountLockExecutor);
|
||||||
|
|
||||||
return account;
|
return account;
|
||||||
|
@ -394,6 +397,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
|
|
||||||
public CompletableFuture<Pair<Account, Device>> addDevice(final Account account, final DeviceSpec deviceSpec, final String linkDeviceToken) {
|
public CompletableFuture<Pair<Account, Device>> addDevice(final Account account, final DeviceSpec deviceSpec, final String linkDeviceToken) {
|
||||||
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
||||||
|
List.of(account.getPhoneNumberIdentifier()),
|
||||||
() -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, linkDeviceToken, MAX_UPDATE_ATTEMPTS),
|
() -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, linkDeviceToken, MAX_UPDATE_ATTEMPTS),
|
||||||
accountLockExecutor);
|
accountLockExecutor);
|
||||||
}
|
}
|
||||||
|
@ -581,7 +585,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
throw new IllegalArgumentException("Cannot remove primary device");
|
throw new IllegalArgumentException("Cannot remove primary device");
|
||||||
}
|
}
|
||||||
|
|
||||||
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
return accountLockManager.withLockAsync(List.of(account.getNumber()), List.of(account.getPhoneNumberIdentifier()),
|
||||||
() -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS),
|
() -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS),
|
||||||
accountLockExecutor);
|
accountLockExecutor);
|
||||||
}
|
}
|
||||||
|
@ -647,8 +651,10 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
validateDevices(account, pniSignedPreKeys, pniPqLastResortPreKeys, pniRegistrationIds);
|
validateDevices(account, pniSignedPreKeys, pniPqLastResortPreKeys, pniRegistrationIds);
|
||||||
|
|
||||||
final AtomicReference<Account> updatedAccount = new AtomicReference<>();
|
final AtomicReference<Account> updatedAccount = new AtomicReference<>();
|
||||||
|
final UUID targetPhoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(targetNumber);
|
||||||
|
|
||||||
accountLockManager.withLock(List.of(account.getNumber(), targetNumber), () -> {
|
accountLockManager.withLock(List.of(account.getNumber(), targetNumber),
|
||||||
|
List.of(account.getPhoneNumberIdentifier(), targetPhoneNumberIdentifier), () -> {
|
||||||
redisDelete(account);
|
redisDelete(account);
|
||||||
|
|
||||||
// There are three possible states for accounts associated with the target phone number:
|
// There are three possible states for accounts associated with the target phone number:
|
||||||
|
@ -674,15 +680,14 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
}
|
}
|
||||||
|
|
||||||
final UUID uuid = account.getUuid();
|
final UUID uuid = account.getUuid();
|
||||||
final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(targetNumber);
|
|
||||||
|
|
||||||
CompletableFuture.allOf(
|
CompletableFuture.allOf(
|
||||||
keysManager.deleteSingleUsePreKeys(phoneNumberIdentifier),
|
keysManager.deleteSingleUsePreKeys(targetPhoneNumberIdentifier),
|
||||||
keysManager.deleteSingleUsePreKeys(originalPhoneNumberIdentifier))
|
keysManager.deleteSingleUsePreKeys(originalPhoneNumberIdentifier))
|
||||||
.join();
|
.join();
|
||||||
|
|
||||||
final Collection<TransactWriteItem> keyWriteItems =
|
final Collection<TransactWriteItem> keyWriteItems =
|
||||||
buildPniKeyWriteItems(uuid, phoneNumberIdentifier, pniSignedPreKeys, pniPqLastResortPreKeys);
|
buildPniKeyWriteItems(uuid, targetPhoneNumberIdentifier, pniSignedPreKeys, pniPqLastResortPreKeys);
|
||||||
|
|
||||||
final Account numberChangedAccount = updateWithRetries(
|
final Account numberChangedAccount = updateWithRetries(
|
||||||
account,
|
account,
|
||||||
|
@ -690,7 +695,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
setPniKeys(account, pniIdentityKey, pniRegistrationIds);
|
setPniKeys(account, pniIdentityKey, pniRegistrationIds);
|
||||||
return true;
|
return true;
|
||||||
},
|
},
|
||||||
a -> accounts.changeNumber(a, targetNumber, phoneNumberIdentifier, maybeDisplacedUuid, keyWriteItems),
|
a -> accounts.changeNumber(a, targetNumber, targetPhoneNumberIdentifier, maybeDisplacedUuid, keyWriteItems),
|
||||||
() -> accounts.getByAccountIdentifier(uuid).orElseThrow(),
|
() -> accounts.getByAccountIdentifier(uuid).orElseThrow(),
|
||||||
AccountChangeValidator.NUMBER_CHANGE_VALIDATOR);
|
AccountChangeValidator.NUMBER_CHANGE_VALIDATOR);
|
||||||
|
|
||||||
|
@ -1220,7 +1225,9 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||||
public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
|
public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
|
||||||
final Timer.Sample sample = Timer.start();
|
final Timer.Sample sample = Timer.start();
|
||||||
|
|
||||||
return accountLockManager.withLockAsync(List.of(account.getNumber()), () -> delete(account), accountLockExecutor)
|
return accountLockManager.withLockAsync(List.of(account.getNumber()), List.of(account.getPhoneNumberIdentifier()),
|
||||||
|
() -> delete(account),
|
||||||
|
accountLockExecutor)
|
||||||
.whenComplete((ignored, throwable) -> {
|
.whenComplete((ignored, throwable) -> {
|
||||||
sample.stop(deleteTimer);
|
sample.stop(deleteTimer);
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ public class ClientPublicKeysManager {
|
||||||
*/
|
*/
|
||||||
public CompletableFuture<Void> setPublicKey(final Account account, final byte deviceId, final ECPublicKey publicKey) {
|
public CompletableFuture<Void> setPublicKey(final Account account, final byte deviceId, final ECPublicKey publicKey) {
|
||||||
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
||||||
|
List.of(account.getPhoneNumberIdentifier()),
|
||||||
() -> clientPublicKeys.setPublicKey(account.getIdentifier(IdentityType.ACI), deviceId, publicKey),
|
() -> clientPublicKeys.setPublicKey(account.getIdentifier(IdentityType.ACI), deviceId, publicKey),
|
||||||
accountLockExecutor);
|
accountLockExecutor);
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ import com.amazonaws.services.dynamodbv2.ReleaseLockOptions;
|
||||||
import com.google.i18n.phonenumbers.PhoneNumberUtil;
|
import com.google.i18n.phonenumbers.PhoneNumberUtil;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -33,6 +34,9 @@ class AccountLockManagerTest {
|
||||||
private static final String SECOND_NUMBER = PhoneNumberUtil.getInstance().format(
|
private static final String SECOND_NUMBER = PhoneNumberUtil.getInstance().format(
|
||||||
PhoneNumberUtil.getInstance().getExampleNumber("JP"), PhoneNumberUtil.PhoneNumberFormat.E164);
|
PhoneNumberUtil.getInstance().getExampleNumber("JP"), PhoneNumberUtil.PhoneNumberFormat.E164);
|
||||||
|
|
||||||
|
private static final UUID FIRST_PNI = UUID.randomUUID();
|
||||||
|
private static final UUID SECOND_PNI = UUID.randomUUID();
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
lockClient = mock(AmazonDynamoDBLockClient.class);
|
lockClient = mock(AmazonDynamoDBLockClient.class);
|
||||||
|
@ -51,47 +55,53 @@ class AccountLockManagerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void withLock() throws InterruptedException {
|
void withLock() throws InterruptedException {
|
||||||
accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> {}, executor);
|
accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), List.of(FIRST_PNI, SECOND_PNI), () -> {
|
||||||
|
}, executor);
|
||||||
|
|
||||||
verify(lockClient, times(2)).acquireLock(any());
|
verify(lockClient, times(4)).acquireLock(any());
|
||||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void withLockTaskThrowsException() throws InterruptedException {
|
void withLockTaskThrowsException() throws InterruptedException {
|
||||||
assertThrows(RuntimeException.class, () -> accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), () -> {
|
assertThrows(RuntimeException.class,
|
||||||
|
() -> accountLockManager.withLock(List.of(FIRST_NUMBER, SECOND_NUMBER), List.of(FIRST_PNI, SECOND_PNI), () -> {
|
||||||
throw new RuntimeException();
|
throw new RuntimeException();
|
||||||
}, executor));
|
}, executor));
|
||||||
|
|
||||||
verify(lockClient, times(2)).acquireLock(any());
|
verify(lockClient, times(4)).acquireLock(any());
|
||||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void withLockEmptyList() {
|
void withLockEmptyList() {
|
||||||
final Runnable task = mock(Runnable.class);
|
final Runnable task = mock(Runnable.class);
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> {}, executor));
|
assertThrows(IllegalArgumentException.class,
|
||||||
|
() -> accountLockManager.withLock(Collections.emptyList(), Collections.emptyList(), () -> {
|
||||||
|
},
|
||||||
|
executor));
|
||||||
verify(task, never()).run();
|
verify(task, never()).run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void withLockAsync() throws InterruptedException {
|
void withLockAsync() throws InterruptedException {
|
||||||
accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER),
|
accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER),
|
||||||
() -> CompletableFuture.completedFuture(null), executor).join();
|
List.of(FIRST_PNI, SECOND_PNI), () -> CompletableFuture.completedFuture(null), executor).join();
|
||||||
|
|
||||||
verify(lockClient, times(2)).acquireLock(any());
|
verify(lockClient, times(4)).acquireLock(any());
|
||||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void withLockAsyncTaskThrowsException() throws InterruptedException {
|
void withLockAsyncTaskThrowsException() throws InterruptedException {
|
||||||
assertThrows(RuntimeException.class,
|
assertThrows(RuntimeException.class,
|
||||||
() -> accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER),
|
() -> accountLockManager.withLockAsync(List.of(FIRST_NUMBER, SECOND_NUMBER),
|
||||||
() -> CompletableFuture.failedFuture(new RuntimeException()), executor).join());
|
List.of(FIRST_PNI, SECOND_PNI), () -> CompletableFuture.failedFuture(new RuntimeException()), executor)
|
||||||
|
.join());
|
||||||
|
|
||||||
verify(lockClient, times(2)).acquireLock(any());
|
verify(lockClient, times(4)).acquireLock(any());
|
||||||
verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
|
verify(lockClient, times(4)).releaseLock(any(ReleaseLockOptions.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -100,7 +110,7 @@ class AccountLockManagerTest {
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class,
|
assertThrows(IllegalArgumentException.class,
|
||||||
() -> accountLockManager.withLockAsync(Collections.emptyList(),
|
() -> accountLockManager.withLockAsync(Collections.emptyList(),
|
||||||
() -> CompletableFuture.completedFuture(null), executor));
|
Collections.emptyList(), () -> CompletableFuture.completedFuture(null), executor));
|
||||||
|
|
||||||
verify(task, never()).run();
|
verify(task, never()).run();
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.Mockito.atLeast;
|
import static org.mockito.Mockito.atLeast;
|
||||||
|
@ -50,7 +51,6 @@ import org.whispersystems.textsecuregcm.auth.UnidentifiedAccessUtil;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.entities.AccountAttributes;
|
import org.whispersystems.textsecuregcm.entities.AccountAttributes;
|
||||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||||
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
|
||||||
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
|
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
|
||||||
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
|
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
|
||||||
|
@ -107,14 +107,14 @@ class AccountsManagerConcurrentModificationIntegrationTest {
|
||||||
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
final Runnable task = invocation.getArgument(1);
|
final Runnable task = invocation.getArgument(2);
|
||||||
task.run();
|
task.run();
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}).when(accountLockManager).withLock(any(), any(), any());
|
}).when(accountLockManager).withLock(any(), anyList(), any(), any());
|
||||||
|
|
||||||
when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> {
|
when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> {
|
||||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(2);
|
||||||
taskSupplier.get().join();
|
taskSupplier.get().join();
|
||||||
|
|
||||||
return CompletableFuture.completedFuture(null);
|
return CompletableFuture.completedFuture(null);
|
||||||
|
|
|
@ -15,6 +15,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyByte;
|
import static org.mockito.ArgumentMatchers.anyByte;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import static org.mockito.ArgumentMatchers.argThat;
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
@ -208,14 +209,14 @@ class AccountsManagerTest {
|
||||||
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
final Runnable task = invocation.getArgument(1);
|
final Runnable task = invocation.getArgument(2);
|
||||||
task.run();
|
task.run();
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}).when(accountLockManager).withLock(any(), any(), any());
|
}).when(accountLockManager).withLock(any(), anyList(), any(), any());
|
||||||
|
|
||||||
when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> {
|
when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> {
|
||||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(2);
|
||||||
return taskSupplier.get();
|
return taskSupplier.get();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -113,14 +114,14 @@ class AccountsManagerUsernameIntegrationTest {
|
||||||
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
final AccountLockManager accountLockManager = mock(AccountLockManager.class);
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
final Runnable task = invocation.getArgument(1);
|
final Runnable task = invocation.getArgument(2);
|
||||||
task.run();
|
task.run();
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}).when(accountLockManager).withLock(any(), any(), any());
|
}).when(accountLockManager).withLock(any(), anyList(), any(), any());
|
||||||
|
|
||||||
when(accountLockManager.withLockAsync(any(), any(), any())).thenAnswer(invocation -> {
|
when(accountLockManager.withLockAsync(any(), anyList(), any(), any())).thenAnswer(invocation -> {
|
||||||
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(1);
|
final Supplier<CompletableFuture<?>> taskSupplier = invocation.getArgument(2);
|
||||||
taskSupplier.get().join();
|
taskSupplier.get().join();
|
||||||
|
|
||||||
return CompletableFuture.completedFuture(null);
|
return CompletableFuture.completedFuture(null);
|
||||||
|
|
Loading…
Reference in New Issue