Perform cleanup operations before overwriting an existing account record

This commit is contained in:
Jon Chambers 2023-12-05 12:18:09 -05:00 committed by GitHub
parent 331bbdd4e6
commit 5f0726af8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 128 additions and 44 deletions

View File

@ -468,6 +468,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.minThreads(8)
.maxThreads(8)
.build();
ExecutorService clientPresenceExecutor = environment.lifecycle()
.executorService(name(getClass(), "clientPresence-%d"))
.minThreads(8)
.maxThreads(8)
.build();
ScheduledExecutorService subscriptionProcessorRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "subscriptionProcessorRetry-%d")).threads(1).build();
@ -540,7 +545,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
accountLockManager, keysManager, messagesManager, profilesManager,
secureStorageClient, secureValueRecovery2Client,
clientPresenceManager,
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, clock);
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, clientPresenceExecutor,
clock);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
APNSender apnSender = new APNSender(apnSenderExecutor, config.getApnConfiguration());
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, config.getFcmConfiguration().credentials().value());

View File

@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -36,6 +37,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
@ -184,7 +186,9 @@ public class Accounts extends AbstractDynamoDbStore {
deletedAccountsTableName);
}
public boolean create(final Account account, final Function<Account, Collection<TransactWriteItem>> additionalWriteItemsFunction) {
public boolean create(final Account account,
final Function<Account, Collection<TransactWriteItem>> additionalWriteItemsFunction,
final BiFunction<UUID, UUID, CompletableFuture<Void>> existingAccountCleanupOperation) {
return CREATE_TIMER.record(() -> {
try {
@ -239,7 +243,10 @@ public class Accounts extends AbstractDynamoDbStore {
account.setUuid(UUIDUtil.fromByteBuffer(actualAccountUuid));
final Account existingAccount = getByAccountIdentifier(account.getUuid()).orElseThrow();
account.setNumber(existingAccount.getNumber(), existingAccount.getPhoneNumberIdentifier());
joinAndUnwrapUpdateFuture(reclaimAccount(existingAccount, account, additionalWriteItemsFunction.apply(account)));
existingAccountCleanupOperation.apply(existingAccount.getIdentifier(IdentityType.ACI), existingAccount.getIdentifier(IdentityType.PNI))
.thenCompose(ignored -> reclaimAccount(existingAccount, account, additionalWriteItemsFunction.apply(account)))
.join();
return false;
}

View File

@ -114,6 +114,7 @@ public class AccountsManager {
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager;
private final Executor accountLockExecutor;
private final Executor clientPresenceExecutor;
private final Clock clock;
private static final ObjectWriter ACCOUNT_REDIS_JSON_WRITER = SystemMapper.jsonMapper()
@ -159,6 +160,7 @@ public class AccountsManager {
final ExperimentEnrollmentManager experimentEnrollmentManager,
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager,
final Executor accountLockExecutor,
final Executor clientPresenceExecutor,
final Clock clock) {
this.accounts = accounts;
this.phoneNumberIdentifiers = phoneNumberIdentifiers;
@ -173,6 +175,7 @@ public class AccountsManager {
this.experimentEnrollmentManager = experimentEnrollmentManager;
this.registrationRecoveryPasswordsManager = requireNonNull(registrationRecoveryPasswordsManager);
this.accountLockExecutor = accountLockExecutor;
this.clientPresenceExecutor = clientPresenceExecutor;
this.clock = requireNonNull(clock);
}
@ -243,14 +246,33 @@ public class AccountsManager {
aciSignedPreKey,
pniSignedPreKey,
aciPqLastResortPreKey,
pniPqLastResortPreKey));
pniPqLastResortPreKey),
(aci, pni) -> CompletableFuture.allOf(
keysManager.delete(aci),
keysManager.delete(pni),
messagesManager.clear(aci),
profilesManager.deleteAll(aci)
).thenRunAsync(() -> clientPresenceManager.disconnectAllPresencesForUuid(aci), clientPresenceExecutor));
// create() sometimes updates the UUID, if there was a number conflict.
// for metrics, we want secondary to run with the same original UUID
final UUID actualUuid = account.getUuid();
if (!account.getUuid().equals(originalUuid)) {
// If the UUID changed, then we overwrote an existing account. We should have cleared all messages before
// overwriting the old account, but more may have arrived while we were working. Similarly, the old account
// holder could have added keys or profiles. We'll largely repeat the cleanup process after creating the
// account to make sure we really REALLY got everything.
//
// We exclude the primary device's repeated-use keys from deletion because new keys were provided as
// part of the account creation process, and we don't want to delete the keys that just got added.
CompletableFuture.allOf(keysManager.delete(account.getIdentifier(IdentityType.ACI), true),
keysManager.delete(account.getIdentifier(IdentityType.PNI), true),
messagesManager.clear(account.getIdentifier(IdentityType.ACI)),
profilesManager.deleteAll(account.getIdentifier(IdentityType.ACI)))
.join();
}
redisSet(account);
final Tags tags;
// In terms of previously-existing accounts, there are three possible cases:
//
// 1. This is a completely new account; there was no pre-existing account and no recently-deleted account
@ -259,27 +281,6 @@ public class AccountsManager {
// instance to match the stored account record (i.e. originalUuid != actualUuid).
// 3. This is a re-registration of a recently-deleted account, in which case maybeRecentlyDeletedUuid is
// present.
//
// All cases are mutually-exclusive. In the first case, we don't need to do anything. In the third, we can be
// confident that everything has already been deleted. In the second case, though, we're taking over an existing
// account and need to clear out messages and keys that may have been stored for the old account.
if (!originalUuid.equals(actualUuid)) {
// We exclude the primary device's repeated-use keys from deletion because new keys were provided as part of
// the account creation process, and we don't want to delete the keys that just got added.
final CompletableFuture<Void> deleteKeysFuture = CompletableFuture.allOf(
keysManager.delete(actualUuid, true),
keysManager.delete(account.getPhoneNumberIdentifier(), true));
messagesManager.clear(actualUuid).join();
profilesManager.deleteAll(actualUuid).join();
deleteKeysFuture.join();
clientPresenceManager.disconnectAllPresencesForUuid(actualUuid);
}
final Tags tags;
if (freshUser) {
tags = Tags.of("type", maybeRecentlyDeletedAccountIdentifier.isPresent() ? "recently-deleted" : "new");
} else {

View File

@ -116,6 +116,8 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService accountLockExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountLock-%d")).minThreads(1).maxThreads(1).build();
ExecutorService clientPresenceExecutor = environment.lifecycle()
.executorService(name(getClass(), "clientPresence-%d")).minThreads(1).maxThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
@ -202,7 +204,8 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster,
accountLockManager, keys, messagesManager, profilesManager,
secureStorageClient, secureValueRecovery2Client, clientPresenceManager,
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, Clock.systemUTC());
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, clientPresenceExecutor,
Clock.systemUTC());
final String usernameHash = namespace.getString("usernameHash");
final String encryptedUsername = namespace.getString("encryptedUsername");

View File

@ -88,6 +88,8 @@ record CommandDependencies(
.executorService(name(name, "storageService-%d")).maxThreads(8).minThreads(8).build();
ExecutorService accountLockExecutor = environment.lifecycle()
.executorService(name(name, "accountLock-%d")).minThreads(8).maxThreads(8).build();
ExecutorService clientPresenceExecutor = environment.lifecycle()
.executorService(name(name, "clientPresence-%d")).minThreads(8).maxThreads(8).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "secureValueRecoveryServiceRetry-%d")).threads(1).build();
@ -177,7 +179,8 @@ record CommandDependencies(
AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster,
accountLockManager, keys, messagesManager, profilesManager,
secureStorageClient, secureValueRecovery2Client, clientPresenceManager,
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, clock);
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, clientPresenceExecutor,
clock);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(clientPresenceManager);

View File

@ -67,6 +67,7 @@ public class AccountCreationIntegrationTest {
private static final Clock CLOCK = Clock.fixed(Instant.now(), ZoneId.systemDefault());
private ExecutorService accountLockExecutor;
private ExecutorService clientPresenceExecutor;
private AccountsManager accountsManager;
private KeysManager keysManager;
@ -99,6 +100,7 @@ public class AccountCreationIntegrationTest {
DynamoDbExtensionSchema.Tables.DELETED_ACCOUNTS.tableName());
accountLockExecutor = Executors.newSingleThreadExecutor();
clientPresenceExecutor = Executors.newSingleThreadExecutor();
final AccountLockManager accountLockManager = new AccountLockManager(DYNAMO_DB_EXTENSION.getDynamoDbClient(),
DynamoDbExtensionSchema.Tables.DELETED_ACCOUNTS_LOCK.tableName());
@ -139,15 +141,20 @@ public class AccountCreationIntegrationTest {
mock(ExperimentEnrollmentManager.class),
registrationRecoveryPasswordsManager,
accountLockExecutor,
clientPresenceExecutor,
CLOCK);
}
@AfterEach
void tearDown() throws InterruptedException {
accountLockExecutor.shutdown();
clientPresenceExecutor.shutdown();
//noinspection ResultOfMethodCallIgnored
accountLockExecutor.awaitTermination(1, TimeUnit.SECONDS);
//noinspection ResultOfMethodCallIgnored
clientPresenceExecutor.awaitTermination(1, TimeUnit.SECONDS);
}
@CartesianTest

View File

@ -64,6 +64,7 @@ class AccountsManagerChangeNumberIntegrationTest {
private ClientPresenceManager clientPresenceManager;
private ExecutorService accountLockExecutor;
private ExecutorService clientPresenceExecutor;
private AccountsManager accountsManager;
@ -95,6 +96,7 @@ class AccountsManagerChangeNumberIntegrationTest {
Tables.DELETED_ACCOUNTS.tableName());
accountLockExecutor = Executors.newSingleThreadExecutor();
clientPresenceExecutor = Executors.newSingleThreadExecutor();
final AccountLockManager accountLockManager = new AccountLockManager(DYNAMO_DB_EXTENSION.getDynamoDbClient(),
Tables.DELETED_ACCOUNTS_LOCK.tableName());
@ -136,6 +138,7 @@ class AccountsManagerChangeNumberIntegrationTest {
mock(ExperimentEnrollmentManager.class),
registrationRecoveryPasswordsManager,
accountLockExecutor,
clientPresenceExecutor,
mock(Clock.class));
}
}
@ -143,9 +146,13 @@ class AccountsManagerChangeNumberIntegrationTest {
@AfterEach
void tearDown() throws InterruptedException {
accountLockExecutor.shutdown();
clientPresenceExecutor.shutdown();
//noinspection ResultOfMethodCallIgnored
accountLockExecutor.awaitTermination(1, TimeUnit.SECONDS);
//noinspection ResultOfMethodCallIgnored
clientPresenceExecutor.awaitTermination(1, TimeUnit.SECONDS);
}
@Test

View File

@ -141,6 +141,7 @@ class AccountsManagerConcurrentModificationIntegrationTest {
mock(ExperimentEnrollmentManager.class),
mock(RegistrationRecoveryPasswordsManager.class),
mock(Executor.class),
mock(Executor.class),
mock(Clock.class)
);
}

View File

@ -48,6 +48,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -134,6 +135,15 @@ class AccountsManagerTest {
profilesManager = mock(ProfilesManager.class);
clientPresenceManager = mock(ClientPresenceManager.class);
final Executor clientPresenceExecutor = mock(Executor.class);
doAnswer(invocation -> {
final Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(clientPresenceExecutor).execute(any());
//noinspection unchecked
commands = mock(RedisAdvancedClusterCommands.class);
@ -224,6 +234,7 @@ class AccountsManagerTest {
enrollmentManager,
registrationRecoveryPasswordsManager,
mock(Executor.class),
clientPresenceExecutor,
mock(Clock.class));
}
@ -856,7 +867,7 @@ class AccountsManagerTest {
when(commands.get(eq("Account3::" + uuid))).thenReturn(null);
when(accounts.getByAccountIdentifier(uuid)).thenReturn(Optional.empty())
.thenReturn(Optional.of(account));
when(accounts.create(any(), any())).thenThrow(ContestedOptimisticLockException.class);
when(accounts.create(any(), any(), any())).thenThrow(ContestedOptimisticLockException.class);
accountsManager.update(account, a -> {
});
@ -974,14 +985,14 @@ class AccountsManagerTest {
@Test
void testCreateFreshAccount() throws InterruptedException {
when(accounts.create(any(), any())).thenReturn(true);
when(accounts.create(any(), any(), any())).thenReturn(true);
final String e164 = "+18005550123";
final AccountAttributes attributes = new AccountAttributes(false, 1, 2, null, null, true, null);
createAccount(e164, attributes);
verify(accounts).create(argThat(account -> e164.equals(account.getNumber())), any());
verify(accounts).create(argThat(account -> e164.equals(account.getNumber())), any(), any());
verifyNoInteractions(messagesManager);
verifyNoInteractions(profilesManager);
@ -991,25 +1002,31 @@ class AccountsManagerTest {
void testReregisterAccount() throws InterruptedException {
final UUID existingUuid = UUID.randomUUID();
when(accounts.create(any(), any())).thenAnswer(invocation -> {
invocation.getArgument(0, Account.class).setUuid(existingUuid);
return false;
});
final String e164 = "+18005550123";
final AccountAttributes attributes = new AccountAttributes(false, 1, 2, null, null, true, null);
when(accounts.create(any(), any(), any())).thenAnswer(invocation -> {
invocation.getArgument(0, Account.class).setUuid(existingUuid);
final BiFunction<UUID, UUID, CompletableFuture<Void>> cleanupOperation = invocation.getArgument(2);
cleanupOperation.apply(existingUuid, phoneNumberIdentifiersByE164.get(e164));
return false;
});
createAccount(e164, attributes);
assertTrue(phoneNumberIdentifiersByE164.containsKey(e164));
verify(accounts)
.create(argThat(account -> e164.equals(account.getNumber()) && existingUuid.equals(account.getUuid())), any());
.create(argThat(account -> e164.equals(account.getNumber()) && existingUuid.equals(account.getUuid())), any(), any());
verify(keysManager).delete(existingUuid);
verify(keysManager).delete(phoneNumberIdentifiersByE164.get(e164));
verify(keysManager).delete(existingUuid, true);
verify(keysManager).delete(phoneNumberIdentifiersByE164.get(e164), true);
verify(messagesManager).clear(existingUuid);
verify(profilesManager).deleteAll(existingUuid);
verify(messagesManager, times(2)).clear(existingUuid);
verify(profilesManager, times(2)).deleteAll(existingUuid);
verify(clientPresenceManager).disconnectAllPresencesForUuid(existingUuid);
}
@ -1018,7 +1035,7 @@ class AccountsManagerTest {
final UUID recentlyDeletedUuid = UUID.randomUUID();
when(accounts.findRecentlyDeletedAccountIdentifier(anyString())).thenReturn(Optional.of(recentlyDeletedUuid));
when(accounts.create(any(), any())).thenReturn(true);
when(accounts.create(any(), any(), any())).thenReturn(true);
final String e164 = "+18005550123";
final AccountAttributes attributes = new AccountAttributes(false, 1, 2, null, null, true, null);
@ -1027,6 +1044,7 @@ class AccountsManagerTest {
verify(accounts).create(
argThat(account -> e164.equals(account.getNumber()) && recentlyDeletedUuid.equals(account.getUuid())),
any(),
any());
verifyNoInteractions(keysManager);

View File

@ -149,6 +149,7 @@ class AccountsManagerUsernameIntegrationTest {
experimentEnrollmentManager,
mock(RegistrationRecoveryPasswordsManager.class),
mock(Executor.class),
mock(Executor.class),
mock(Clock.class));
}
@ -312,7 +313,8 @@ class AccountsManagerUsernameIntegrationTest {
final Account account = AccountsHelper.createAccount(accountsManager, "+18005551111");
account.setUsernameHash(TestRandomUtil.nextBytes(16));
accounts.create(account, ignored -> Collections.emptyList());
accounts.create(account, ignored -> Collections.emptyList(),
(ignoredAci, ignoredPni) -> CompletableFuture.completedFuture(null));
final UUID linkHandle = UUID.randomUUID();
final byte[] encryptedUsername = TestRandomUtil.nextBytes(32);

View File

@ -209,6 +209,34 @@ class AccountsTest {
assertThat(accounts.findRecentlyDeletedAccountIdentifier(account.getNumber())).isEmpty();
}
@Test
void testStoreCleanupFailure() {
final Account existingAccount = nextRandomAccount();
createAccount(existingAccount);
verifyStoredState(existingAccount.getNumber(),
existingAccount.getUuid(),
existingAccount.getPhoneNumberIdentifier(),
existingAccount.getUsernameHash().orElse(null),
existingAccount,
true);
final CompletionException completionException = assertThrows(CompletionException.class,
() -> accounts.create(generateAccount(existingAccount.getNumber(), UUID.randomUUID(), UUID.randomUUID()),
ignored -> Collections.emptyList(),
(aci, pni) -> CompletableFuture.failedFuture(new RuntimeException("OH NO"))));
assertTrue(completionException.getCause() instanceof RuntimeException);
// If the existing account cleanup task failed, we should not overwrite the existing account record
verifyStoredState(existingAccount.getNumber(),
existingAccount.getUuid(),
existingAccount.getPhoneNumberIdentifier(),
existingAccount.getUsernameHash().orElse(null),
existingAccount,
true);
}
@Test
void testStoreMulti() {
final List<Device> devices = List.of(generateDevice(DEVICE_ID_1), generateDevice(DEVICE_ID_2));
@ -1113,7 +1141,8 @@ class AccountsTest {
}
private boolean createAccount(final Account account) {
return accounts.create(account, ignored -> Collections.emptyList());
return accounts.create(account, ignored -> Collections.emptyList(),
(ignoredAci, ignoredPni) -> CompletableFuture.completedFuture(null));
}
private static Account nextRandomAccount() {