Retire integration with legacy contact discovery system
This commit is contained in:
		
							parent
							
								
									8d468d17e3
								
							
						
					
					
						commit
						12b58a31a1
					
				|  | @ -47,7 +47,6 @@ dynamoDbTables: | |||
|     scanPageSize: 100 | ||||
|   deletedAccounts: | ||||
|     tableName: Example_DeletedAccounts | ||||
|     needsReconciliationIndexName: NeedsReconciliation | ||||
|   deletedAccountsLock: | ||||
|     tableName: Example_DeletedAccountsLock | ||||
|   issuedReceipts: | ||||
|  | @ -99,43 +98,6 @@ pushSchedulerCluster: # Redis server configuration for push scheduler cluster | |||
| rateLimitersCluster: # Redis server configuration for rate limiters cluster | ||||
|   configurationUri: redis://redis.example.com:6379/ | ||||
| 
 | ||||
| directory: | ||||
|   client: # Configuration for interfacing with Contact Discovery Service cluster | ||||
|     userAuthenticationTokenSharedSecret: 00000f # hex-encoded secret shared with CDS used to generate auth tokens for Signal users | ||||
|     userAuthenticationTokenUserIdSecret: 00000f # hex-encoded secret shared among Signal-Servers to obscure user phone numbers from CDS | ||||
|   sqs: | ||||
|     accessKey: test     # AWS SQS accessKey | ||||
|     accessSecret: test  # AWS SQS accessSecret | ||||
|     queueUrls: # AWS SQS queue urls | ||||
|       - https://sqs.example.com/directory.fifo | ||||
|   server: # One or more CDS servers | ||||
|     - replicationName: example           # CDS replication name | ||||
|       replicationUrl: cds.example.com    # CDS replication endpoint base url | ||||
|       replicationPassword: example       # CDS replication endpoint password | ||||
|       replicationCaCertificates:         # CDS replication endpoint TLS certificate trust root | ||||
|         - | | ||||
|           -----BEGIN CERTIFICATE----- | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789+abcdefghijklmnopqrstuvwxyz | ||||
|           AAAAAAAAAAAAAAAAAAAA | ||||
|           -----END CERTIFICATE----- | ||||
| 
 | ||||
| directoryV2: | ||||
|   client: # Configuration for interfacing with Contact Discovery Service v2 cluster | ||||
|     userAuthenticationTokenSharedSecret: abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG= # base64-encoded secret shared with CDS to generate auth tokens for Signal users | ||||
|  |  | |||
|  | @ -294,10 +294,6 @@ | |||
|       <groupId>software.amazon.awssdk</groupId> | ||||
|       <artifactId>s3</artifactId> | ||||
|     </dependency> | ||||
|     <dependency> | ||||
|       <groupId>software.amazon.awssdk</groupId> | ||||
|       <artifactId>sqs</artifactId> | ||||
|     </dependency> | ||||
|     <dependency> | ||||
|       <groupId>software.amazon.awssdk</groupId> | ||||
|       <artifactId>dynamodb</artifactId> | ||||
|  |  | |||
|  | @ -23,7 +23,6 @@ import org.whispersystems.textsecuregcm.configuration.BraintreeConfiguration; | |||
| import org.whispersystems.textsecuregcm.configuration.CallLinkConfiguration; | ||||
| import org.whispersystems.textsecuregcm.configuration.CdnConfiguration; | ||||
| import org.whispersystems.textsecuregcm.configuration.DatadogConfiguration; | ||||
| import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; | ||||
| import org.whispersystems.textsecuregcm.configuration.DirectoryV2Configuration; | ||||
| import org.whispersystems.textsecuregcm.configuration.DynamoDbClientConfiguration; | ||||
| import org.whispersystems.textsecuregcm.configuration.DynamoDbTables; | ||||
|  | @ -115,11 +114,6 @@ public class WhisperServerConfiguration extends Configuration { | |||
|   @JsonProperty | ||||
|   private RedisClusterConfiguration metricsCluster; | ||||
| 
 | ||||
|   @NotNull | ||||
|   @Valid | ||||
|   @JsonProperty | ||||
|   private DirectoryConfiguration directory; | ||||
| 
 | ||||
|   @NotNull | ||||
|   @Valid | ||||
|   @JsonProperty | ||||
|  | @ -321,10 +315,6 @@ public class WhisperServerConfiguration extends Configuration { | |||
|     return metricsCluster; | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryConfiguration getDirectoryConfiguration() { | ||||
|     return directory; | ||||
|   } | ||||
| 
 | ||||
|   public SecureValueRecovery2Configuration getSvr2Configuration() { | ||||
|     return svr2; | ||||
|   } | ||||
|  |  | |||
|  | @ -37,7 +37,6 @@ import java.net.http.HttpClient; | |||
| import java.nio.charset.StandardCharsets; | ||||
| import java.time.Clock; | ||||
| import java.time.Duration; | ||||
| import java.util.ArrayList; | ||||
| import java.util.Collections; | ||||
| import java.util.EnumSet; | ||||
| import java.util.List; | ||||
|  | @ -80,7 +79,6 @@ import org.whispersystems.textsecuregcm.captcha.CaptchaChecker; | |||
| import org.whispersystems.textsecuregcm.captcha.HCaptchaClient; | ||||
| import org.whispersystems.textsecuregcm.captcha.RecaptchaClient; | ||||
| import org.whispersystems.textsecuregcm.captcha.RegistrationCaptchaManager; | ||||
| import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration; | ||||
| import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; | ||||
| import org.whispersystems.textsecuregcm.controllers.AccountController; | ||||
| import org.whispersystems.textsecuregcm.controllers.AccountControllerV2; | ||||
|  | @ -91,7 +89,6 @@ import org.whispersystems.textsecuregcm.controllers.CallLinkController; | |||
| import org.whispersystems.textsecuregcm.controllers.CertificateController; | ||||
| import org.whispersystems.textsecuregcm.controllers.ChallengeController; | ||||
| import org.whispersystems.textsecuregcm.controllers.DeviceController; | ||||
| import org.whispersystems.textsecuregcm.controllers.DirectoryController; | ||||
| import org.whispersystems.textsecuregcm.controllers.DirectoryV2Controller; | ||||
| import org.whispersystems.textsecuregcm.controllers.DonationController; | ||||
| import org.whispersystems.textsecuregcm.controllers.KeepAliveController; | ||||
|  | @ -168,7 +165,6 @@ import org.whispersystems.textsecuregcm.spam.RateLimitChallengeListener; | |||
| import org.whispersystems.textsecuregcm.spam.ReportSpamTokenProvider; | ||||
| import org.whispersystems.textsecuregcm.spam.ScoreThresholdProvider; | ||||
| import org.whispersystems.textsecuregcm.spam.SpamFilter; | ||||
| import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; | ||||
| import org.whispersystems.textsecuregcm.storage.AccountCleaner; | ||||
| import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; | ||||
| import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; | ||||
|  | @ -178,11 +174,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager; | |||
| import org.whispersystems.textsecuregcm.storage.ChangeNumberManager; | ||||
| import org.whispersystems.textsecuregcm.storage.ContactDiscoveryWriter; | ||||
| import org.whispersystems.textsecuregcm.storage.DeletedAccounts; | ||||
| import org.whispersystems.textsecuregcm.storage.DeletedAccountsDirectoryReconciler; | ||||
| import org.whispersystems.textsecuregcm.storage.DeletedAccountsManager; | ||||
| import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler; | ||||
| import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; | ||||
| import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; | ||||
| import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager; | ||||
| import org.whispersystems.textsecuregcm.storage.Keys; | ||||
|  | @ -328,8 +320,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | |||
|         .build(); | ||||
| 
 | ||||
|     DeletedAccounts deletedAccounts = new DeletedAccounts(dynamoDbClient, | ||||
|         config.getDynamoDbTables().getDeletedAccounts().getTableName(), | ||||
|         config.getDynamoDbTables().getDeletedAccounts().getNeedsReconciliationIndexName()); | ||||
|         config.getDynamoDbTables().getDeletedAccounts().getTableName()); | ||||
| 
 | ||||
|     DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = | ||||
|         new DynamicConfigurationManager<>(config.getAppConfig().getApplication(), | ||||
|  | @ -464,8 +455,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | |||
|         config.getBraintree().supportedCurrencies(), config.getBraintree().merchantAccounts(), | ||||
|         config.getBraintree().graphqlUrl(), config.getBraintree().circuitBreaker(), subscriptionProcessorExecutor); | ||||
| 
 | ||||
|     ExternalServiceCredentialsGenerator directoryCredentialsGenerator = DirectoryController.credentialsGenerator( | ||||
|         config.getDirectoryConfiguration().getDirectoryClientConfiguration()); | ||||
|     ExternalServiceCredentialsGenerator directoryV2CredentialsGenerator = DirectoryV2Controller.credentialsGenerator( | ||||
|         config.getDirectoryV2Configuration().getDirectoryV2ClientConfiguration()); | ||||
|     ExternalServiceCredentialsGenerator storageCredentialsGenerator = SecureStorageController.credentialsGenerator( | ||||
|  | @ -502,7 +491,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | |||
|         storageServiceExecutor, config.getSecureStorageServiceConfiguration()); | ||||
|     ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, recurringJobExecutor, | ||||
|         keyspaceNotificationDispatchExecutor); | ||||
|     DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration()); | ||||
|     StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts); | ||||
|     StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices); | ||||
|     ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); | ||||
|  | @ -516,7 +504,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | |||
|     DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts, | ||||
|         deletedAccountsLockDynamoDbClient, config.getDynamoDbTables().getDeletedAccountsLock().getTableName()); | ||||
|     AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster, | ||||
|         deletedAccountsManager, directoryQueue, keys, messagesManager, profilesManager, | ||||
|         deletedAccountsManager, keys, messagesManager, profilesManager, | ||||
|         pendingAccountsManager, secureStorageClient, secureBackupClient, secureValueRecovery2Client, clientPresenceManager, | ||||
|         experimentEnrollmentManager, registrationRecoveryPasswordsManager, clock); | ||||
|     RemoteConfigsManager       remoteConfigsManager       = new RemoteConfigsManager(remoteConfigs); | ||||
|  | @ -573,33 +561,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | |||
|     MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes())); | ||||
|     ChangeNumberManager changeNumberManager = new ChangeNumberManager(messageSender, accountsManager); | ||||
| 
 | ||||
|     final List<AccountDatabaseCrawlerListener> directoryReconciliationAccountDatabaseCrawlerListeners = new ArrayList<>(); | ||||
|     final List<DeletedAccountsDirectoryReconciler> deletedAccountsDirectoryReconcilers = new ArrayList<>(); | ||||
|     for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration() | ||||
|         .getDirectoryServerConfiguration()) { | ||||
|       final DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient( | ||||
|           directoryServerConfiguration); | ||||
|       final DirectoryReconciler directoryReconciler = new DirectoryReconciler( | ||||
|           directoryServerConfiguration.getReplicationName(), directoryReconciliationClient, | ||||
|           dynamicConfigurationManager); | ||||
|       // reconcilers are read-only | ||||
|       directoryReconciliationAccountDatabaseCrawlerListeners.add(directoryReconciler); | ||||
| 
 | ||||
|       final DeletedAccountsDirectoryReconciler deletedAccountsDirectoryReconciler = new DeletedAccountsDirectoryReconciler( | ||||
|           directoryServerConfiguration.getReplicationName(), directoryReconciliationClient); | ||||
|       deletedAccountsDirectoryReconcilers.add(deletedAccountsDirectoryReconciler); | ||||
|     } | ||||
| 
 | ||||
|     AccountDatabaseCrawlerCache directoryReconciliationAccountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache( | ||||
|         cacheCluster, AccountDatabaseCrawlerCache.DIRECTORY_RECONCILER_PREFIX); | ||||
|     AccountDatabaseCrawler directoryReconciliationAccountDatabaseCrawler = new AccountDatabaseCrawler( | ||||
|         "Reconciliation crawler", | ||||
|         accountsManager, | ||||
|         directoryReconciliationAccountDatabaseCrawlerCache, directoryReconciliationAccountDatabaseCrawlerListeners, | ||||
|         config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), | ||||
|         config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs() | ||||
|     ); | ||||
| 
 | ||||
|     AccountDatabaseCrawlerCache accountCleanerAccountDatabaseCrawlerCache = | ||||
|         new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX); | ||||
|     AccountDatabaseCrawler accountCleanerAccountDatabaseCrawler = new AccountDatabaseCrawler("Account cleaner crawler", | ||||
|  | @ -625,8 +586,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | |||
|         config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs() | ||||
|     ); | ||||
| 
 | ||||
|     DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor); | ||||
| 
 | ||||
|     HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build(); | ||||
|     FixerClient fixerClient = new FixerClient(currencyClient, config.getPaymentsServiceConfiguration().getFixerApiKey()); | ||||
|     CoinMarketCapClient coinMarketCapClient = new CoinMarketCapClient(currencyClient, config.getPaymentsServiceConfiguration().getCoinMarketCapApiKey(), config.getPaymentsServiceConfiguration().getCoinMarketCapCurrencyIds()); | ||||
|  | @ -637,14 +596,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | |||
|     environment.lifecycle().manage(apnPushNotificationScheduler); | ||||
|     environment.lifecycle().manage(provisioningManager); | ||||
|     environment.lifecycle().manage(accountDatabaseCrawler); | ||||
|     environment.lifecycle().manage(directoryReconciliationAccountDatabaseCrawler); | ||||
|     environment.lifecycle().manage(accountCleanerAccountDatabaseCrawler); | ||||
|     environment.lifecycle().manage(deletedAccountsTableCrawler); | ||||
|     environment.lifecycle().manage(messagesCache); | ||||
|     environment.lifecycle().manage(messagePersister); | ||||
|     environment.lifecycle().manage(clientPresenceManager); | ||||
|     environment.lifecycle().manage(currencyManager); | ||||
|     environment.lifecycle().manage(directoryQueue); | ||||
|     environment.lifecycle().manage(registrationServiceClient); | ||||
| 
 | ||||
|     final RegistrationCaptchaManager registrationCaptchaManager = new RegistrationCaptchaManager(captchaChecker, | ||||
|  | @ -767,7 +723,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | |||
|         new CertificateController(new CertificateGenerator(config.getDeliveryCertificate().getCertificate(), config.getDeliveryCertificate().getPrivateKey(), config.getDeliveryCertificate().getExpiresDays()), zkAuthOperations, clock), | ||||
|         new ChallengeController(rateLimitChallengeManager), | ||||
|         new DeviceController(pendingDevicesManager, accountsManager, messagesManager, keys, rateLimiters, config.getMaxDevices()), | ||||
|         new DirectoryController(directoryCredentialsGenerator), | ||||
|         new DirectoryV2Controller(directoryV2CredentialsGenerator), | ||||
|         new DonationController(clock, zkReceiptOperations, redeemedReceiptsManager, accountsManager, config.getBadges(), | ||||
|             ReceiptCredentialPresentation::new), | ||||
|  |  | |||
|  | @ -1,25 +0,0 @@ | |||
| package org.whispersystems.textsecuregcm.configuration; | ||||
| 
 | ||||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||||
| import javax.validation.constraints.NotBlank; | ||||
| import org.whispersystems.textsecuregcm.configuration.DynamoDbTables.Table; | ||||
| 
 | ||||
| public class DeletedAccountsTableConfiguration extends Table { | ||||
| 
 | ||||
|   private final String needsReconciliationIndexName; | ||||
| 
 | ||||
|   @JsonCreator | ||||
|   public DeletedAccountsTableConfiguration( | ||||
|       @JsonProperty("tableName") final String tableName, | ||||
|       @JsonProperty("needsReconciliationIndexName") final String needsReconciliationIndexName) { | ||||
| 
 | ||||
|     super(tableName); | ||||
|     this.needsReconciliationIndexName = needsReconciliationIndexName; | ||||
|   } | ||||
| 
 | ||||
|   @NotBlank | ||||
|   public String getNeedsReconciliationIndexName() { | ||||
|     return needsReconciliationIndexName; | ||||
|   } | ||||
| } | ||||
|  | @ -1,29 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2020 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.configuration; | ||||
| 
 | ||||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||||
| import java.util.HexFormat; | ||||
| import javax.validation.constraints.NotEmpty; | ||||
| 
 | ||||
| public class DirectoryClientConfiguration { | ||||
| 
 | ||||
|   @NotEmpty | ||||
|   @JsonProperty | ||||
|   private String userAuthenticationTokenSharedSecret; | ||||
| 
 | ||||
|   @NotEmpty | ||||
|   @JsonProperty | ||||
|   private String userAuthenticationTokenUserIdSecret; | ||||
| 
 | ||||
|   public byte[] getUserAuthenticationTokenSharedSecret() { | ||||
|     return HexFormat.of().parseHex(userAuthenticationTokenSharedSecret); | ||||
|   } | ||||
| 
 | ||||
|   public byte[] getUserAuthenticationTokenUserIdSecret() { | ||||
|     return HexFormat.of().parseHex(userAuthenticationTokenUserIdSecret); | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  | @ -1,41 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2020 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.configuration; | ||||
| 
 | ||||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||||
| 
 | ||||
| import javax.validation.Valid; | ||||
| import javax.validation.constraints.NotNull; | ||||
| import java.util.List; | ||||
| 
 | ||||
| public class DirectoryConfiguration { | ||||
| 
 | ||||
|   @JsonProperty | ||||
|   @NotNull | ||||
|   @Valid | ||||
|   private SqsConfiguration sqs; | ||||
|      | ||||
|   @JsonProperty | ||||
|   @NotNull | ||||
|   @Valid | ||||
|   private DirectoryClientConfiguration client; | ||||
| 
 | ||||
|   @JsonProperty | ||||
|   @NotNull | ||||
|   @Valid | ||||
|   private List<DirectoryServerConfiguration> server; | ||||
| 
 | ||||
|   public SqsConfiguration getSqsConfiguration() { | ||||
|     return sqs; | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryClientConfiguration getDirectoryClientConfiguration() { | ||||
|     return client; | ||||
|   } | ||||
| 
 | ||||
|   public List<DirectoryServerConfiguration> getDirectoryServerConfiguration() { | ||||
|     return server; | ||||
|   } | ||||
| } | ||||
|  | @ -1,46 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2020 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.configuration; | ||||
| 
 | ||||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||||
| import javax.validation.constraints.NotBlank; | ||||
| import javax.validation.constraints.NotEmpty; | ||||
| import java.util.List; | ||||
| 
 | ||||
| public class DirectoryServerConfiguration { | ||||
| 
 | ||||
|   @NotEmpty | ||||
|   @JsonProperty | ||||
|   private String replicationName; | ||||
| 
 | ||||
|   @NotEmpty | ||||
|   @JsonProperty | ||||
|   private String replicationUrl; | ||||
| 
 | ||||
|   @NotEmpty | ||||
|   @JsonProperty | ||||
|   private String replicationPassword; | ||||
| 
 | ||||
|   @NotEmpty | ||||
|   @JsonProperty | ||||
|   private List<@NotBlank String> replicationCaCertificates; | ||||
| 
 | ||||
|   public String getReplicationName() { | ||||
|     return replicationName; | ||||
|   } | ||||
| 
 | ||||
|   public String getReplicationUrl() { | ||||
|     return replicationUrl; | ||||
|   } | ||||
| 
 | ||||
|   public String getReplicationPassword() { | ||||
|     return replicationPassword; | ||||
|   } | ||||
| 
 | ||||
|   public List<String> getReplicationCaCertificates() { | ||||
|     return replicationCaCertificates; | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  | @ -47,7 +47,7 @@ public class DynamoDbTables { | |||
|   } | ||||
| 
 | ||||
|   private final AccountsTableConfiguration accounts; | ||||
|   private final DeletedAccountsTableConfiguration deletedAccounts; | ||||
|   private final Table deletedAccounts; | ||||
|   private final Table deletedAccountsLock; | ||||
|   private final IssuedReceiptsTableConfiguration issuedReceipts; | ||||
|   private final Table keys; | ||||
|  | @ -66,7 +66,7 @@ public class DynamoDbTables { | |||
| 
 | ||||
|   public DynamoDbTables( | ||||
|       @JsonProperty("accounts") final AccountsTableConfiguration accounts, | ||||
|       @JsonProperty("deletedAccounts") final DeletedAccountsTableConfiguration deletedAccounts, | ||||
|       @JsonProperty("deletedAccounts") final Table deletedAccounts, | ||||
|       @JsonProperty("deletedAccountsLock") final Table deletedAccountsLock, | ||||
|       @JsonProperty("issuedReceipts") final IssuedReceiptsTableConfiguration issuedReceipts, | ||||
|       @JsonProperty("keys") final Table keys, | ||||
|  | @ -110,7 +110,7 @@ public class DynamoDbTables { | |||
| 
 | ||||
|   @NotNull | ||||
|   @Valid | ||||
|   public DeletedAccountsTableConfiguration getDeletedAccounts() { | ||||
|   public Table getDeletedAccounts() { | ||||
|     return deletedAccounts; | ||||
|   } | ||||
| 
 | ||||
|  |  | |||
|  | @ -43,9 +43,6 @@ public class DynamicConfiguration { | |||
|   @Valid | ||||
|   private DynamicRateLimitChallengeConfiguration rateLimitChallenge = new DynamicRateLimitChallengeConfiguration(); | ||||
| 
 | ||||
|   @JsonProperty | ||||
|   private DynamicDirectoryReconcilerConfiguration directoryReconciler = new DynamicDirectoryReconcilerConfiguration(); | ||||
| 
 | ||||
|   @JsonProperty | ||||
|   @Valid | ||||
|   private DynamicPushLatencyConfiguration pushLatency = new DynamicPushLatencyConfiguration(Collections.emptyMap()); | ||||
|  | @ -97,10 +94,6 @@ public class DynamicConfiguration { | |||
|     return rateLimitChallenge; | ||||
|   } | ||||
| 
 | ||||
|   public DynamicDirectoryReconcilerConfiguration getDirectoryReconcilerConfiguration() { | ||||
|     return directoryReconciler; | ||||
|   } | ||||
| 
 | ||||
|   public DynamicPushLatencyConfiguration getPushLatencyConfiguration() { | ||||
|     return pushLatency; | ||||
|   } | ||||
|  |  | |||
|  | @ -1,18 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2021 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| 
 | ||||
| package org.whispersystems.textsecuregcm.configuration.dynamic; | ||||
| 
 | ||||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||||
| 
 | ||||
| public class DynamicDirectoryReconcilerConfiguration { | ||||
| 
 | ||||
|   @JsonProperty | ||||
|   private boolean enabled = true; | ||||
| 
 | ||||
|   public boolean isEnabled() { | ||||
|     return enabled; | ||||
|   } | ||||
| } | ||||
|  | @ -1,71 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.controllers; | ||||
| 
 | ||||
| import com.codahale.metrics.annotation.Timed; | ||||
| import io.dropwizard.auth.Auth; | ||||
| import io.swagger.v3.oas.annotations.tags.Tag; | ||||
| import javax.ws.rs.Consumes; | ||||
| import javax.ws.rs.GET; | ||||
| import javax.ws.rs.PUT; | ||||
| import javax.ws.rs.Path; | ||||
| import javax.ws.rs.Produces; | ||||
| import javax.ws.rs.core.MediaType; | ||||
| import javax.ws.rs.core.Response; | ||||
| import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; | ||||
| import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator; | ||||
| import org.whispersystems.textsecuregcm.configuration.DirectoryClientConfiguration; | ||||
| 
 | ||||
| @Path("/v1/directory") | ||||
| @Tag(name = "Directory") | ||||
| public class DirectoryController { | ||||
| 
 | ||||
|   private final ExternalServiceCredentialsGenerator directoryServiceTokenGenerator; | ||||
| 
 | ||||
|   public static ExternalServiceCredentialsGenerator credentialsGenerator(final DirectoryClientConfiguration cfg) { | ||||
|     return ExternalServiceCredentialsGenerator | ||||
|         .builder(cfg.getUserAuthenticationTokenSharedSecret()) | ||||
|         .withUserDerivationKey(cfg.getUserAuthenticationTokenUserIdSecret()) | ||||
|         .build(); | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryController(ExternalServiceCredentialsGenerator userTokenGenerator) { | ||||
|     this.directoryServiceTokenGenerator = userTokenGenerator; | ||||
|   } | ||||
| 
 | ||||
|   @Timed | ||||
|   @GET | ||||
|   @Path("/auth") | ||||
|   @Produces(MediaType.APPLICATION_JSON) | ||||
|   public Response getAuthToken(@Auth AuthenticatedAccount auth) { | ||||
|     return Response.ok().entity(directoryServiceTokenGenerator.generateFor(auth.getAccount().getNumber())).build(); | ||||
|   } | ||||
| 
 | ||||
|   @PUT | ||||
|   @Path("/feedback-v3/{status}") | ||||
|   @Consumes(MediaType.APPLICATION_JSON) | ||||
|   @Produces(MediaType.APPLICATION_JSON) | ||||
|   public Response setFeedback(@Auth AuthenticatedAccount auth) { | ||||
|     return Response.ok().build(); | ||||
|   } | ||||
| 
 | ||||
| 
 | ||||
|   @Timed | ||||
|   @GET | ||||
|   @Path("/{token}") | ||||
|   @Produces(MediaType.APPLICATION_JSON) | ||||
|   public Response getTokenPresence(@Auth AuthenticatedAccount auth) { | ||||
|     return Response.status(429).build(); | ||||
|   } | ||||
| 
 | ||||
|   @Timed | ||||
|   @PUT | ||||
|   @Path("/tokens") | ||||
|   @Produces(MediaType.APPLICATION_JSON) | ||||
|   @Consumes(MediaType.APPLICATION_JSON) | ||||
|   public Response getContactIntersection(@Auth AuthenticatedAccount auth) { | ||||
|     return Response.status(429).build(); | ||||
|   } | ||||
| } | ||||
|  | @ -1,71 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2020 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.entities; | ||||
| 
 | ||||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||||
| import java.util.List; | ||||
| import java.util.UUID; | ||||
| 
 | ||||
| public class DirectoryReconciliationRequest { | ||||
| 
 | ||||
|   @JsonProperty | ||||
|   private List<User> users; | ||||
| 
 | ||||
|   public DirectoryReconciliationRequest() { | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryReconciliationRequest(List<User> users) { | ||||
|     this.users = users; | ||||
|   } | ||||
| 
 | ||||
|   public List<User> getUsers() { | ||||
|     return users; | ||||
|   } | ||||
| 
 | ||||
|   public static class User { | ||||
| 
 | ||||
|     @JsonProperty | ||||
|     private UUID uuid; | ||||
| 
 | ||||
|     @JsonProperty | ||||
|     private String number; | ||||
| 
 | ||||
|     public User() { | ||||
|     } | ||||
| 
 | ||||
|     public User(UUID uuid, String number) { | ||||
|       this.uuid = uuid; | ||||
|       this.number = number; | ||||
|     } | ||||
| 
 | ||||
|     public UUID getUuid() { | ||||
|       return uuid; | ||||
|     } | ||||
| 
 | ||||
|     public String getNumber() { | ||||
|       return number; | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public boolean equals(Object o) { | ||||
|       if (this == o) return true; | ||||
|       if (o == null || getClass() != o.getClass()) return false; | ||||
| 
 | ||||
|       User user = (User) o; | ||||
| 
 | ||||
|       if (uuid != null ? !uuid.equals(user.uuid) : user.uuid != null) return false; | ||||
|       if (number != null ? !number.equals(user.number) : user.number != null) return false; | ||||
| 
 | ||||
|       return true; | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public int hashCode() { | ||||
|       int result = uuid != null ? uuid.hashCode() : 0; | ||||
|       result = 31 * result + (number != null ? number.hashCode() : 0); | ||||
|       return result; | ||||
|     } | ||||
|   } | ||||
| } | ||||
|  | @ -1,33 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2020 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| 
 | ||||
| package org.whispersystems.textsecuregcm.entities; | ||||
| 
 | ||||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||||
| import javax.validation.constraints.NotEmpty; | ||||
| 
 | ||||
| public class DirectoryReconciliationResponse { | ||||
| 
 | ||||
|   @JsonProperty | ||||
|   @NotEmpty | ||||
|   private Status status; | ||||
| 
 | ||||
|   public DirectoryReconciliationResponse() { | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryReconciliationResponse(Status status) { | ||||
|     this.status = status; | ||||
|   } | ||||
| 
 | ||||
|   public Status getStatus() { | ||||
|     return status; | ||||
|   } | ||||
| 
 | ||||
|   public enum Status { | ||||
|     OK, | ||||
|     MISSING, | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  | @ -1,154 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2021 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.sqs; | ||||
| 
 | ||||
| import static com.codahale.metrics.MetricRegistry.name; | ||||
| 
 | ||||
| import com.codahale.metrics.Meter; | ||||
| import com.codahale.metrics.MetricRegistry; | ||||
| import com.codahale.metrics.SharedMetricRegistries; | ||||
| import com.codahale.metrics.Timer; | ||||
| import com.google.common.annotations.VisibleForTesting; | ||||
| import io.dropwizard.lifecycle.Managed; | ||||
| import io.micrometer.core.instrument.Metrics; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.UUID; | ||||
| import java.util.concurrent.atomic.AtomicInteger; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.whispersystems.textsecuregcm.configuration.SqsConfiguration; | ||||
| import org.whispersystems.textsecuregcm.storage.Account; | ||||
| import org.whispersystems.textsecuregcm.util.Constants; | ||||
| import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; | ||||
| import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; | ||||
| import software.amazon.awssdk.core.exception.SdkClientException; | ||||
| import software.amazon.awssdk.core.exception.SdkServiceException; | ||||
| import software.amazon.awssdk.regions.Region; | ||||
| import software.amazon.awssdk.services.sqs.SqsAsyncClient; | ||||
| import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; | ||||
| import software.amazon.awssdk.services.sqs.model.SendMessageRequest; | ||||
| 
 | ||||
| public class DirectoryQueue implements Managed { | ||||
| 
 | ||||
|   private static final Logger  logger = LoggerFactory.getLogger(DirectoryQueue.class); | ||||
| 
 | ||||
|   private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); | ||||
|   private final Meter serviceErrorMeter = metricRegistry.meter(name(DirectoryQueue.class, "serviceError")); | ||||
|   private final Meter clientErrorMeter = metricRegistry.meter(name(DirectoryQueue.class, "clientError")); | ||||
|   private final Timer sendMessageBatchTimer = metricRegistry.timer(name(DirectoryQueue.class, "sendMessageBatch")); | ||||
| 
 | ||||
|   private final List<String> queueUrls; | ||||
|   private final SqsAsyncClient sqs; | ||||
| 
 | ||||
|   private final AtomicInteger outstandingRequests = new AtomicInteger(); | ||||
| 
 | ||||
|   private enum UpdateAction { | ||||
|     ADD("add"), | ||||
|     DELETE("delete"); | ||||
| 
 | ||||
|     private final String action; | ||||
| 
 | ||||
|     UpdateAction(final String action) { | ||||
|       this.action = action; | ||||
|     } | ||||
| 
 | ||||
|     public MessageAttributeValue toMessageAttributeValue() { | ||||
|       return MessageAttributeValue.builder().dataType("String").stringValue(action).build(); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryQueue(SqsConfiguration sqsConfig) { | ||||
|     StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create( | ||||
|         sqsConfig.getAccessKey(), sqsConfig.getAccessSecret())); | ||||
| 
 | ||||
|     this.queueUrls = sqsConfig.getQueueUrls(); | ||||
| 
 | ||||
|     this.sqs = SqsAsyncClient.builder() | ||||
|         .region(Region.of(sqsConfig.getRegion())) | ||||
|         .credentialsProvider(credentialsProvider) | ||||
|         .build(); | ||||
| 
 | ||||
|     Metrics.gauge(name(getClass(), "outstandingRequests"), outstandingRequests); | ||||
|   } | ||||
| 
 | ||||
|   @VisibleForTesting | ||||
|   DirectoryQueue(final List<String> queueUrls, final SqsAsyncClient sqs) { | ||||
|     this.queueUrls = queueUrls; | ||||
|     this.sqs = sqs; | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   public void start() throws Exception { | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   public void stop() throws Exception { | ||||
|     synchronized (outstandingRequests) { | ||||
|       while (outstandingRequests.get() > 0) { | ||||
|         outstandingRequests.wait(); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     sqs.close(); | ||||
|   } | ||||
| 
 | ||||
|   public void refreshAccount(final Account account) { | ||||
|     sendUpdateMessage(account.getUuid(), account.getNumber(), | ||||
|             account.shouldBeVisibleInDirectory() ? UpdateAction.ADD : UpdateAction.DELETE); | ||||
|   } | ||||
| 
 | ||||
|   public void deleteAccount(final Account account) { | ||||
|     sendUpdateMessage(account.getUuid(), account.getNumber(), UpdateAction.DELETE); | ||||
|   } | ||||
| 
 | ||||
|   public void changePhoneNumber(final Account account, final String originalNumber, final String newNumber) { | ||||
|     sendUpdateMessage(account.getUuid(), originalNumber, UpdateAction.DELETE); | ||||
|     sendUpdateMessage(account.getUuid(), newNumber, account.shouldBeVisibleInDirectory() ? UpdateAction.ADD : UpdateAction.DELETE); | ||||
|   } | ||||
| 
 | ||||
|   private void sendUpdateMessage(final UUID uuid, final String number, final UpdateAction action) { | ||||
|     for (final String queueUrl : queueUrls) { | ||||
|       final Timer.Context timerContext = sendMessageBatchTimer.time(); | ||||
| 
 | ||||
|       final SendMessageRequest request = SendMessageRequest.builder() | ||||
|           .queueUrl(queueUrl) | ||||
|           .messageBody("-") | ||||
|           .messageDeduplicationId(UUID.randomUUID().toString()) | ||||
|           .messageGroupId(number) | ||||
|           .messageAttributes(Map.of( | ||||
|               "id", MessageAttributeValue.builder().dataType("String").stringValue(number).build(), | ||||
|               "uuid", MessageAttributeValue.builder().dataType("String").stringValue(uuid.toString()).build(), | ||||
|               "action", action.toMessageAttributeValue() | ||||
|           )) | ||||
|           .build(); | ||||
| 
 | ||||
|       synchronized (outstandingRequests) { | ||||
|         outstandingRequests.incrementAndGet(); | ||||
|       } | ||||
| 
 | ||||
|       sqs.sendMessage(request).whenComplete((response, cause) -> { | ||||
|         try { | ||||
|           if (cause instanceof SdkServiceException) { | ||||
|             serviceErrorMeter.mark(); | ||||
|             logger.warn("sqs service error", cause); | ||||
|           } else if (cause instanceof SdkClientException) { | ||||
|             clientErrorMeter.mark(); | ||||
|             logger.warn("sqs client error", cause); | ||||
|           } else if (cause != null) { | ||||
|             logger.warn("sqs unexpected error", cause); | ||||
|           } | ||||
|         } finally { | ||||
|           synchronized (outstandingRequests) { | ||||
|             outstandingRequests.decrementAndGet(); | ||||
|             outstandingRequests.notifyAll(); | ||||
|           } | ||||
| 
 | ||||
|           timerContext.close(); | ||||
|         } | ||||
|       }); | ||||
|     } | ||||
|   } | ||||
| } | ||||
|  | @ -17,7 +17,6 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; | |||
| public class AccountDatabaseCrawlerCache { | ||||
| 
 | ||||
|   public static final String GENERAL_PURPOSE_PREFIX = ""; | ||||
|   public static final String DIRECTORY_RECONCILER_PREFIX = "directory-reconciler"; | ||||
|   public static final String ACCOUNT_CLEANER_PREFIX = "account-cleaner"; | ||||
| 
 | ||||
|   private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker"; | ||||
|  |  | |||
|  | @ -52,7 +52,6 @@ import org.whispersystems.textsecuregcm.redis.RedisOperation; | |||
| import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; | ||||
| import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; | ||||
| import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; | ||||
| import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; | ||||
| import org.whispersystems.textsecuregcm.util.Constants; | ||||
| import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator; | ||||
| import org.whispersystems.textsecuregcm.util.SystemMapper; | ||||
|  | @ -89,7 +88,6 @@ public class AccountsManager { | |||
|   private final PhoneNumberIdentifiers phoneNumberIdentifiers; | ||||
|   private final FaultTolerantRedisCluster cacheCluster; | ||||
|   private final DeletedAccountsManager deletedAccountsManager; | ||||
|   private final DirectoryQueue directoryQueue; | ||||
|   private final Keys keys; | ||||
|   private final MessagesManager messagesManager; | ||||
|   private final ProfilesManager profilesManager; | ||||
|  | @ -133,7 +131,6 @@ public class AccountsManager { | |||
|       final PhoneNumberIdentifiers phoneNumberIdentifiers, | ||||
|       final FaultTolerantRedisCluster cacheCluster, | ||||
|       final DeletedAccountsManager deletedAccountsManager, | ||||
|       final DirectoryQueue directoryQueue, | ||||
|       final Keys keys, | ||||
|       final MessagesManager messagesManager, | ||||
|       final ProfilesManager profilesManager, | ||||
|  | @ -149,7 +146,6 @@ public class AccountsManager { | |||
|     this.phoneNumberIdentifiers = phoneNumberIdentifiers; | ||||
|     this.cacheCluster = cacheCluster; | ||||
|     this.deletedAccountsManager = deletedAccountsManager; | ||||
|     this.directoryQueue = directoryQueue; | ||||
|     this.keys = keys; | ||||
|     this.messagesManager = messagesManager; | ||||
|     this.profilesManager = profilesManager; | ||||
|  | @ -237,11 +233,6 @@ public class AccountsManager { | |||
| 
 | ||||
|         Metrics.counter(CREATE_COUNTER_NAME, tags).increment(); | ||||
| 
 | ||||
|         if (!account.isDiscoverableByPhoneNumber()) { | ||||
|           // The newly-created account has explicitly opted out of discoverability | ||||
|           directoryQueue.deleteAccount(account); | ||||
|         } | ||||
| 
 | ||||
|         accountAttributes.recoveryPassword().ifPresent(registrationRecoveryPassword -> | ||||
|             registrationRecoveryPasswordsManager.storeForCurrentNumber(account.getNumber(), registrationRecoveryPassword)); | ||||
|       }); | ||||
|  | @ -277,7 +268,6 @@ public class AccountsManager { | |||
| 
 | ||||
|       if (maybeExistingAccount.isPresent()) { | ||||
|         delete(maybeExistingAccount.get()); | ||||
|         directoryQueue.deleteAccount(maybeExistingAccount.get()); | ||||
|         displacedUuid = maybeExistingAccount.map(Account::getUuid); | ||||
|       } else { | ||||
|         displacedUuid = deletedAci; | ||||
|  | @ -296,7 +286,6 @@ public class AccountsManager { | |||
|           AccountChangeValidator.NUMBER_CHANGE_VALIDATOR); | ||||
| 
 | ||||
|       updatedAccount.set(numberChangedAccount); | ||||
|       directoryQueue.changePhoneNumber(numberChangedAccount, originalNumber, number); | ||||
| 
 | ||||
|       keys.delete(phoneNumberIdentifier); | ||||
|       keys.delete(originalPhoneNumberIdentifier); | ||||
|  | @ -363,7 +352,7 @@ public class AccountsManager { | |||
| 
 | ||||
|   /** | ||||
|    * Reserve a username hash so that no other accounts may take it. | ||||
|    * | ||||
|    * <p> | ||||
|    * The reserved hash can later be set with {@link #confirmReservedUsernameHash(Account, byte[])}. The reservation | ||||
|    * will eventually expire, after which point confirmReservedUsernameHash may fail if another account has taken the | ||||
|    * username hash. | ||||
|  | @ -409,7 +398,7 @@ public class AccountsManager { | |||
|   } | ||||
| 
 | ||||
|   /** | ||||
|    * Set a username hash previously reserved with {@link #reserveUsernameHash(Account, List<String>)} | ||||
|    * Set a username hash previously reserved with {@link #reserveUsernameHash(Account, List)} | ||||
|    * | ||||
|    * @param account the account to update | ||||
|    * @param reservedUsernameHash the previously reserved username hash | ||||
|  | @ -500,8 +489,6 @@ public class AccountsManager { | |||
|    */ | ||||
|   private Account update(Account account, Function<Account, Boolean> updater) { | ||||
| 
 | ||||
|     final boolean wasVisibleBeforeUpdate = account.shouldBeVisibleInDirectory(); | ||||
| 
 | ||||
|     final Account updatedAccount; | ||||
| 
 | ||||
|     try (Timer.Context ignored = updateTimer.time()) { | ||||
|  | @ -519,12 +506,6 @@ public class AccountsManager { | |||
|       redisSet(updatedAccount); | ||||
|     } | ||||
| 
 | ||||
|     final boolean isVisibleAfterUpdate = updatedAccount.shouldBeVisibleInDirectory(); | ||||
| 
 | ||||
|     if (wasVisibleBeforeUpdate != isVisibleAfterUpdate) { | ||||
|       directoryQueue.refreshAccount(updatedAccount); | ||||
|     } | ||||
| 
 | ||||
|     return updatedAccount; | ||||
|   } | ||||
| 
 | ||||
|  | @ -653,10 +634,6 @@ public class AccountsManager { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   public Optional<String> getNumberForPhoneNumberIdentifier(UUID pni) { | ||||
|     return phoneNumberIdentifiers.getPhoneNumber(pni); | ||||
|   } | ||||
| 
 | ||||
|   public UUID getPhoneNumberIdentifier(String e164) { | ||||
|     return phoneNumberIdentifiers.getPhoneNumberIdentifier(e164); | ||||
|   } | ||||
|  | @ -673,7 +650,6 @@ public class AccountsManager { | |||
|     try (final Timer.Context ignored = deleteTimer.time()) { | ||||
|       deletedAccountsManager.lockAndPut(account.getNumber(), () -> { | ||||
|         delete(account); | ||||
|         directoryQueue.deleteAccount(account); | ||||
| 
 | ||||
|         return account.getUuid(); | ||||
|       }); | ||||
|  |  | |||
|  | @ -6,33 +6,17 @@ package org.whispersystems.textsecuregcm.storage; | |||
| 
 | ||||
| import java.time.Duration; | ||||
| import java.time.Instant; | ||||
| import java.util.ArrayDeque; | ||||
| import java.util.ArrayList; | ||||
| import java.util.Collection; | ||||
| import java.util.Collections; | ||||
| import java.util.HashSet; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.Optional; | ||||
| import java.util.Queue; | ||||
| import java.util.Set; | ||||
| import java.util.UUID; | ||||
| import java.util.stream.Collectors; | ||||
| import org.whispersystems.textsecuregcm.util.AttributeValues; | ||||
| import org.whispersystems.textsecuregcm.util.Pair; | ||||
| import software.amazon.awssdk.services.dynamodb.DynamoDbClient; | ||||
| import software.amazon.awssdk.services.dynamodb.model.AttributeValue; | ||||
| import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest; | ||||
| import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse; | ||||
| import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; | ||||
| import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; | ||||
| import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; | ||||
| import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes; | ||||
| import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; | ||||
| import software.amazon.awssdk.services.dynamodb.model.QueryRequest; | ||||
| import software.amazon.awssdk.services.dynamodb.model.QueryResponse; | ||||
| import software.amazon.awssdk.services.dynamodb.model.ScanRequest; | ||||
| import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; | ||||
| 
 | ||||
| public class DeletedAccounts extends AbstractDynamoDbStore { | ||||
| 
 | ||||
|  | @ -40,7 +24,6 @@ public class DeletedAccounts extends AbstractDynamoDbStore { | |||
|   static final String KEY_ACCOUNT_E164 = "P"; | ||||
|   static final String ATTR_ACCOUNT_UUID = "U"; | ||||
|   static final String ATTR_EXPIRES = "E"; | ||||
|   static final String ATTR_NEEDS_CDS_RECONCILIATION = "R"; | ||||
| 
 | ||||
|   static final String UUID_TO_E164_INDEX_NAME = "u_to_p"; | ||||
| 
 | ||||
|  | @ -50,23 +33,20 @@ public class DeletedAccounts extends AbstractDynamoDbStore { | |||
|   static final int GET_BATCH_SIZE = 100; | ||||
| 
 | ||||
|   private final String tableName; | ||||
|   private final String needsReconciliationIndexName; | ||||
| 
 | ||||
|   public DeletedAccounts(final DynamoDbClient dynamoDb, final String tableName, final String needsReconciliationIndexName) { | ||||
|   public DeletedAccounts(final DynamoDbClient dynamoDb, final String tableName) { | ||||
| 
 | ||||
|     super(dynamoDb); | ||||
|     this.tableName = tableName; | ||||
|     this.needsReconciliationIndexName = needsReconciliationIndexName; | ||||
|   } | ||||
| 
 | ||||
|   void put(UUID uuid, String e164, boolean needsReconciliation) { | ||||
|   void put(UUID uuid, String e164) { | ||||
|     db().putItem(PutItemRequest.builder() | ||||
|         .tableName(tableName) | ||||
|         .item(Map.of( | ||||
|             KEY_ACCOUNT_E164, AttributeValues.fromString(e164), | ||||
|             ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid), | ||||
|             ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond()), | ||||
|             ATTR_NEEDS_CDS_RECONCILIATION, AttributeValues.fromInt(needsReconciliation ? 1 : 0))) | ||||
|             ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond()))) | ||||
|         .build()); | ||||
|   } | ||||
| 
 | ||||
|  | @ -108,72 +88,4 @@ public class DeletedAccounts extends AbstractDynamoDbStore { | |||
|         .key(Map.of(KEY_ACCOUNT_E164, AttributeValues.fromString(e164))) | ||||
|         .build()); | ||||
|   } | ||||
| 
 | ||||
|   List<Pair<UUID, String>> listAccountsToReconcile(final int max) { | ||||
| 
 | ||||
|     final ScanRequest scanRequest = ScanRequest.builder() | ||||
|         .tableName(tableName) | ||||
|         .indexName(needsReconciliationIndexName) | ||||
|         .limit(max) | ||||
|         .build(); | ||||
| 
 | ||||
|     return scan(scanRequest, max) | ||||
|         .stream() | ||||
|         .map(item -> new Pair<>( | ||||
|             AttributeValues.getUUID(item, ATTR_ACCOUNT_UUID, null), | ||||
|             AttributeValues.getString(item, KEY_ACCOUNT_E164, null))) | ||||
|         .collect(Collectors.toList()); | ||||
|   } | ||||
| 
 | ||||
|   Set<String> getAccountsNeedingReconciliation(final Collection<String> e164s) { | ||||
|     final Queue<Map<String, AttributeValue>> pendingKeys = e164s.stream() | ||||
|         .map(e164 -> Map.of(KEY_ACCOUNT_E164, AttributeValues.fromString(e164))) | ||||
|         .collect(Collectors.toCollection(() -> new ArrayDeque<>(e164s.size()))); | ||||
| 
 | ||||
|     final Set<String> accountsNeedingReconciliation = new HashSet<>(e164s.size()); | ||||
|     final List<Map<String, AttributeValue>> batchKeys = new ArrayList<>(GET_BATCH_SIZE); | ||||
| 
 | ||||
|     while (!pendingKeys.isEmpty()) { | ||||
|       batchKeys.clear(); | ||||
| 
 | ||||
|       for (int i = 0; i < GET_BATCH_SIZE && !pendingKeys.isEmpty(); i++) { | ||||
|         batchKeys.add(pendingKeys.remove()); | ||||
|       } | ||||
| 
 | ||||
|       final BatchGetItemResponse response = db().batchGetItem(BatchGetItemRequest.builder() | ||||
|           .requestItems(Map.of(tableName, KeysAndAttributes.builder() | ||||
|               .consistentRead(true) | ||||
|               .keys(batchKeys) | ||||
|               .build())) | ||||
|           .build()); | ||||
| 
 | ||||
|       response.responses().getOrDefault(tableName, Collections.emptyList()).stream() | ||||
|           .filter(attributes -> AttributeValues.getInt(attributes, ATTR_NEEDS_CDS_RECONCILIATION, 0) == 1) | ||||
|           .map(attributes -> AttributeValues.getString(attributes, KEY_ACCOUNT_E164, null)) | ||||
|           .forEach(accountsNeedingReconciliation::add); | ||||
| 
 | ||||
|       if (response.hasUnprocessedKeys() && response.unprocessedKeys().containsKey(tableName)) { | ||||
|         pendingKeys.addAll(response.unprocessedKeys().get(tableName).keys()); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     return accountsNeedingReconciliation; | ||||
|   } | ||||
| 
 | ||||
|   void markReconciled(final Collection<String> phoneNumbersReconciled) { | ||||
| 
 | ||||
|     phoneNumbersReconciled.forEach(number -> db().updateItem( | ||||
|         UpdateItemRequest.builder() | ||||
|             .tableName(tableName) | ||||
|             .key(Map.of( | ||||
|                 KEY_ACCOUNT_E164, AttributeValues.fromString(number) | ||||
|             )) | ||||
|             .updateExpression("REMOVE #needs_reconciliation") | ||||
|             .expressionAttributeNames(Map.of( | ||||
|                 "#needs_reconciliation", ATTR_NEEDS_CDS_RECONCILIATION | ||||
|             )) | ||||
|             .build() | ||||
|     )); | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  |  | |||
|  | @ -1,69 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2021 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.storage; | ||||
| 
 | ||||
| import static com.codahale.metrics.MetricRegistry.name; | ||||
| 
 | ||||
| import io.micrometer.core.instrument.Counter; | ||||
| import io.micrometer.core.instrument.Metrics; | ||||
| import io.micrometer.core.instrument.Timer; | ||||
| import java.util.List; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; | ||||
| 
 | ||||
| public class DeletedAccountsDirectoryReconciler { | ||||
| 
 | ||||
|   private final Logger logger = LoggerFactory.getLogger(DeletedAccountsDirectoryReconciler.class); | ||||
| 
 | ||||
|   private final DirectoryReconciliationClient directoryReconciliationClient; | ||||
| 
 | ||||
|   private final Timer deleteTimer; | ||||
|   private final Counter errorCounter; | ||||
| 
 | ||||
|   public DeletedAccountsDirectoryReconciler( | ||||
|       final String replicationName, | ||||
|       final DirectoryReconciliationClient directoryReconciliationClient) { | ||||
|     this.directoryReconciliationClient = directoryReconciliationClient; | ||||
| 
 | ||||
|     deleteTimer = Timer.builder(name(DeletedAccountsDirectoryReconciler.class, "delete")) | ||||
|         .tag("replicationName", replicationName) | ||||
|         .register(Metrics.globalRegistry); | ||||
| 
 | ||||
|     errorCounter = Counter.builder(name(DeletedAccountsDirectoryReconciler.class, "error")) | ||||
|         .tag("replicationName", replicationName) | ||||
|         .register(Metrics.globalRegistry); | ||||
|   } | ||||
| 
 | ||||
|   public void onCrawlChunk(final List<User> deletedUsers) throws ChunkProcessingFailedException { | ||||
| 
 | ||||
|     try { | ||||
|       deleteTimer.recordCallable(() -> { | ||||
|         try { | ||||
|           final DirectoryReconciliationResponse response = directoryReconciliationClient.delete( | ||||
|               new DirectoryReconciliationRequest(deletedUsers)); | ||||
| 
 | ||||
|           if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) { | ||||
|             errorCounter.increment(); | ||||
|             throw new ChunkProcessingFailedException("Response status: " + response.getStatus()); | ||||
|           } | ||||
|         } catch (final Exception e) { | ||||
|           errorCounter.increment(); | ||||
|           throw new ChunkProcessingFailedException(e); | ||||
|         } | ||||
| 
 | ||||
|         return null; | ||||
|       }); | ||||
|     } catch (final ChunkProcessingFailedException e) { | ||||
|       throw e; | ||||
|     } catch (final Exception e) { | ||||
|       logger.warn("Unexpected exception", e); | ||||
|       throw new RuntimeException(e); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  | @ -11,21 +11,16 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; | |||
| import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions; | ||||
| import com.amazonaws.services.dynamodbv2.LockItem; | ||||
| import com.amazonaws.services.dynamodbv2.ReleaseLockOptions; | ||||
| import com.amazonaws.services.dynamodbv2.model.LockCurrentlyUnavailableException; | ||||
| import java.util.ArrayList; | ||||
| import java.util.Collection; | ||||
| import java.util.List; | ||||
| import java.util.Optional; | ||||
| import java.util.Set; | ||||
| import java.util.UUID; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import java.util.function.BiFunction; | ||||
| import java.util.function.Consumer; | ||||
| import java.util.function.Supplier; | ||||
| import java.util.stream.Collectors; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.whispersystems.textsecuregcm.util.Pair; | ||||
| 
 | ||||
| public class DeletedAccountsManager { | ||||
| 
 | ||||
|  | @ -35,20 +30,6 @@ public class DeletedAccountsManager { | |||
| 
 | ||||
|   private static final Logger log = LoggerFactory.getLogger(DeletedAccountsManager.class); | ||||
| 
 | ||||
|   @FunctionalInterface | ||||
|   public interface DeletedAccountReconciliationConsumer { | ||||
| 
 | ||||
|     /** | ||||
|      * Reconcile a list of deleted account records. | ||||
|      * | ||||
|      * @param deletedAccounts the account records to reconcile | ||||
|      * @return a list of account records that were successfully reconciled; accounts that were not successfully | ||||
|      * reconciled may be retried later | ||||
|      * @throws ChunkProcessingFailedException in the event of an error while processing the batch of account records | ||||
|      */ | ||||
|     Collection<String> reconcile(List<Pair<UUID, String>> deletedAccounts) throws ChunkProcessingFailedException; | ||||
|   } | ||||
| 
 | ||||
|   public DeletedAccountsManager(final DeletedAccounts deletedAccounts, final AmazonDynamoDB lockDynamoDb, final String lockTableName) { | ||||
|     this.deletedAccounts = deletedAccounts; | ||||
| 
 | ||||
|  | @ -98,7 +79,7 @@ public class DeletedAccountsManager { | |||
|   public void lockAndPut(final String e164, final Supplier<UUID> supplier) throws InterruptedException { | ||||
|     withLock(List.of(e164), ignored -> { | ||||
|       try { | ||||
|         deletedAccounts.put(supplier.get(), e164, true); | ||||
|         deletedAccounts.put(supplier.get(), e164); | ||||
|       } catch (final Exception e) { | ||||
|         log.warn("Supplier threw an exception while holding lock on a deleted account record", e); | ||||
|         throw new RuntimeException(e); | ||||
|  | @ -123,7 +104,7 @@ public class DeletedAccountsManager { | |||
| 
 | ||||
|     withLock(List.of(original, target), acis -> { | ||||
|       try { | ||||
|         function.apply(acis.get(0), acis.get(1)).ifPresent(aci -> deletedAccounts.put(aci, original, true)); | ||||
|         function.apply(acis.get(0), acis.get(1)).ifPresent(aci -> deletedAccounts.put(aci, original)); | ||||
|       } catch (final Exception e) { | ||||
|         log.warn("Supplier threw an exception while holding lock on a deleted account record", e); | ||||
|         throw new RuntimeException(e); | ||||
|  | @ -154,48 +135,6 @@ public class DeletedAccountsManager { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   public void lockAndReconcileAccounts(final int max, final DeletedAccountReconciliationConsumer consumer) throws ChunkProcessingFailedException { | ||||
|     final List<LockItem> lockItems = new ArrayList<>(); | ||||
|     try { | ||||
|       final List<Pair<UUID, String>> reconciliationCandidates = deletedAccounts.listAccountsToReconcile(max).stream() | ||||
|           .filter(pair -> { | ||||
|             boolean lockAcquired = false; | ||||
| 
 | ||||
|             try { | ||||
|               lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pair.second()) | ||||
|                   .withAcquireReleasedLocksConsistently(true) | ||||
|                   .withShouldSkipBlockingWait(true) | ||||
|                   .build())); | ||||
| 
 | ||||
|               lockAcquired = true; | ||||
|             } catch (final InterruptedException e) { | ||||
|               log.warn("Interrupted while acquiring lock for reconciliation", e); | ||||
|             } catch (final LockCurrentlyUnavailableException ignored) { | ||||
|             } | ||||
| 
 | ||||
|             return lockAcquired; | ||||
|           }).toList(); | ||||
| 
 | ||||
|       assert lockItems.size() == reconciliationCandidates.size(); | ||||
| 
 | ||||
|       // A deleted account's status may have changed in the time between getting a list of candidates and acquiring a lock | ||||
|       // on the candidate records. Now that we hold the lock, check which of the candidates still need to be reconciled. | ||||
|       final Set<String> numbersNeedingReconciliationAfterLock = | ||||
|           deletedAccounts.getAccountsNeedingReconciliation(reconciliationCandidates.stream() | ||||
|               .map(Pair::second) | ||||
|               .collect(Collectors.toList())); | ||||
| 
 | ||||
|       final List<Pair<UUID, String>> accountsToReconcile = reconciliationCandidates.stream() | ||||
|           .filter(candidate -> numbersNeedingReconciliationAfterLock.contains(candidate.second())) | ||||
|           .collect(Collectors.toList()); | ||||
| 
 | ||||
|       deletedAccounts.markReconciled(consumer.reconcile(accountsToReconcile)); | ||||
|     } finally { | ||||
|       lockItems.forEach( | ||||
|           lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem).withBestEffort(true).build())); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   public Optional<UUID> findDeletedAccountAci(final String e164) { | ||||
|     return deletedAccounts.findUuid(e164); | ||||
|   } | ||||
|  |  | |||
|  | @ -1,65 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2020 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.storage; | ||||
| 
 | ||||
| import static com.codahale.metrics.MetricRegistry.name; | ||||
| 
 | ||||
| import io.micrometer.core.instrument.Metrics; | ||||
| import java.io.IOException; | ||||
| import java.time.Duration; | ||||
| import java.util.List; | ||||
| import java.util.concurrent.ScheduledExecutorService; | ||||
| import java.util.stream.Collectors; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User; | ||||
| import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; | ||||
| import org.whispersystems.textsecuregcm.util.Pair; | ||||
| 
 | ||||
| public class DeletedAccountsTableCrawler extends ManagedPeriodicWork { | ||||
| 
 | ||||
|   private static final Duration WORKER_TTL = Duration.ofMinutes(2); | ||||
|   private static final Duration RUN_INTERVAL = Duration.ofMinutes(15); | ||||
|   private static final String ACTIVE_WORKER_KEY = "deleted_accounts_crawler_cache_active_worker"; | ||||
| 
 | ||||
|   private static final int MAX_BATCH_SIZE = 5_000; | ||||
|   private static final String BATCH_SIZE_DISTRIBUTION_NAME = name(DeletedAccountsTableCrawler.class, "batchSize"); | ||||
| 
 | ||||
|   private final DeletedAccountsManager deletedAccountsManager; | ||||
|   private final List<DeletedAccountsDirectoryReconciler> reconcilers; | ||||
| 
 | ||||
|   public DeletedAccountsTableCrawler( | ||||
|       final DeletedAccountsManager deletedAccountsManager, | ||||
|       final List<DeletedAccountsDirectoryReconciler> reconcilers, | ||||
|       final FaultTolerantRedisCluster cluster, | ||||
|       final ScheduledExecutorService executorService) throws IOException { | ||||
| 
 | ||||
|     super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService); | ||||
| 
 | ||||
|     this.deletedAccountsManager = deletedAccountsManager; | ||||
|     this.reconcilers = reconcilers; | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   public void doPeriodicWork() throws Exception { | ||||
| 
 | ||||
|     deletedAccountsManager.lockAndReconcileAccounts(MAX_BATCH_SIZE, deletedAccounts -> { | ||||
|       final List<User> deletedUsers = deletedAccounts.stream() | ||||
|           .map(pair -> new User(pair.first(), pair.second())) | ||||
|           .collect(Collectors.toList()); | ||||
| 
 | ||||
|       for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) { | ||||
|         reconciler.onCrawlChunk(deletedUsers); | ||||
|       } | ||||
| 
 | ||||
|       final List<String> reconciledPhoneNumbers = deletedAccounts.stream() | ||||
|           .map(Pair::second) | ||||
|           .collect(Collectors.toList()); | ||||
| 
 | ||||
|       Metrics.summary(BATCH_SIZE_DISTRIBUTION_NAME).record(reconciledPhoneNumbers.size()); | ||||
| 
 | ||||
|       return reconciledPhoneNumbers; | ||||
|     }); | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  | @ -1,117 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2021 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.storage; | ||||
| 
 | ||||
| import static com.codahale.metrics.MetricRegistry.name; | ||||
| 
 | ||||
| import io.micrometer.core.instrument.Metrics; | ||||
| import java.util.ArrayList; | ||||
| import java.util.List; | ||||
| import java.util.Optional; | ||||
| import java.util.UUID; | ||||
| import java.util.function.Function; | ||||
| import javax.ws.rs.ProcessingException; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse.Status; | ||||
| 
 | ||||
| public class DirectoryReconciler extends AccountDatabaseCrawlerListener { | ||||
| 
 | ||||
|   private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class); | ||||
|   private static final String SEND_TIMER_NAME = name(DirectoryReconciler.class, "sendRequest"); | ||||
| 
 | ||||
|   private final String replicationName; | ||||
|   private final DirectoryReconciliationClient reconciliationClient; | ||||
|   private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager; | ||||
| 
 | ||||
|   public DirectoryReconciler(String replicationName, DirectoryReconciliationClient reconciliationClient, | ||||
|       DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) { | ||||
|     this.reconciliationClient = reconciliationClient; | ||||
|     this.replicationName = replicationName; | ||||
|     this.dynamicConfigurationManager = dynamicConfigurationManager; | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   public void onCrawlStart() { | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   public void onCrawlEnd(Optional<UUID> fromUuid) { | ||||
|     if (!dynamicConfigurationManager.getConfiguration().getDirectoryReconcilerConfiguration().isEnabled()) { | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     reconciliationClient.complete(); | ||||
|   } | ||||
| 
 | ||||
|   @Override | ||||
|   protected void onCrawlChunk(final Optional<UUID> fromUuid, final List<Account> accounts) | ||||
|       throws AccountDatabaseCrawlerRestartException { | ||||
| 
 | ||||
|     if (!dynamicConfigurationManager.getConfiguration().getDirectoryReconcilerConfiguration().isEnabled()) { | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     final DirectoryReconciliationRequest addUsersRequest; | ||||
|     final DirectoryReconciliationRequest deleteUsersRequest; | ||||
|     { | ||||
|       final List<DirectoryReconciliationRequest.User> addedUsers = new ArrayList<>(accounts.size()); | ||||
|       final List<DirectoryReconciliationRequest.User> deletedUsers = new ArrayList<>(accounts.size()); | ||||
| 
 | ||||
|       accounts.forEach(account -> { | ||||
|         if (account.shouldBeVisibleInDirectory()) { | ||||
|           addedUsers.add(new DirectoryReconciliationRequest.User(account.getUuid(), account.getNumber())); | ||||
|         } else { | ||||
|           deletedUsers.add(new DirectoryReconciliationRequest.User(account.getUuid(), account.getNumber())); | ||||
|         } | ||||
|       }); | ||||
| 
 | ||||
|       addUsersRequest = new DirectoryReconciliationRequest(addedUsers); | ||||
|       deleteUsersRequest = new DirectoryReconciliationRequest(deletedUsers); | ||||
|     } | ||||
| 
 | ||||
|     final DirectoryReconciliationResponse addUsersResponse = sendAdditions(addUsersRequest); | ||||
|     final DirectoryReconciliationResponse deleteUsersResponse = sendDeletes(deleteUsersRequest); | ||||
| 
 | ||||
|     if (addUsersResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING | ||||
|         || deleteUsersResponse.getStatus() == Status.MISSING) { | ||||
| 
 | ||||
|       throw new AccountDatabaseCrawlerRestartException("directory reconciler missing"); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private DirectoryReconciliationResponse sendDeletes(final DirectoryReconciliationRequest request) { | ||||
|     return sendRequest(request, reconciliationClient::delete, "delete"); | ||||
| 
 | ||||
|   } | ||||
| 
 | ||||
|   private DirectoryReconciliationResponse sendAdditions(final DirectoryReconciliationRequest request) { | ||||
|     return sendRequest(request, reconciliationClient::add, "add"); | ||||
|   } | ||||
| 
 | ||||
|   private DirectoryReconciliationResponse sendRequest(final DirectoryReconciliationRequest request, | ||||
|       final Function<DirectoryReconciliationRequest, DirectoryReconciliationResponse> requestHandler, | ||||
|       final String context) { | ||||
| 
 | ||||
|     return Metrics.timer(SEND_TIMER_NAME, "context", context, "replication", replicationName) | ||||
|         .record(() -> { | ||||
|           try { | ||||
|             final DirectoryReconciliationResponse response = requestHandler.apply(request); | ||||
| 
 | ||||
|             if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) { | ||||
|               logger.warn("reconciliation error: {} ({})", response.getStatus(), context); | ||||
|             } | ||||
|             return response; | ||||
|           } catch (ProcessingException ex) { | ||||
|             logger.warn("request error: ", ex); | ||||
|             throw new ProcessingException(ex); | ||||
|           } | ||||
|         }); | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  | @ -1,69 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2020 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| package org.whispersystems.textsecuregcm.storage; | ||||
| 
 | ||||
| import java.security.KeyStore; | ||||
| import java.security.cert.CertificateException; | ||||
| import javax.net.ssl.SSLContext; | ||||
| import javax.ws.rs.client.Client; | ||||
| import javax.ws.rs.client.ClientBuilder; | ||||
| import javax.ws.rs.client.Entity; | ||||
| import javax.ws.rs.core.MediaType; | ||||
| import org.glassfish.jersey.SslConfigurator; | ||||
| import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; | ||||
| import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; | ||||
| import org.whispersystems.textsecuregcm.util.CertificateUtil; | ||||
| 
 | ||||
| public class DirectoryReconciliationClient { | ||||
| 
 | ||||
|   private final String replicationUrl; | ||||
|   private final Client client; | ||||
| 
 | ||||
|   public DirectoryReconciliationClient(DirectoryServerConfiguration directoryServerConfiguration) | ||||
|       throws CertificateException | ||||
|   { | ||||
|     this.replicationUrl = directoryServerConfiguration.getReplicationUrl(); | ||||
|     this.client = initializeClient(directoryServerConfiguration); | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryReconciliationResponse add(DirectoryReconciliationRequest request) { | ||||
|     return client.target(replicationUrl) | ||||
|         .path("/v3/directory/exists") | ||||
|         .request(MediaType.APPLICATION_JSON_TYPE) | ||||
|         .put(Entity.json(request), DirectoryReconciliationResponse.class); | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryReconciliationResponse delete(DirectoryReconciliationRequest request) { | ||||
|     return client.target(replicationUrl) | ||||
|         .path("/v3/directory/deletes") | ||||
|         .request(MediaType.APPLICATION_JSON_TYPE) | ||||
|         .put(Entity.json(request), DirectoryReconciliationResponse.class); | ||||
|   } | ||||
| 
 | ||||
|   public DirectoryReconciliationResponse complete() { | ||||
|     return client.target(replicationUrl) | ||||
|         .path("/v3/directory/complete") | ||||
|         .request(MediaType.APPLICATION_JSON_TYPE) | ||||
|         .post(null, DirectoryReconciliationResponse.class); | ||||
|   } | ||||
| 
 | ||||
|   private static Client initializeClient(DirectoryServerConfiguration directoryServerConfiguration) | ||||
|       throws CertificateException { | ||||
|     KeyStore trustStore = CertificateUtil.buildKeyStoreForPem( | ||||
|         directoryServerConfiguration.getReplicationCaCertificates().toArray(new String[0])); | ||||
|     SSLContext sslContext = SslConfigurator.newInstance() | ||||
|         .securityProtocol("TLSv1.2") | ||||
|         .trustStore(trustStore) | ||||
|         .createSSLContext(); | ||||
| 
 | ||||
|     return ClientBuilder.newBuilder() | ||||
|         .register( | ||||
|             HttpAuthenticationFeature.basic("signal", directoryServerConfiguration.getReplicationPassword().getBytes())) | ||||
|         .sslContext(sslContext) | ||||
|         .build(); | ||||
|   } | ||||
| } | ||||
|  | @ -36,7 +36,6 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; | |||
| import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; | ||||
| import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; | ||||
| import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; | ||||
| import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; | ||||
| import org.whispersystems.textsecuregcm.storage.Account; | ||||
| import org.whispersystems.textsecuregcm.storage.Accounts; | ||||
| import org.whispersystems.textsecuregcm.storage.AccountsManager; | ||||
|  | @ -150,8 +149,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi | |||
|         .build(); | ||||
| 
 | ||||
|     DeletedAccounts deletedAccounts = new DeletedAccounts(dynamoDbClient, | ||||
|         configuration.getDynamoDbTables().getDeletedAccounts().getTableName(), | ||||
|         configuration.getDynamoDbTables().getDeletedAccounts().getNeedsReconciliationIndexName()); | ||||
|         configuration.getDynamoDbTables().getDeletedAccounts().getTableName()); | ||||
|     VerificationCodeStore pendingAccounts = new VerificationCodeStore(dynamoDbClient, | ||||
|         configuration.getDynamoDbTables().getPendingAccounts().getTableName()); | ||||
|     RegistrationRecoveryPasswords registrationRecoveryPasswords = new RegistrationRecoveryPasswords( | ||||
|  | @ -199,8 +197,6 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi | |||
|         Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor); | ||||
|     MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, | ||||
|         keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC()); | ||||
|     DirectoryQueue directoryQueue = new DirectoryQueue( | ||||
|         configuration.getDirectoryConfiguration().getSqsConfiguration()); | ||||
|     ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); | ||||
|     ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, | ||||
|         configuration.getDynamoDbTables().getReportMessage().getTableName(), | ||||
|  | @ -214,7 +210,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi | |||
|         configuration.getDynamoDbTables().getDeletedAccountsLock().getTableName()); | ||||
|     StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts); | ||||
|     AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster, | ||||
|         deletedAccountsManager, directoryQueue, keys, messagesManager, profilesManager, | ||||
|         deletedAccountsManager, keys, messagesManager, profilesManager, | ||||
|         pendingAccountsManager, secureStorageClient, secureBackupClient, secureValueRecovery2Client, clientPresenceManager, | ||||
|         experimentEnrollmentManager, registrationRecoveryPasswordsManager, Clock.systemUTC()); | ||||
| 
 | ||||
|  |  | |||
|  | @ -31,7 +31,6 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; | |||
| import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; | ||||
| import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; | ||||
| import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; | ||||
| import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; | ||||
| import org.whispersystems.textsecuregcm.storage.Accounts; | ||||
| import org.whispersystems.textsecuregcm.storage.AccountsManager; | ||||
| import org.whispersystems.textsecuregcm.storage.DeletedAccounts; | ||||
|  | @ -129,8 +128,7 @@ record CommandDependencies( | |||
|         .build(); | ||||
| 
 | ||||
|     DeletedAccounts deletedAccounts = new DeletedAccounts(dynamoDbClient, | ||||
|         configuration.getDynamoDbTables().getDeletedAccounts().getTableName(), | ||||
|         configuration.getDynamoDbTables().getDeletedAccounts().getNeedsReconciliationIndexName()); | ||||
|         configuration.getDynamoDbTables().getDeletedAccounts().getTableName()); | ||||
|     VerificationCodeStore pendingAccounts = new VerificationCodeStore(dynamoDbClient, | ||||
|         configuration.getDynamoDbTables().getPendingAccounts().getTableName()); | ||||
|     RegistrationRecoveryPasswords registrationRecoveryPasswords = new RegistrationRecoveryPasswords( | ||||
|  | @ -181,8 +179,6 @@ record CommandDependencies( | |||
|         Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor); | ||||
|     MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, | ||||
|         keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC()); | ||||
|     DirectoryQueue directoryQueue = new DirectoryQueue( | ||||
|         configuration.getDirectoryConfiguration().getSqsConfiguration()); | ||||
|     ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); | ||||
|     ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, | ||||
|         configuration.getDynamoDbTables().getReportMessage().getTableName(), | ||||
|  | @ -196,7 +192,7 @@ record CommandDependencies( | |||
|         configuration.getDynamoDbTables().getDeletedAccountsLock().getTableName()); | ||||
|     StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts); | ||||
|     AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster, | ||||
|         deletedAccountsManager, directoryQueue, keys, messagesManager, profilesManager, | ||||
|         deletedAccountsManager, keys, messagesManager, profilesManager, | ||||
|         pendingAccountsManager, secureStorageClient, secureBackupClient, secureValueRecovery2Client, clientPresenceManager, | ||||
|         experimentEnrollmentManager, registrationRecoveryPasswordsManager, clock); | ||||
| 
 | ||||
|  |  | |||
|  | @ -343,30 +343,6 @@ class DynamicConfigurationTest { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testParseDirectoryReconciler() throws JsonProcessingException { | ||||
|     { | ||||
|       final String emptyConfigYaml = REQUIRED_CONFIG.concat("test: true"); | ||||
|       final DynamicConfiguration emptyConfig = | ||||
|           DynamicConfigurationManager.parseConfiguration(emptyConfigYaml, DynamicConfiguration.class).orElseThrow(); | ||||
| 
 | ||||
|       assertThat(emptyConfig.getDirectoryReconcilerConfiguration().isEnabled()).isTrue(); | ||||
|     } | ||||
| 
 | ||||
|     { | ||||
|       final String directoryReconcilerConfig = REQUIRED_CONFIG.concat(""" | ||||
|           directoryReconciler: | ||||
|             enabled: false | ||||
|           """); | ||||
| 
 | ||||
|       DynamicDirectoryReconcilerConfiguration directoryReconcilerConfiguration = | ||||
|           DynamicConfigurationManager.parseConfiguration(directoryReconcilerConfig, DynamicConfiguration.class).orElseThrow() | ||||
|               .getDirectoryReconcilerConfiguration(); | ||||
| 
 | ||||
|       assertThat(directoryReconcilerConfiguration.isEnabled()).isFalse(); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testParseTurnConfig() throws JsonProcessingException { | ||||
|     { | ||||
|  |  | |||
|  | @ -1,122 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2021 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| 
 | ||||
| package org.whispersystems.textsecuregcm.sqs; | ||||
| 
 | ||||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||||
| import static org.mockito.ArgumentMatchers.any; | ||||
| import static org.mockito.Mockito.mock; | ||||
| import static org.mockito.Mockito.times; | ||||
| import static org.mockito.Mockito.verify; | ||||
| import static org.mockito.Mockito.when; | ||||
| 
 | ||||
| import java.util.List; | ||||
| import java.util.UUID; | ||||
| import java.util.concurrent.CompletableFuture; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import java.util.concurrent.TimeoutException; | ||||
| import java.util.stream.Stream; | ||||
| import org.junit.jupiter.api.BeforeEach; | ||||
| import org.junit.jupiter.api.Test; | ||||
| import org.junit.jupiter.params.ParameterizedTest; | ||||
| import org.junit.jupiter.params.provider.Arguments; | ||||
| import org.junit.jupiter.params.provider.MethodSource; | ||||
| import org.mockito.ArgumentCaptor; | ||||
| import org.whispersystems.textsecuregcm.storage.Account; | ||||
| import software.amazon.awssdk.services.sqs.SqsAsyncClient; | ||||
| import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; | ||||
| import software.amazon.awssdk.services.sqs.model.SendMessageRequest; | ||||
| import software.amazon.awssdk.services.sqs.model.SendMessageResponse; | ||||
| 
 | ||||
| public class DirectoryQueueTest { | ||||
| 
 | ||||
|   private SqsAsyncClient sqsAsyncClient; | ||||
| 
 | ||||
|   @BeforeEach | ||||
|   void setUp() { | ||||
|     sqsAsyncClient = mock(SqsAsyncClient.class); | ||||
| 
 | ||||
|     when(sqsAsyncClient.sendMessage(any(SendMessageRequest.class))) | ||||
|         .thenReturn(CompletableFuture.completedFuture(SendMessageResponse.builder().build())); | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|   @MethodSource("argumentsForTestRefreshRegisteredUser") | ||||
|   void testRefreshRegisteredUser(final boolean shouldBeVisibleInDirectory, final String expectedAction) { | ||||
|     final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://test"), sqsAsyncClient); | ||||
| 
 | ||||
|     final Account account = mock(Account.class); | ||||
|     when(account.getNumber()).thenReturn("+18005556543"); | ||||
|     when(account.getUuid()).thenReturn(UUID.randomUUID()); | ||||
|     when(account.shouldBeVisibleInDirectory()).thenReturn(shouldBeVisibleInDirectory); | ||||
| 
 | ||||
|     directoryQueue.refreshAccount(account); | ||||
| 
 | ||||
|     final ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); | ||||
|     verify(sqsAsyncClient).sendMessage(requestCaptor.capture()); | ||||
| 
 | ||||
|     assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(expectedAction).build(), | ||||
|         requestCaptor.getValue().messageAttributes().get("action")); | ||||
|   } | ||||
| 
 | ||||
|   @SuppressWarnings("unused") | ||||
|   private static Stream<Arguments> argumentsForTestRefreshRegisteredUser() { | ||||
|     return Stream.of( | ||||
|         Arguments.of(true, "add"), | ||||
|         Arguments.of(false, "delete")); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testSendMessageMultipleQueues() { | ||||
|     final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://first", "sqs://second"), sqsAsyncClient); | ||||
| 
 | ||||
|     final Account account = mock(Account.class); | ||||
|     when(account.getNumber()).thenReturn("+18005556543"); | ||||
|     when(account.getUuid()).thenReturn(UUID.randomUUID()); | ||||
|     when(account.shouldBeVisibleInDirectory()).thenReturn(true); | ||||
| 
 | ||||
|     directoryQueue.refreshAccount(account); | ||||
| 
 | ||||
|     final ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); | ||||
|     verify(sqsAsyncClient, times(2)).sendMessage(requestCaptor.capture()); | ||||
| 
 | ||||
|     for (final SendMessageRequest sendMessageRequest : requestCaptor.getAllValues()) { | ||||
|       assertEquals(MessageAttributeValue.builder().dataType("String").stringValue("add").build(), | ||||
|           sendMessageRequest.messageAttributes().get("action")); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testStop() { | ||||
|     final CompletableFuture<SendMessageResponse> sendMessageFuture = new CompletableFuture<>(); | ||||
|     when(sqsAsyncClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageFuture); | ||||
| 
 | ||||
|     final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://test"), sqsAsyncClient); | ||||
| 
 | ||||
|     final Account account = mock(Account.class); | ||||
|     when(account.getNumber()).thenReturn("+18005556543"); | ||||
|     when(account.getUuid()).thenReturn(UUID.randomUUID()); | ||||
|     when(account.shouldBeVisibleInDirectory()).thenReturn(true); | ||||
| 
 | ||||
|     directoryQueue.refreshAccount(account); | ||||
| 
 | ||||
|     final CompletableFuture<Boolean> stopFuture = CompletableFuture.supplyAsync(() -> { | ||||
|       try { | ||||
|         directoryQueue.stop(); | ||||
|         return true; | ||||
|       } catch (final Exception e) { | ||||
|         return false; | ||||
|       } | ||||
|     }); | ||||
| 
 | ||||
|     assertThrows(TimeoutException.class, () -> stopFuture.get(1, TimeUnit.SECONDS), | ||||
|         "Directory queue should not finish shutting down until all outstanding requests are resolved"); | ||||
| 
 | ||||
|     sendMessageFuture.complete(SendMessageResponse.builder().build()); | ||||
|     assertTrue(stopFuture.join()); | ||||
|   } | ||||
| } | ||||
|  | @ -33,15 +33,10 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; | |||
| import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; | ||||
| import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; | ||||
| import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; | ||||
| import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Indexes; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; | ||||
| 
 | ||||
| class AccountsManagerChangeNumberIntegrationTest { | ||||
| 
 | ||||
|   private static final String NUMBERS_TABLE_NAME = "numbers_test"; | ||||
|   private static final String PNI_ASSIGNMENT_TABLE_NAME = "pni_assignment_test"; | ||||
|   private static final String USERNAMES_TABLE_NAME = "usernames_test"; | ||||
|   private static final int SCAN_PAGE_SIZE = 1; | ||||
| 
 | ||||
|   @RegisterExtension | ||||
|  | @ -82,8 +77,7 @@ class AccountsManagerChangeNumberIntegrationTest { | |||
|           SCAN_PAGE_SIZE); | ||||
| 
 | ||||
|       deletedAccounts = new DeletedAccounts(DYNAMO_DB_EXTENSION.getDynamoDbClient(), | ||||
|           Tables.DELETED_ACCOUNTS.tableName(), | ||||
|           Indexes.DELETED_ACCOUNTS_NEEDS_RECONCILIATION.indexName()); | ||||
|           Tables.DELETED_ACCOUNTS.tableName()); | ||||
| 
 | ||||
|       final DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts, | ||||
|           DYNAMO_DB_EXTENSION.getLegacyDynamoClient(), | ||||
|  | @ -108,7 +102,6 @@ class AccountsManagerChangeNumberIntegrationTest { | |||
|           phoneNumberIdentifiers, | ||||
|           CACHE_CLUSTER_EXTENSION.getRedisCluster(), | ||||
|           deletedAccountsManager, | ||||
|           mock(DirectoryQueue.class), | ||||
|           mock(Keys.class), | ||||
|           mock(MessagesManager.class), | ||||
|           mock(ProfilesManager.class), | ||||
|  |  | |||
|  | @ -47,7 +47,6 @@ import org.whispersystems.textsecuregcm.push.ClientPresenceManager; | |||
| import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; | ||||
| import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; | ||||
| import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; | ||||
| import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; | ||||
| import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; | ||||
| import org.whispersystems.textsecuregcm.tests.util.JsonHelpers; | ||||
|  | @ -111,7 +110,6 @@ class AccountsManagerConcurrentModificationIntegrationTest { | |||
|           phoneNumberIdentifiers, | ||||
|           RedisClusterHelper.builder().stringCommands(commands).build(), | ||||
|           deletedAccountsManager, | ||||
|           mock(DirectoryQueue.class), | ||||
|           mock(Keys.class), | ||||
|           mock(MessagesManager.class), | ||||
|           mock(ProfilesManager.class), | ||||
|  |  | |||
|  | @ -59,7 +59,6 @@ import org.whispersystems.textsecuregcm.push.ClientPresenceManager; | |||
| import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; | ||||
| import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; | ||||
| import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; | ||||
| import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; | ||||
| import org.whispersystems.textsecuregcm.storage.Device.DeviceCapabilities; | ||||
| import org.whispersystems.textsecuregcm.tests.util.AccountsHelper; | ||||
| import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; | ||||
|  | @ -73,7 +72,6 @@ class AccountsManagerTest { | |||
| 
 | ||||
|   private Accounts accounts; | ||||
|   private DeletedAccountsManager deletedAccountsManager; | ||||
|   private DirectoryQueue directoryQueue; | ||||
|   private Keys keys; | ||||
|   private MessagesManager messagesManager; | ||||
|   private ProfilesManager profilesManager; | ||||
|  | @ -96,7 +94,6 @@ class AccountsManagerTest { | |||
|   void setup() throws InterruptedException { | ||||
|     accounts = mock(Accounts.class); | ||||
|     deletedAccountsManager = mock(DeletedAccountsManager.class); | ||||
|     directoryQueue = mock(DirectoryQueue.class); | ||||
|     keys = mock(Keys.class); | ||||
|     messagesManager = mock(MessagesManager.class); | ||||
|     profilesManager = mock(ProfilesManager.class); | ||||
|  | @ -153,7 +150,6 @@ class AccountsManagerTest { | |||
|         phoneNumberIdentifiers, | ||||
|         RedisClusterHelper.builder().stringCommands(commands).build(), | ||||
|         deletedAccountsManager, | ||||
|         directoryQueue, | ||||
|         keys, | ||||
|         messagesManager, | ||||
|         profilesManager, | ||||
|  | @ -598,10 +594,6 @@ class AccountsManagerTest { | |||
|     final Account account = accountsManager.create("+18005550123", "password", null, attributes, new ArrayList<>()); | ||||
| 
 | ||||
|     assertEquals(discoverable, account.isDiscoverableByPhoneNumber()); | ||||
| 
 | ||||
|     if (!discoverable) { | ||||
|       verify(directoryQueue).deleteAccount(account); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|  | @ -615,32 +607,6 @@ class AccountsManagerTest { | |||
|     assertEquals(hasStorage, account.isStorageSupported()); | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|   @MethodSource | ||||
|   void testUpdateDirectoryQueue(final boolean visibleBeforeUpdate, final boolean visibleAfterUpdate, | ||||
|       final boolean expectRefresh) { | ||||
|     final Account account = AccountsHelper.generateTestAccount("+14152222222", UUID.randomUUID(), UUID.randomUUID(), new ArrayList<>(), new byte[16]); | ||||
| 
 | ||||
|     // this sets up the appropriate result for Account#shouldBeVisibleInDirectory | ||||
|     final Device device = generateTestDevice(0); | ||||
|     account.addDevice(device); | ||||
|     account.setDiscoverableByPhoneNumber(visibleBeforeUpdate); | ||||
| 
 | ||||
|     final Account updatedAccount = accountsManager.update(account, | ||||
|         a -> a.setDiscoverableByPhoneNumber(visibleAfterUpdate)); | ||||
| 
 | ||||
|     verify(directoryQueue, times(expectRefresh ? 1 : 0)).refreshAccount(updatedAccount); | ||||
|   } | ||||
| 
 | ||||
|   @SuppressWarnings("unused") | ||||
|   private static Stream<Arguments> testUpdateDirectoryQueue() { | ||||
|     return Stream.of( | ||||
|         Arguments.of(false, false, false), | ||||
|         Arguments.of(true, true, false), | ||||
|         Arguments.of(false, true, true), | ||||
|         Arguments.of(true, false, true)); | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|   @MethodSource | ||||
|   void testUpdateDeviceLastSeen(final boolean expectUpdate, final long initialLastSeen, final long updatedLastSeen) { | ||||
|  | @ -680,7 +646,6 @@ class AccountsManagerTest { | |||
| 
 | ||||
|     assertTrue(phoneNumberIdentifiersByE164.containsKey(targetNumber)); | ||||
| 
 | ||||
|     verify(directoryQueue).changePhoneNumber(argThat(a -> a.getUuid().equals(uuid)), eq(originalNumber), eq(targetNumber)); | ||||
|     verify(keys).delete(originalPni); | ||||
|     verify(keys).delete(phoneNumberIdentifiersByE164.get(targetNumber)); | ||||
|   } | ||||
|  | @ -694,7 +659,6 @@ class AccountsManagerTest { | |||
| 
 | ||||
|     assertEquals(number, account.getNumber()); | ||||
|     verify(deletedAccountsManager, never()).lockAndPut(anyString(), anyString(), any()); | ||||
|     verify(directoryQueue, never()).changePhoneNumber(any(), any(), any()); | ||||
|     verify(keys, never()).delete(any()); | ||||
|   } | ||||
| 
 | ||||
|  | @ -736,8 +700,6 @@ class AccountsManagerTest { | |||
| 
 | ||||
|     assertTrue(phoneNumberIdentifiersByE164.containsKey(targetNumber)); | ||||
| 
 | ||||
|     verify(directoryQueue).changePhoneNumber(argThat(a -> a.getUuid().equals(uuid)), eq(originalNumber), eq(targetNumber)); | ||||
|     verify(directoryQueue).deleteAccount(existingAccount); | ||||
|     verify(keys).delete(originalPni); | ||||
|     verify(keys).delete(targetPni); | ||||
|   } | ||||
|  |  | |||
|  | @ -44,7 +44,6 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; | |||
| import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; | ||||
| import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; | ||||
| import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; | ||||
| import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; | ||||
| import org.whispersystems.textsecuregcm.util.AttributeValues; | ||||
| import software.amazon.awssdk.services.dynamodb.model.AttributeValue; | ||||
|  | @ -114,7 +113,6 @@ class AccountsManagerUsernameIntegrationTest { | |||
|         phoneNumberIdentifiers, | ||||
|         CACHE_CLUSTER_EXTENSION.getRedisCluster(), | ||||
|         deletedAccountsManager, | ||||
|         mock(DirectoryQueue.class), | ||||
|         mock(Keys.class), | ||||
|         mock(MessagesManager.class), | ||||
|         mock(ProfilesManager.class), | ||||
|  |  | |||
|  | @ -5,21 +5,14 @@ | |||
| 
 | ||||
| package org.whispersystems.textsecuregcm.storage; | ||||
| 
 | ||||
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; | ||||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||||
| 
 | ||||
| import java.lang.Thread.State; | ||||
| import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.Optional; | ||||
| import java.util.UUID; | ||||
| import org.junit.jupiter.api.BeforeEach; | ||||
| import org.junit.jupiter.api.Test; | ||||
| import org.junit.jupiter.api.extension.RegisterExtension; | ||||
| import org.junit.jupiter.api.function.Executable; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Indexes; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; | ||||
| 
 | ||||
| class DeletedAccountsManagerTest { | ||||
|  | @ -34,8 +27,7 @@ class DeletedAccountsManagerTest { | |||
|   @BeforeEach | ||||
|   void setUp() { | ||||
|     deletedAccounts = new DeletedAccounts(DYNAMO_DB_EXTENSION.getDynamoDbClient(), | ||||
|         Tables.DELETED_ACCOUNTS.tableName(), | ||||
|         Indexes.DELETED_ACCOUNTS_NEEDS_RECONCILIATION.indexName()); | ||||
|         Tables.DELETED_ACCOUNTS.tableName()); | ||||
| 
 | ||||
|     deletedAccountsManager = new DeletedAccountsManager(deletedAccounts, | ||||
|         DYNAMO_DB_EXTENSION.getLegacyDynamoClient(), | ||||
|  | @ -47,7 +39,7 @@ class DeletedAccountsManagerTest { | |||
|     final UUID uuid = UUID.randomUUID(); | ||||
|     final String e164 = "+18005551234"; | ||||
| 
 | ||||
|     deletedAccounts.put(uuid, e164, true); | ||||
|     deletedAccounts.put(uuid, e164); | ||||
|     deletedAccountsManager.lockAndTake(e164, maybeUuid -> assertEquals(Optional.of(uuid), maybeUuid)); | ||||
|     assertEquals(Optional.empty(), deletedAccounts.findUuid(e164)); | ||||
|   } | ||||
|  | @ -57,7 +49,7 @@ class DeletedAccountsManagerTest { | |||
|     final UUID uuid = UUID.randomUUID(); | ||||
|     final String e164 = "+18005551234"; | ||||
| 
 | ||||
|     deletedAccounts.put(uuid, e164, true); | ||||
|     deletedAccounts.put(uuid, e164); | ||||
| 
 | ||||
|     assertThrows(RuntimeException.class, () -> deletedAccountsManager.lockAndTake(e164, maybeUuid -> { | ||||
|       assertEquals(Optional.of(uuid), maybeUuid); | ||||
|  | @ -66,73 +58,4 @@ class DeletedAccountsManagerTest { | |||
| 
 | ||||
|     assertEquals(Optional.of(uuid), deletedAccounts.findUuid(e164)); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testReconciliationLockContention() throws ChunkProcessingFailedException { | ||||
| 
 | ||||
|     final UUID[] uuids = new UUID[3]; | ||||
|     final String[] e164s = new String[uuids.length]; | ||||
| 
 | ||||
|     for (int i = 0; i < uuids.length; i++) { | ||||
|       uuids[i] = UUID.randomUUID(); | ||||
|       e164s[i] = String.format("+1800555%04d", i); | ||||
|     } | ||||
| 
 | ||||
|     final Map<String, UUID> expectedReconciledAccounts = new HashMap<>(); | ||||
| 
 | ||||
|     for (int i = 0; i < uuids.length; i++) { | ||||
|       deletedAccounts.put(uuids[i], e164s[i], true); | ||||
|       expectedReconciledAccounts.put(e164s[i], uuids[i]); | ||||
|     } | ||||
| 
 | ||||
|     final UUID replacedUUID = UUID.randomUUID(); | ||||
|     final Map<String, UUID> reconciledAccounts = new HashMap<>(); | ||||
| 
 | ||||
|     final Thread putThread = new Thread(() -> { | ||||
|       try { | ||||
|         deletedAccountsManager.lockAndPut(e164s[0], () -> replacedUUID); | ||||
|       } catch (InterruptedException e) { | ||||
|         throw new RuntimeException(e); | ||||
|       } | ||||
|     }, | ||||
|         getClass().getSimpleName() + "-put"); | ||||
| 
 | ||||
|     final Thread reconcileThread = new Thread(() -> { | ||||
|       try { | ||||
|         deletedAccountsManager.lockAndReconcileAccounts(uuids.length, deletedAccounts -> { | ||||
|           // We hold the lock for the first account, so a thread trying to operate on that first count should block | ||||
|           // waiting for the lock. | ||||
|           putThread.start(); | ||||
| 
 | ||||
|           // Make sure the other thread really does actually block at some point | ||||
|           while (putThread.getState() != State.TIMED_WAITING) { | ||||
|             Thread.yield(); | ||||
|           } | ||||
| 
 | ||||
|           deletedAccounts.forEach(pair -> reconciledAccounts.put(pair.second(), pair.first())); | ||||
|           return reconciledAccounts.keySet(); | ||||
|         }); | ||||
|       } catch (ChunkProcessingFailedException e) { | ||||
|         throw new AssertionError(e); | ||||
|       } | ||||
|     }, getClass().getSimpleName() + "-reconcile"); | ||||
| 
 | ||||
|     reconcileThread.start(); | ||||
| 
 | ||||
|     assertDoesNotThrow((Executable) reconcileThread::join); | ||||
|     assertDoesNotThrow((Executable) putThread::join); | ||||
| 
 | ||||
|     assertEquals(expectedReconciledAccounts, reconciledAccounts); | ||||
| 
 | ||||
|     // The "put" thread should have completed after the reconciliation thread wrapped up. We can verify that's true by | ||||
|     // reconciling again; the updated account (and only that account) should appear in the "needs reconciliation" list. | ||||
|     deletedAccountsManager.lockAndReconcileAccounts(uuids.length, deletedAccounts -> { | ||||
|       assertEquals(1, deletedAccounts.size()); | ||||
|       assertEquals(replacedUUID, deletedAccounts.get(0).first()); | ||||
|       assertEquals(e164s[0], deletedAccounts.get(0).second()); | ||||
| 
 | ||||
|       return List.of(deletedAccounts.get(0).second()); | ||||
|     }); | ||||
|   } | ||||
| 
 | ||||
| } | ||||
|  |  | |||
|  | @ -5,20 +5,13 @@ | |||
| package org.whispersystems.textsecuregcm.storage; | ||||
| 
 | ||||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||||
| 
 | ||||
| import java.util.Collections; | ||||
| import java.util.HashSet; | ||||
| import java.util.List; | ||||
| import java.util.Optional; | ||||
| import java.util.Set; | ||||
| import java.util.UUID; | ||||
| import org.junit.jupiter.api.BeforeEach; | ||||
| import org.junit.jupiter.api.Test; | ||||
| import org.junit.jupiter.api.extension.RegisterExtension; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Indexes; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; | ||||
| import org.whispersystems.textsecuregcm.util.Pair; | ||||
| 
 | ||||
| class DeletedAccountsTest { | ||||
| 
 | ||||
|  | @ -29,41 +22,7 @@ class DeletedAccountsTest { | |||
| 
 | ||||
|   @BeforeEach | ||||
|   void setUp() { | ||||
|     deletedAccounts = new DeletedAccounts(DYNAMO_DB_EXTENSION.getDynamoDbClient(), | ||||
|         Tables.DELETED_ACCOUNTS.tableName(), | ||||
|         Indexes.DELETED_ACCOUNTS_NEEDS_RECONCILIATION.indexName()); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testPutList() { | ||||
|     UUID firstUuid = UUID.randomUUID(); | ||||
|     UUID secondUuid = UUID.randomUUID(); | ||||
|     UUID thirdUuid = UUID.randomUUID(); | ||||
| 
 | ||||
|     String firstNumber = "+14152221234"; | ||||
|     String secondNumber = "+14152225678"; | ||||
|     String thirdNumber = "+14159998765"; | ||||
| 
 | ||||
|     assertTrue(deletedAccounts.listAccountsToReconcile(1).isEmpty()); | ||||
| 
 | ||||
|     deletedAccounts.put(firstUuid, firstNumber, true); | ||||
|     deletedAccounts.put(secondUuid, secondNumber, true); | ||||
|     deletedAccounts.put(thirdUuid, thirdNumber, true); | ||||
| 
 | ||||
|     assertEquals(1, deletedAccounts.listAccountsToReconcile(1).size()); | ||||
| 
 | ||||
|     assertTrue(deletedAccounts.listAccountsToReconcile(10).containsAll( | ||||
|         List.of( | ||||
|             new Pair<>(firstUuid, firstNumber), | ||||
|             new Pair<>(secondUuid, secondNumber)))); | ||||
| 
 | ||||
|     deletedAccounts.markReconciled(List.of(firstNumber, secondNumber)); | ||||
| 
 | ||||
|     assertEquals(List.of(new Pair<>(thirdUuid, thirdNumber)), deletedAccounts.listAccountsToReconcile(10)); | ||||
| 
 | ||||
|     deletedAccounts.markReconciled(List.of(thirdNumber)); | ||||
| 
 | ||||
|     assertTrue(deletedAccounts.listAccountsToReconcile(1).isEmpty()); | ||||
|     deletedAccounts = new DeletedAccounts(DYNAMO_DB_EXTENSION.getDynamoDbClient(), Tables.DELETED_ACCOUNTS.tableName()); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|  | @ -73,7 +32,7 @@ class DeletedAccountsTest { | |||
| 
 | ||||
|     assertEquals(Optional.empty(), deletedAccounts.findUuid(e164)); | ||||
| 
 | ||||
|     deletedAccounts.put(uuid, e164, true); | ||||
|     deletedAccounts.put(uuid, e164); | ||||
| 
 | ||||
|     assertEquals(Optional.of(uuid), deletedAccounts.findUuid(e164)); | ||||
|   } | ||||
|  | @ -85,7 +44,7 @@ class DeletedAccountsTest { | |||
| 
 | ||||
|     assertEquals(Optional.empty(), deletedAccounts.findUuid(e164)); | ||||
| 
 | ||||
|     deletedAccounts.put(uuid, e164, true); | ||||
|     deletedAccounts.put(uuid, e164); | ||||
| 
 | ||||
|     assertEquals(Optional.of(uuid), deletedAccounts.findUuid(e164)); | ||||
| 
 | ||||
|  | @ -94,45 +53,6 @@ class DeletedAccountsTest { | |||
|     assertEquals(Optional.empty(), deletedAccounts.findUuid(e164)); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testGetAccountsNeedingReconciliation() { | ||||
|     final UUID firstUuid = UUID.randomUUID(); | ||||
|     final UUID secondUuid = UUID.randomUUID(); | ||||
| 
 | ||||
|     final String firstNumber = "+14152221234"; | ||||
|     final String secondNumber = "+14152225678"; | ||||
|     final String thirdNumber = "+14159998765"; | ||||
| 
 | ||||
|     assertEquals(Collections.emptySet(), | ||||
|         deletedAccounts.getAccountsNeedingReconciliation(List.of(firstNumber, secondNumber, thirdNumber))); | ||||
| 
 | ||||
|     deletedAccounts.put(firstUuid, firstNumber, true); | ||||
|     deletedAccounts.put(secondUuid, secondNumber, true); | ||||
| 
 | ||||
|     assertEquals(Set.of(firstNumber, secondNumber), | ||||
|         deletedAccounts.getAccountsNeedingReconciliation(List.of(firstNumber, secondNumber, thirdNumber))); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testGetAccountsNeedingReconciliationLargeBatch() { | ||||
|     final int itemCount = (DeletedAccounts.GET_BATCH_SIZE * 3) + 1; | ||||
| 
 | ||||
|     final Set<String> expectedAccountsNeedingReconciliation = new HashSet<>(itemCount); | ||||
| 
 | ||||
|     for (int i = 0; i < itemCount; i++) { | ||||
|       final String e164 = String.format("+18000555%04d", i); | ||||
| 
 | ||||
|       deletedAccounts.put(UUID.randomUUID(), e164, true); | ||||
|       expectedAccountsNeedingReconciliation.add(e164); | ||||
|     } | ||||
| 
 | ||||
|     final Set<String> accountsNeedingReconciliation = | ||||
|         deletedAccounts.getAccountsNeedingReconciliation(expectedAccountsNeedingReconciliation); | ||||
| 
 | ||||
|     assertEquals(expectedAccountsNeedingReconciliation, accountsNeedingReconciliation); | ||||
|   } | ||||
| 
 | ||||
| 
 | ||||
|   @Test | ||||
|   void testFindE164() { | ||||
|     assertEquals(Optional.empty(), deletedAccounts.findE164(UUID.randomUUID())); | ||||
|  | @ -140,7 +60,7 @@ class DeletedAccountsTest { | |||
|     final UUID uuid = UUID.randomUUID(); | ||||
|     final String e164 = "+18005551234"; | ||||
| 
 | ||||
|     deletedAccounts.put(uuid, e164, true); | ||||
|     deletedAccounts.put(uuid, e164); | ||||
| 
 | ||||
|     assertEquals(Optional.of(e164), deletedAccounts.findE164(uuid)); | ||||
|   } | ||||
|  | @ -153,7 +73,7 @@ class DeletedAccountsTest { | |||
| 
 | ||||
|     final UUID uuid = UUID.randomUUID(); | ||||
| 
 | ||||
|     deletedAccounts.put(uuid, e164, true); | ||||
|     deletedAccounts.put(uuid, e164); | ||||
| 
 | ||||
|     assertEquals(Optional.of(uuid), deletedAccounts.findUuid(e164)); | ||||
|   } | ||||
|  |  | |||
|  | @ -18,20 +18,6 @@ import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; | |||
| 
 | ||||
| public final class DynamoDbExtensionSchema { | ||||
| 
 | ||||
|   public enum Indexes { | ||||
| 
 | ||||
|       DELETED_ACCOUNTS_NEEDS_RECONCILIATION("needs_reconciliation_test"); | ||||
| 
 | ||||
|       private final String name; | ||||
| 
 | ||||
|       public String indexName() { | ||||
|         return name; | ||||
|       } | ||||
| 
 | ||||
|       Indexes(final String name) { this.name = name; } | ||||
| 
 | ||||
|   } | ||||
| 
 | ||||
|   public enum Tables implements DynamoDbExtension.TableSchema { | ||||
| 
 | ||||
|     ACCOUNTS("accounts_test", | ||||
|  | @ -50,22 +36,11 @@ public final class DynamoDbExtensionSchema { | |||
|             AttributeDefinition.builder() | ||||
|                 .attributeName(DeletedAccounts.KEY_ACCOUNT_E164) | ||||
|                 .attributeType(ScalarAttributeType.S).build(), | ||||
|             AttributeDefinition.builder() | ||||
|                 .attributeName(DeletedAccounts.ATTR_NEEDS_CDS_RECONCILIATION) | ||||
|                 .attributeType(ScalarAttributeType.N) | ||||
|                 .build(), | ||||
|             AttributeDefinition.builder() | ||||
|                 .attributeName(DeletedAccounts.ATTR_ACCOUNT_UUID) | ||||
|                 .attributeType(ScalarAttributeType.B) | ||||
|                 .build()), | ||||
|         List.of( | ||||
|             GlobalSecondaryIndex.builder() | ||||
|                 .indexName(Indexes.DELETED_ACCOUNTS_NEEDS_RECONCILIATION.indexName()) | ||||
|                 .keySchema(KeySchemaElement.builder().attributeName(DeletedAccounts.KEY_ACCOUNT_E164).keyType(KeyType.HASH).build(), | ||||
|                     KeySchemaElement.builder().attributeName(DeletedAccounts.ATTR_NEEDS_CDS_RECONCILIATION).keyType(KeyType.RANGE).build()) | ||||
|                 .projection(Projection.builder().projectionType(ProjectionType.INCLUDE).nonKeyAttributes(DeletedAccounts.ATTR_ACCOUNT_UUID).build()) | ||||
|                 .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(10L).writeCapacityUnits(10L).build()) | ||||
|                 .build(), | ||||
|             GlobalSecondaryIndex.builder() | ||||
|                 .indexName(DeletedAccounts.UUID_TO_E164_INDEX_NAME) | ||||
|                 .keySchema( | ||||
|  |  | |||
|  | @ -1,103 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| 
 | ||||
| package org.whispersystems.textsecuregcm.tests.controllers; | ||||
| 
 | ||||
| import static org.assertj.core.api.Assertions.assertThat; | ||||
| import static org.mockito.ArgumentMatchers.eq; | ||||
| import static org.mockito.Mockito.mock; | ||||
| import static org.mockito.Mockito.when; | ||||
| 
 | ||||
| import com.google.common.collect.ImmutableSet; | ||||
| import com.google.common.net.HttpHeaders; | ||||
| import io.dropwizard.auth.PolymorphicAuthValueFactoryProvider; | ||||
| import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; | ||||
| import io.dropwizard.testing.junit5.ResourceExtension; | ||||
| import java.util.Collections; | ||||
| import javax.ws.rs.client.Entity; | ||||
| import javax.ws.rs.core.MediaType; | ||||
| import javax.ws.rs.core.Response; | ||||
| import javax.ws.rs.core.Response.Status.Family; | ||||
| import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; | ||||
| import org.junit.jupiter.api.BeforeEach; | ||||
| import org.junit.jupiter.api.Test; | ||||
| import org.junit.jupiter.api.extension.ExtendWith; | ||||
| import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; | ||||
| import org.whispersystems.textsecuregcm.auth.DisabledPermittedAuthenticatedAccount; | ||||
| import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials; | ||||
| import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator; | ||||
| import org.whispersystems.textsecuregcm.controllers.DirectoryController; | ||||
| import org.whispersystems.textsecuregcm.tests.util.AuthHelper; | ||||
| 
 | ||||
| @ExtendWith(DropwizardExtensionsSupport.class) | ||||
| class DirectoryControllerTest { | ||||
| 
 | ||||
|   private static final ExternalServiceCredentialsGenerator directoryCredentialsGenerator = mock(ExternalServiceCredentialsGenerator.class); | ||||
|   private static final ExternalServiceCredentials         validCredentials              = new ExternalServiceCredentials("username", "password"); | ||||
| 
 | ||||
|   private static final ResourceExtension resources = ResourceExtension.builder() | ||||
|       .addProvider(AuthHelper.getAuthFilter()) | ||||
|       .addProvider(new PolymorphicAuthValueFactoryProvider.Binder<>( | ||||
|           ImmutableSet.of(AuthenticatedAccount.class, DisabledPermittedAuthenticatedAccount.class))) | ||||
|       .setTestContainerFactory(new GrizzlyWebTestContainerFactory()) | ||||
|       .addResource(new DirectoryController(directoryCredentialsGenerator)) | ||||
|       .build(); | ||||
| 
 | ||||
|   @BeforeEach | ||||
|   void setup() { | ||||
|     when(directoryCredentialsGenerator.generateFor(eq(AuthHelper.VALID_NUMBER))).thenReturn(validCredentials); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testFeedbackOk() { | ||||
|     Response response = | ||||
|         resources.getJerseyTest() | ||||
|                  .target("/v1/directory/feedback-v3/ok") | ||||
|                  .request() | ||||
|                  .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) | ||||
|                  .put(Entity.json("{\"reason\": \"test reason\"}")); | ||||
|     assertThat(response.getStatusInfo().getFamily()).isEqualTo(Family.SUCCESSFUL); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testGetAuthToken() { | ||||
|     ExternalServiceCredentials token = | ||||
|             resources.getJerseyTest() | ||||
|                      .target("/v1/directory/auth") | ||||
|                      .request() | ||||
|                      .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) | ||||
|                      .get(ExternalServiceCredentials.class); | ||||
|     assertThat(token.username()).isEqualTo(validCredentials.username()); | ||||
|     assertThat(token.password()).isEqualTo(validCredentials.password()); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testDisabledGetAuthToken() { | ||||
|     Response response = | ||||
|         resources.getJerseyTest() | ||||
|                  .target("/v1/directory/auth") | ||||
|                  .request() | ||||
|                  .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.DISABLED_UUID, AuthHelper.DISABLED_PASSWORD)) | ||||
|                  .get(); | ||||
|     assertThat(response.getStatus()).isEqualTo(401); | ||||
|   } | ||||
| 
 | ||||
| 
 | ||||
|   @Test | ||||
|   void testContactIntersection() { | ||||
|     Response response = | ||||
|         resources.getJerseyTest() | ||||
|                  .target("/v1/directory/tokens/") | ||||
|                  .request() | ||||
|                  .header("Authorization", | ||||
|                          AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, | ||||
|                                                   AuthHelper.VALID_PASSWORD)) | ||||
|                  .header(HttpHeaders.X_FORWARDED_FOR, "192.168.1.1, 1.1.1.1") | ||||
|                  .put(Entity.entity(Collections.emptyMap(), MediaType.APPLICATION_JSON_TYPE)); | ||||
| 
 | ||||
| 
 | ||||
|     assertThat(response.getStatus()).isEqualTo(429); | ||||
|   } | ||||
| } | ||||
|  | @ -1,87 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2013-2021 Signal Messenger, LLC | ||||
|  * SPDX-License-Identifier: AGPL-3.0-only | ||||
|  */ | ||||
| 
 | ||||
| package org.whispersystems.textsecuregcm.tests.storage; | ||||
| 
 | ||||
| import static org.assertj.core.api.Assertions.assertThat; | ||||
| import static org.mockito.ArgumentMatchers.any; | ||||
| import static org.mockito.Mockito.mock; | ||||
| import static org.mockito.Mockito.times; | ||||
| import static org.mockito.Mockito.verify; | ||||
| import static org.mockito.Mockito.verifyNoMoreInteractions; | ||||
| import static org.mockito.Mockito.when; | ||||
| 
 | ||||
| import java.util.Arrays; | ||||
| import java.util.List; | ||||
| import java.util.Optional; | ||||
| import java.util.UUID; | ||||
| import org.junit.jupiter.api.BeforeEach; | ||||
| import org.junit.jupiter.api.Test; | ||||
| import org.mockito.ArgumentCaptor; | ||||
| import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User; | ||||
| import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; | ||||
| import org.whispersystems.textsecuregcm.storage.Account; | ||||
| import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; | ||||
| import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; | ||||
| import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; | ||||
| import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; | ||||
| 
 | ||||
| class DirectoryReconcilerTest { | ||||
| 
 | ||||
|   private static final UUID VALID_UUID = UUID.randomUUID(); | ||||
|   private static final String VALID_NUMBER = "+14152222222"; | ||||
|   private static final UUID UNDISCOVERABLE_UUID = UUID.randomUUID(); | ||||
|   private static final String UNDISCOVERABLE_NUMBER = "+14153333333"; | ||||
| 
 | ||||
|   private final Account visibleAccount = mock(Account.class); | ||||
|   private final Account undiscoverableAccount = mock(Account.class); | ||||
|   private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class); | ||||
|   private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); | ||||
|   private final DirectoryReconciler directoryReconciler = new DirectoryReconciler("test", reconciliationClient, | ||||
|       dynamicConfigurationManager); | ||||
| 
 | ||||
|   private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse( | ||||
|       DirectoryReconciliationResponse.Status.OK); | ||||
| 
 | ||||
|   @BeforeEach | ||||
|   void setup() { | ||||
|     when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); | ||||
| 
 | ||||
|     when(visibleAccount.getUuid()).thenReturn(VALID_UUID); | ||||
|     when(visibleAccount.getNumber()).thenReturn(VALID_NUMBER); | ||||
|     when(visibleAccount.shouldBeVisibleInDirectory()).thenReturn(true); | ||||
|     when(undiscoverableAccount.getUuid()).thenReturn(UNDISCOVERABLE_UUID); | ||||
|     when(undiscoverableAccount.getNumber()).thenReturn(UNDISCOVERABLE_NUMBER); | ||||
|     when(undiscoverableAccount.shouldBeVisibleInDirectory()).thenReturn(false); | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void testCrawlChunkValid() throws AccountDatabaseCrawlerRestartException { | ||||
| 
 | ||||
|     when(reconciliationClient.add(any())).thenReturn(successResponse); | ||||
|     when(reconciliationClient.delete(any())).thenReturn(successResponse); | ||||
| 
 | ||||
|     directoryReconciler.timeAndProcessCrawlChunk(Optional.of(VALID_UUID), | ||||
|         Arrays.asList(visibleAccount, undiscoverableAccount)); | ||||
| 
 | ||||
|     ArgumentCaptor<DirectoryReconciliationRequest> chunkRequest = ArgumentCaptor.forClass( | ||||
|         DirectoryReconciliationRequest.class); | ||||
|     verify(reconciliationClient, times(1)).add(chunkRequest.capture()); | ||||
| 
 | ||||
|     assertThat(chunkRequest.getValue().getUsers()).isEqualTo(List.of(new User(VALID_UUID, VALID_NUMBER))); | ||||
| 
 | ||||
|     ArgumentCaptor<DirectoryReconciliationRequest> deletesRequest = ArgumentCaptor.forClass( | ||||
|         DirectoryReconciliationRequest.class); | ||||
|     verify(reconciliationClient, times(1)).delete(deletesRequest.capture()); | ||||
| 
 | ||||
|     assertThat(deletesRequest.getValue().getUsers()).isEqualTo( | ||||
|         List.of(new User(UNDISCOVERABLE_UUID, UNDISCOVERABLE_NUMBER))); | ||||
| 
 | ||||
|     verifyNoMoreInteractions(reconciliationClient); | ||||
|   } | ||||
| 
 | ||||
| } | ||||
		Loading…
	
		Reference in New Issue
	
	 Jon Chambers
						Jon Chambers