Use a `Callable` for tasks performed within the scope of a pessimistic lock
This commit is contained in:
		
							parent
							
								
									b95d08aaea
								
							
						
					
					
						commit
						a4b98f38a6
					
				| 
						 | 
				
			
			@ -9,6 +9,7 @@ import com.google.common.annotations.VisibleForTesting;
 | 
			
		|||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
import java.util.concurrent.CompletableFuture;
 | 
			
		||||
import java.util.concurrent.CompletionException;
 | 
			
		||||
import java.util.concurrent.Executor;
 | 
			
		||||
| 
						 | 
				
			
			@ -49,10 +50,15 @@ public class AccountLockManager {
 | 
			
		|||
   * @param task                    the task to execute once locks have been acquired
 | 
			
		||||
   * @param lockAcquisitionExecutor the executor on which to run blocking lock acquire/release tasks. this executor
 | 
			
		||||
   *                                should not use virtual threads.
 | 
			
		||||
   * @throws InterruptedException if interrupted while acquiring a lock
 | 
			
		||||
   *
 | 
			
		||||
   * @return the value returned by the given {@code task}
 | 
			
		||||
   *
 | 
			
		||||
   * @throws Exception if an exception is thrown by the given {@code task}
 | 
			
		||||
   */
 | 
			
		||||
  public void withLock(final List<UUID> phoneNumberIdentifiers, final Runnable task,
 | 
			
		||||
      final Executor lockAcquisitionExecutor) {
 | 
			
		||||
  public <V> V withLock(final List<UUID> phoneNumberIdentifiers,
 | 
			
		||||
      final Callable<V> task,
 | 
			
		||||
      final Executor lockAcquisitionExecutor) throws Exception {
 | 
			
		||||
 | 
			
		||||
    if (phoneNumberIdentifiers.isEmpty()) {
 | 
			
		||||
      throw new IllegalArgumentException("List of PNIs to lock must not be empty");
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -75,7 +81,7 @@ public class AccountLockManager {
 | 
			
		|||
        }
 | 
			
		||||
      }, lockAcquisitionExecutor).join();
 | 
			
		||||
 | 
			
		||||
      task.run();
 | 
			
		||||
      return task.call();
 | 
			
		||||
    } finally {
 | 
			
		||||
      CompletableFuture.runAsync(() -> {
 | 
			
		||||
        for (final LockItem lockItem : lockItems) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -273,11 +273,33 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
 | 
			
		|||
      final DeviceSpec primaryDeviceSpec,
 | 
			
		||||
      @Nullable final String userAgent) throws InterruptedException {
 | 
			
		||||
 | 
			
		||||
    final Account account = new Account();
 | 
			
		||||
    final UUID pni = phoneNumberIdentifiers.getPhoneNumberIdentifier(number).join();
 | 
			
		||||
 | 
			
		||||
    return createTimer.record(() -> {
 | 
			
		||||
      accountLockManager.withLock(List.of(pni), () -> {
 | 
			
		||||
      try {
 | 
			
		||||
        return accountLockManager.withLock(List.of(pni),
 | 
			
		||||
            () -> create(number, pni, accountAttributes, accountBadges, aciIdentityKey, pniIdentityKey, primaryDeviceSpec, userAgent), accountLockExecutor);
 | 
			
		||||
      } catch (final Exception e) {
 | 
			
		||||
        if (e instanceof RuntimeException runtimeException) {
 | 
			
		||||
          throw runtimeException;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        logger.error("Unexpected exception while creating account", e);
 | 
			
		||||
        throw new RuntimeException(e);
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private Account create(final String number,
 | 
			
		||||
      final UUID pni,
 | 
			
		||||
      final AccountAttributes accountAttributes,
 | 
			
		||||
      final List<AccountBadge> accountBadges,
 | 
			
		||||
      final IdentityKey aciIdentityKey,
 | 
			
		||||
      final IdentityKey pniIdentityKey,
 | 
			
		||||
      final DeviceSpec primaryDeviceSpec,
 | 
			
		||||
      @Nullable final String userAgent) {
 | 
			
		||||
 | 
			
		||||
    final Account account = new Account();
 | 
			
		||||
    final Optional<UUID> maybeRecentlyDeletedAccountIdentifier =
 | 
			
		||||
        accounts.findRecentlyDeletedAccountIdentifier(pni);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -389,10 +411,8 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
 | 
			
		|||
    accountAttributes.recoveryPassword().ifPresent(registrationRecoveryPassword ->
 | 
			
		||||
        registrationRecoveryPasswordsManager.store(account.getIdentifier(IdentityType.PNI),
 | 
			
		||||
            registrationRecoveryPassword));
 | 
			
		||||
      }, accountLockExecutor);
 | 
			
		||||
 | 
			
		||||
    return account;
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public CompletableFuture<Pair<Account, Device>> addDevice(final Account account, final DeviceSpec deviceSpec, final String linkDeviceToken) {
 | 
			
		||||
| 
						 | 
				
			
			@ -655,9 +675,31 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
 | 
			
		|||
 | 
			
		||||
    validateDevices(account, pniSignedPreKeys, pniPqLastResortPreKeys, pniRegistrationIds);
 | 
			
		||||
 | 
			
		||||
    final AtomicReference<Account> updatedAccount = new AtomicReference<>();
 | 
			
		||||
    try {
 | 
			
		||||
      return accountLockManager.withLock(List.of(account.getPhoneNumberIdentifier(), targetPhoneNumberIdentifier),
 | 
			
		||||
          () -> changeNumber(account, targetNumber, targetPhoneNumberIdentifier, pniIdentityKey, pniSignedPreKeys, pniPqLastResortPreKeys, pniRegistrationIds), accountLockExecutor);
 | 
			
		||||
    } catch (final Exception e) {
 | 
			
		||||
      if (e instanceof MismatchedDevicesException mismatchedDevicesException) {
 | 
			
		||||
        throw mismatchedDevicesException;
 | 
			
		||||
      } if (e instanceof RuntimeException runtimeException) {
 | 
			
		||||
        throw runtimeException;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      logger.error("Unexpected exception when changing phone number", e);
 | 
			
		||||
      throw new RuntimeException(e);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private Account changeNumber(final Account account,
 | 
			
		||||
      final String targetNumber,
 | 
			
		||||
      final UUID targetPhoneNumberIdentifier,
 | 
			
		||||
      final IdentityKey pniIdentityKey,
 | 
			
		||||
      final Map<Byte, ECSignedPreKey> pniSignedPreKeys,
 | 
			
		||||
      final Map<Byte, KEMSignedPreKey> pniPqLastResortPreKeys,
 | 
			
		||||
      final Map<Byte, Integer> pniRegistrationIds) {
 | 
			
		||||
 | 
			
		||||
    final UUID originalPhoneNumberIdentifier = account.getPhoneNumberIdentifier();
 | 
			
		||||
 | 
			
		||||
    accountLockManager.withLock(List.of(account.getPhoneNumberIdentifier(), targetPhoneNumberIdentifier), () -> {
 | 
			
		||||
    redisDelete(account);
 | 
			
		||||
 | 
			
		||||
    // There are three possible states for accounts associated with the target phone number:
 | 
			
		||||
| 
						 | 
				
			
			@ -692,7 +734,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
 | 
			
		|||
      final Collection<TransactWriteItem> keyWriteItems =
 | 
			
		||||
          buildPniKeyWriteItems(targetPhoneNumberIdentifier, pniSignedPreKeys, pniPqLastResortPreKeys);
 | 
			
		||||
 | 
			
		||||
      final Account numberChangedAccount = updateWithRetries(
 | 
			
		||||
    return updateWithRetries(
 | 
			
		||||
        account,
 | 
			
		||||
        a -> {
 | 
			
		||||
          setPniKeys(account, pniIdentityKey, pniRegistrationIds);
 | 
			
		||||
| 
						 | 
				
			
			@ -701,11 +743,6 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
 | 
			
		|||
        a -> accounts.changeNumber(a, targetNumber, targetPhoneNumberIdentifier, maybeDisplacedUuid, keyWriteItems),
 | 
			
		||||
        () -> accounts.getByAccountIdentifier(uuid).orElseThrow(),
 | 
			
		||||
        AccountChangeValidator.NUMBER_CHANGE_VALIDATOR);
 | 
			
		||||
 | 
			
		||||
      updatedAccount.set(numberChangedAccount);
 | 
			
		||||
    }, accountLockExecutor);
 | 
			
		||||
 | 
			
		||||
    return updatedAccount.get();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public Account updatePniKeys(final Account account,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -47,9 +47,8 @@ class AccountLockManagerTest {
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  void withLock() throws InterruptedException {
 | 
			
		||||
    accountLockManager.withLock(List.of(FIRST_PNI, SECOND_PNI), () -> {
 | 
			
		||||
    }, executor);
 | 
			
		||||
  void withLock() throws Exception {
 | 
			
		||||
    accountLockManager.withLock(List.of(FIRST_PNI, SECOND_PNI), () -> null, executor);
 | 
			
		||||
 | 
			
		||||
    verify(lockClient, times(2)).acquireLock(any());
 | 
			
		||||
    verify(lockClient, times(2)).releaseLock(any(ReleaseLockOptions.class));
 | 
			
		||||
| 
						 | 
				
			
			@ -69,8 +68,7 @@ class AccountLockManagerTest {
 | 
			
		|||
  void withLockEmptyList() {
 | 
			
		||||
    final Runnable task = mock(Runnable.class);
 | 
			
		||||
 | 
			
		||||
    assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> {
 | 
			
		||||
        },
 | 
			
		||||
    assertThrows(IllegalArgumentException.class, () -> accountLockManager.withLock(Collections.emptyList(), () -> null,
 | 
			
		||||
        executor));
 | 
			
		||||
    verify(task, never()).run();
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,6 +28,7 @@ import java.util.ArrayList;
 | 
			
		|||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
import java.util.concurrent.CompletableFuture;
 | 
			
		||||
import java.util.concurrent.Executor;
 | 
			
		||||
import java.util.concurrent.LinkedBlockingDeque;
 | 
			
		||||
| 
						 | 
				
			
			@ -84,7 +85,7 @@ class AccountsManagerConcurrentModificationIntegrationTest {
 | 
			
		|||
  private Executor mutationExecutor = new ThreadPoolExecutor(20, 20, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20));
 | 
			
		||||
 | 
			
		||||
  @BeforeEach
 | 
			
		||||
  void setup() throws InterruptedException {
 | 
			
		||||
  void setup() throws Exception {
 | 
			
		||||
 | 
			
		||||
    @SuppressWarnings("unchecked") final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
 | 
			
		||||
        mock(DynamicConfigurationManager.class);
 | 
			
		||||
| 
						 | 
				
			
			@ -108,10 +109,8 @@ class AccountsManagerConcurrentModificationIntegrationTest {
 | 
			
		|||
      final AccountLockManager accountLockManager = mock(AccountLockManager.class);
 | 
			
		||||
 | 
			
		||||
      doAnswer(invocation -> {
 | 
			
		||||
        final Runnable task = invocation.getArgument(1);
 | 
			
		||||
        task.run();
 | 
			
		||||
 | 
			
		||||
        return null;
 | 
			
		||||
        final Callable<?> task = invocation.getArgument(1);
 | 
			
		||||
        return task.call();
 | 
			
		||||
      }).when(accountLockManager).withLock(anyList(), any(), any());
 | 
			
		||||
 | 
			
		||||
      when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -52,6 +52,7 @@ import java.util.Objects;
 | 
			
		|||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
import java.util.concurrent.CompletableFuture;
 | 
			
		||||
import java.util.concurrent.CompletionException;
 | 
			
		||||
import java.util.concurrent.Executor;
 | 
			
		||||
| 
						 | 
				
			
			@ -153,7 +154,7 @@ class AccountsManagerTest {
 | 
			
		|||
  };
 | 
			
		||||
 | 
			
		||||
  @BeforeEach
 | 
			
		||||
  void setup() throws InterruptedException {
 | 
			
		||||
  void setup() throws Exception {
 | 
			
		||||
    accounts = mock(Accounts.class);
 | 
			
		||||
    keysManager = mock(KeysManager.class);
 | 
			
		||||
    messagesManager = mock(MessagesManager.class);
 | 
			
		||||
| 
						 | 
				
			
			@ -213,10 +214,8 @@ class AccountsManagerTest {
 | 
			
		|||
    final AccountLockManager accountLockManager = mock(AccountLockManager.class);
 | 
			
		||||
 | 
			
		||||
    doAnswer(invocation -> {
 | 
			
		||||
      final Runnable task = invocation.getArgument(1);
 | 
			
		||||
      task.run();
 | 
			
		||||
 | 
			
		||||
      return null;
 | 
			
		||||
      final Callable<?> task = invocation.getArgument(1);
 | 
			
		||||
      return task.call();
 | 
			
		||||
    }).when(accountLockManager).withLock(anyList(), any(), any());
 | 
			
		||||
 | 
			
		||||
    when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,6 +28,7 @@ import java.util.List;
 | 
			
		|||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
import java.util.concurrent.CompletableFuture;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.function.Supplier;
 | 
			
		||||
| 
						 | 
				
			
			@ -81,12 +82,12 @@ class AccountsManagerUsernameIntegrationTest {
 | 
			
		|||
  private Accounts accounts;
 | 
			
		||||
 | 
			
		||||
  @BeforeEach
 | 
			
		||||
  void setup() throws InterruptedException {
 | 
			
		||||
  void setup() throws Exception {
 | 
			
		||||
    buildAccountsManager(1, 2, 10);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private void buildAccountsManager(final int initialWidth, int discriminatorMaxWidth, int attemptsPerWidth)
 | 
			
		||||
      throws InterruptedException {
 | 
			
		||||
      throws Exception {
 | 
			
		||||
    @SuppressWarnings("unchecked") final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
 | 
			
		||||
        mock(DynamicConfigurationManager.class);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -115,10 +116,8 @@ class AccountsManagerUsernameIntegrationTest {
 | 
			
		|||
    final AccountLockManager accountLockManager = mock(AccountLockManager.class);
 | 
			
		||||
 | 
			
		||||
    doAnswer(invocation -> {
 | 
			
		||||
      final Runnable task = invocation.getArgument(1);
 | 
			
		||||
      task.run();
 | 
			
		||||
 | 
			
		||||
      return null;
 | 
			
		||||
      final Callable<?> task = invocation.getArgument(1);
 | 
			
		||||
      return task.call();
 | 
			
		||||
    }).when(accountLockManager).withLock(anyList(), any(), any());
 | 
			
		||||
 | 
			
		||||
    when(accountLockManager.withLockAsync(anyList(), any(), any())).thenAnswer(invocation -> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue