Add `MigrationMismatchedAccounts` to `AccountsManager`

This commit is contained in:
Chris Eager 2021-09-10 15:11:10 -07:00 committed by Chris Eager
parent 372e131e25
commit a51a7a0901
7 changed files with 107 additions and 47 deletions

View File

@ -460,9 +460,13 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, Metrics.globalRegistry);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts, deletedAccountsLockDynamoDbClient, config.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, pendingAccountsManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient, config.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, mismatchedAccounts, usernamesManager,
profilesManager, pendingAccountsManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager,
dynamicConfigurationManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);

View File

@ -83,8 +83,9 @@ public class AccountsManager {
private final DeletedAccountsManager deletedAccountsManager;
private final DirectoryQueue directoryQueue;
private final KeysDynamoDb keysDynamoDb;
private final MessagesManager messagesManager;
private final UsernamesManager usernamesManager;
private final MessagesManager messagesManager;
private final MigrationMismatchedAccounts mismatchedAccounts;
private final UsernamesManager usernamesManager;
private final ProfilesManager profilesManager;
private final StoredVerificationCodeManager pendingAccounts;
private final SecureStorageClient secureStorageClient;
@ -111,7 +112,8 @@ public class AccountsManager {
public AccountsManager(Accounts accounts, AccountsDynamoDb accountsDynamoDb, FaultTolerantRedisCluster cacheCluster,
final DeletedAccountsManager deletedAccountsManager,
final DirectoryQueue directoryQueue,
final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager, final UsernamesManager usernamesManager,
final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager,
final MigrationMismatchedAccounts mismatchedAccounts, final UsernamesManager usernamesManager,
final ProfilesManager profilesManager,
final StoredVerificationCodeManager pendingAccounts,
final SecureStorageClient secureStorageClient,
@ -124,8 +126,9 @@ public class AccountsManager {
this.deletedAccountsManager = deletedAccountsManager;
this.directoryQueue = directoryQueue;
this.keysDynamoDb = keysDynamoDb;
this.messagesManager = messagesManager;
this.usernamesManager = usernamesManager;
this.messagesManager = messagesManager;
this.mismatchedAccounts = mismatchedAccounts;
this.usernamesManager = usernamesManager;
this.profilesManager = profilesManager;
this.pendingAccounts = pendingAccounts;
this.secureStorageClient = secureStorageClient;
@ -726,17 +729,21 @@ public class AccountsManager {
"mismatchType", mismatchDescription)
.increment();
if (maybeUUid.isPresent()
&& dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isLogMismatches()) {
maybeUUid.ifPresent(uuid -> {
final String abbreviatedCallChain = getAbbreviatedCallChain(new RuntimeException().getStackTrace());
mismatchedAccounts.put(uuid);
logger.info("Mismatched account data: {}", StructuredArguments.entries(Map.of(
"type", mismatchDescription,
"uuid", maybeUUid.get(),
"callChain", abbreviatedCallChain
)));
}
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isLogMismatches()) {
final String abbreviatedCallChain = getAbbreviatedCallChain(new RuntimeException().getStackTrace());
logger.info("Mismatched account data: {}", StructuredArguments.entries(Map.of(
"type", mismatchDescription,
"uuid", uuid,
"callChain", abbreviatedCallChain
)));
}
});
});
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
@ -50,6 +50,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
@ -107,37 +108,57 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
ThreadPoolExecutor accountsDynamoDbMigrationThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
DynamoDbClient reportMessagesDynamoDb = DynamoDbFromConfig.client(configuration.getReportMessageDynamoDbConfiguration(),
DynamoDbClient reportMessagesDynamoDb = DynamoDbFromConfig.client(
configuration.getReportMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient messageDynamoDb = DynamoDbFromConfig.client(configuration.getMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient preKeysDynamoDb = DynamoDbFromConfig.client(configuration.getKeysDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient accountsDynamoDbClient = DynamoDbFromConfig.client(configuration.getAccountsDynamoDbConfiguration(),
DynamoDbClient accountsDynamoDbClient = DynamoDbFromConfig.client(
configuration.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(configuration.getAccountsDynamoDbConfiguration(),
DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(
configuration.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(),
accountsDynamoDbMigrationThreadPool);
DynamoDbClient deletedAccountsDynamoDbClient = DynamoDbFromConfig.client(configuration.getDeletedAccountsDynamoDbConfiguration(),
DynamoDbClient deletedAccountsDynamoDbClient = DynamoDbFromConfig.client(
configuration.getDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationMismatchedAccountsDynamoDb = DynamoDbFromConfig.client(
configuration.getMigrationMismatchedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster",
configuration.getCacheClusterConfiguration(), redisClusterClientResources);
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(4).build();
ExecutorService backupServiceExecutor = environment.lifecycle().executorService(name(getClass(), "backupService-%d")).maxThreads(8).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(8).minThreads(1).build();
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle()
.executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(4).build();
ExecutorService backupServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "backupService-%d")).maxThreads(8).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "storageService-%d")).maxThreads(8).minThreads(1).build();
ExternalServiceCredentialGenerator backupCredentialsGenerator = new ExternalServiceCredentialGenerator(configuration.getSecureBackupServiceConfiguration().getUserAuthenticationTokenSharedSecret(), new byte[0], false);
ExternalServiceCredentialGenerator storageCredentialsGenerator = new ExternalServiceCredentialGenerator(configuration.getSecureStorageServiceConfiguration().getUserAuthenticationTokenSharedSecret(), new byte[0], false);
ExternalServiceCredentialGenerator backupCredentialsGenerator = new ExternalServiceCredentialGenerator(
configuration.getSecureBackupServiceConfiguration().getUserAuthenticationTokenSharedSecret(), new byte[0],
false);
ExternalServiceCredentialGenerator storageCredentialsGenerator = new ExternalServiceCredentialGenerator(
configuration.getSecureStorageServiceConfiguration().getUserAuthenticationTokenSharedSecret(), new byte[0],
false);
DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager(configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), configuration.getAppConfig().getConfigurationName());
DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager(
configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName());
dynamicConfigurationManager.start();
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(
dynamicConfigurationManager);
DynamoDbClient migrationDeletedAccountsDynamoDb = DynamoDbFromConfig.client(configuration.getMigrationDeletedAccountsDynamoDbConfiguration(),
DynamoDbClient migrationDeletedAccountsDynamoDb = DynamoDbFromConfig.client(
configuration.getMigrationDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(configuration.getMigrationRetryAccountsDynamoDbConfiguration(),
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(
configuration.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pendingAccountsDynamoDbClient = DynamoDbFromConfig.client(configuration.getPendingAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@ -163,22 +184,38 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, configuration.getMessageDynamoDbConfiguration().getTableName(), configuration.getMessageDynamoDbConfiguration().getTimeToLive());
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster("message_read_delete_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", configuration.getMetricsClusterConfiguration(), redisClusterClientResources);
SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator, backupServiceExecutor, configuration.getSecureBackupServiceConfiguration());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator, storageServiceExecutor, configuration.getSecureStorageServiceConfiguration());
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration());
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb, configuration.getReportMessageDynamoDbConfiguration().getTableName());
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, Metrics.globalRegistry);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts, deletedAccountsLockDynamoDbClient, configuration.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster",
configuration.getMetricsClusterConfiguration(), redisClusterClientResources);
SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator, backupServiceExecutor,
configuration.getSecureBackupServiceConfiguration());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, configuration.getSecureStorageServiceConfiguration());
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration());
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
configuration.getReportMessageDynamoDbConfiguration().getTableName());
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb,
Metrics.globalRegistry);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
reportMessageManager);
MigrationMismatchedAccounts mismatchedAccounts = new MigrationMismatchedAccounts(
migrationMismatchedAccountsDynamoDb,
configuration.getMigrationMismatchedAccountsDynamoDbConfiguration().getTableName());
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient,
configuration.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, pendingAccountsManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, mismatchedAccounts, usernamesManager,
profilesManager, pendingAccountsManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager,
dynamicConfigurationManager);
for (String user: users) {
for (String user : users) {
Optional<Account> account = accountsManager.get(user);
if (account.isPresent()) {

View File

@ -48,6 +48,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
@ -153,6 +154,9 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
DynamoDbClient migrationDeletedAccountsDynamoDb = DynamoDbFromConfig
.client(configuration.getMigrationDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationMismatchedAccountsDynamoDb = DynamoDbFromConfig
.client(configuration.getMigrationMismatchedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig
.client(configuration.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@ -217,12 +221,16 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
Metrics.globalRegistry);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
reportMessageManager);
MigrationMismatchedAccounts mismatchedAccounts = new MigrationMismatchedAccounts(
migrationMismatchedAccountsDynamoDb,
configuration.getMigrationMismatchedAccountsDynamoDbConfiguration().getTableName());
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient,
configuration.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, mismatchedAccounts, usernamesManager,
profilesManager,
pendingAccountsManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager,
dynamicConfigurationManager);

View File

@ -204,6 +204,7 @@ class AccountsDynamoDbMigrationCrawlerIntegrationTest {
directoryQueue,
keysDynamoDb,
mock(MessagesManager.class),
mock(MigrationMismatchedAccounts.class),
mock(UsernamesManager.class),
mock(ProfilesManager.class),
mock(StoredVerificationCodeManager.class),

View File

@ -160,6 +160,7 @@ class AccountsManagerConcurrentModificationIntegrationTest {
mock(DirectoryQueue.class),
mock(KeysDynamoDb.class),
mock(MessagesManager.class),
mock(MigrationMismatchedAccounts.class),
mock(UsernamesManager.class),
mock(ProfilesManager.class),
mock(StoredVerificationCodeManager.class),

View File

@ -59,6 +59,7 @@ import org.whispersystems.textsecuregcm.storage.Device.DeviceCapabilities;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccounts;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.StoredVerificationCodeManager;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
@ -120,6 +121,7 @@ class AccountsManagerTest {
directoryQueue,
keys,
messagesManager,
mock(MigrationMismatchedAccounts.class),
mock(UsernamesManager.class),
profilesManager,
mock(StoredVerificationCodeManager.class),