queueUrls, final SqsAsyncClient sqs) {
- this.queueUrls = queueUrls;
- this.sqs = sqs;
- }
-
- @Override
- public void start() throws Exception {
- }
-
- @Override
- public void stop() throws Exception {
- synchronized (outstandingRequests) {
- while (outstandingRequests.get() > 0) {
- outstandingRequests.wait();
- }
- }
-
- sqs.close();
- }
-
- public void refreshAccount(final Account account) {
- sendUpdateMessage(account.getUuid(), account.getNumber(),
- account.shouldBeVisibleInDirectory() ? UpdateAction.ADD : UpdateAction.DELETE);
- }
-
- public void deleteAccount(final Account account) {
- sendUpdateMessage(account.getUuid(), account.getNumber(), UpdateAction.DELETE);
- }
-
- public void changePhoneNumber(final Account account, final String originalNumber, final String newNumber) {
- sendUpdateMessage(account.getUuid(), originalNumber, UpdateAction.DELETE);
- sendUpdateMessage(account.getUuid(), newNumber, account.shouldBeVisibleInDirectory() ? UpdateAction.ADD : UpdateAction.DELETE);
- }
-
- private void sendUpdateMessage(final UUID uuid, final String number, final UpdateAction action) {
- for (final String queueUrl : queueUrls) {
- final Timer.Context timerContext = sendMessageBatchTimer.time();
-
- final SendMessageRequest request = SendMessageRequest.builder()
- .queueUrl(queueUrl)
- .messageBody("-")
- .messageDeduplicationId(UUID.randomUUID().toString())
- .messageGroupId(number)
- .messageAttributes(Map.of(
- "id", MessageAttributeValue.builder().dataType("String").stringValue(number).build(),
- "uuid", MessageAttributeValue.builder().dataType("String").stringValue(uuid.toString()).build(),
- "action", action.toMessageAttributeValue()
- ))
- .build();
-
- synchronized (outstandingRequests) {
- outstandingRequests.incrementAndGet();
- }
-
- sqs.sendMessage(request).whenComplete((response, cause) -> {
- try {
- if (cause instanceof SdkServiceException) {
- serviceErrorMeter.mark();
- logger.warn("sqs service error", cause);
- } else if (cause instanceof SdkClientException) {
- clientErrorMeter.mark();
- logger.warn("sqs client error", cause);
- } else if (cause != null) {
- logger.warn("sqs unexpected error", cause);
- }
- } finally {
- synchronized (outstandingRequests) {
- outstandingRequests.decrementAndGet();
- outstandingRequests.notifyAll();
- }
-
- timerContext.close();
- }
- });
- }
- }
-}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java
index 100da674f..284c936dc 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java
@@ -17,7 +17,6 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
public class AccountDatabaseCrawlerCache {
public static final String GENERAL_PURPOSE_PREFIX = "";
- public static final String DIRECTORY_RECONCILER_PREFIX = "directory-reconciler";
public static final String ACCOUNT_CLEANER_PREFIX = "account-cleaner";
private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java
index b18fc3bb5..d120d99ad 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java
@@ -52,7 +52,6 @@ import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
-import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@@ -89,7 +88,6 @@ public class AccountsManager {
private final PhoneNumberIdentifiers phoneNumberIdentifiers;
private final FaultTolerantRedisCluster cacheCluster;
private final DeletedAccountsManager deletedAccountsManager;
- private final DirectoryQueue directoryQueue;
private final Keys keys;
private final MessagesManager messagesManager;
private final ProfilesManager profilesManager;
@@ -133,7 +131,6 @@ public class AccountsManager {
final PhoneNumberIdentifiers phoneNumberIdentifiers,
final FaultTolerantRedisCluster cacheCluster,
final DeletedAccountsManager deletedAccountsManager,
- final DirectoryQueue directoryQueue,
final Keys keys,
final MessagesManager messagesManager,
final ProfilesManager profilesManager,
@@ -149,7 +146,6 @@ public class AccountsManager {
this.phoneNumberIdentifiers = phoneNumberIdentifiers;
this.cacheCluster = cacheCluster;
this.deletedAccountsManager = deletedAccountsManager;
- this.directoryQueue = directoryQueue;
this.keys = keys;
this.messagesManager = messagesManager;
this.profilesManager = profilesManager;
@@ -237,11 +233,6 @@ public class AccountsManager {
Metrics.counter(CREATE_COUNTER_NAME, tags).increment();
- if (!account.isDiscoverableByPhoneNumber()) {
- // The newly-created account has explicitly opted out of discoverability
- directoryQueue.deleteAccount(account);
- }
-
accountAttributes.recoveryPassword().ifPresent(registrationRecoveryPassword ->
registrationRecoveryPasswordsManager.storeForCurrentNumber(account.getNumber(), registrationRecoveryPassword));
});
@@ -277,7 +268,6 @@ public class AccountsManager {
if (maybeExistingAccount.isPresent()) {
delete(maybeExistingAccount.get());
- directoryQueue.deleteAccount(maybeExistingAccount.get());
displacedUuid = maybeExistingAccount.map(Account::getUuid);
} else {
displacedUuid = deletedAci;
@@ -296,7 +286,6 @@ public class AccountsManager {
AccountChangeValidator.NUMBER_CHANGE_VALIDATOR);
updatedAccount.set(numberChangedAccount);
- directoryQueue.changePhoneNumber(numberChangedAccount, originalNumber, number);
keys.delete(phoneNumberIdentifier);
keys.delete(originalPhoneNumberIdentifier);
@@ -363,7 +352,7 @@ public class AccountsManager {
/**
* Reserve a username hash so that no other accounts may take it.
- *
+ *
* The reserved hash can later be set with {@link #confirmReservedUsernameHash(Account, byte[])}. The reservation
* will eventually expire, after which point confirmReservedUsernameHash may fail if another account has taken the
* username hash.
@@ -409,7 +398,7 @@ public class AccountsManager {
}
/**
- * Set a username hash previously reserved with {@link #reserveUsernameHash(Account, List)}
+ * Set a username hash previously reserved with {@link #reserveUsernameHash(Account, List)}
*
* @param account the account to update
* @param reservedUsernameHash the previously reserved username hash
@@ -500,8 +489,6 @@ public class AccountsManager {
*/
private Account update(Account account, Function updater) {
- final boolean wasVisibleBeforeUpdate = account.shouldBeVisibleInDirectory();
-
final Account updatedAccount;
try (Timer.Context ignored = updateTimer.time()) {
@@ -519,12 +506,6 @@ public class AccountsManager {
redisSet(updatedAccount);
}
- final boolean isVisibleAfterUpdate = updatedAccount.shouldBeVisibleInDirectory();
-
- if (wasVisibleBeforeUpdate != isVisibleAfterUpdate) {
- directoryQueue.refreshAccount(updatedAccount);
- }
-
return updatedAccount;
}
@@ -653,10 +634,6 @@ public class AccountsManager {
}
}
- public Optional getNumberForPhoneNumberIdentifier(UUID pni) {
- return phoneNumberIdentifiers.getPhoneNumber(pni);
- }
-
public UUID getPhoneNumberIdentifier(String e164) {
return phoneNumberIdentifiers.getPhoneNumberIdentifier(e164);
}
@@ -673,7 +650,6 @@ public class AccountsManager {
try (final Timer.Context ignored = deleteTimer.time()) {
deletedAccountsManager.lockAndPut(account.getNumber(), () -> {
delete(account);
- directoryQueue.deleteAccount(account);
return account.getUuid();
});
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccounts.java
index 72a0cf46f..b1834688f 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccounts.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccounts.java
@@ -6,33 +6,17 @@ package org.whispersystems.textsecuregcm.storage;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Queue;
-import java.util.Set;
import java.util.UUID;
-import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.util.AttributeValues;
-import org.whispersystems.textsecuregcm.util.Pair;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
-import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
-import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
-import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
-import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
-import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
public class DeletedAccounts extends AbstractDynamoDbStore {
@@ -40,7 +24,6 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
static final String KEY_ACCOUNT_E164 = "P";
static final String ATTR_ACCOUNT_UUID = "U";
static final String ATTR_EXPIRES = "E";
- static final String ATTR_NEEDS_CDS_RECONCILIATION = "R";
static final String UUID_TO_E164_INDEX_NAME = "u_to_p";
@@ -50,23 +33,20 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
static final int GET_BATCH_SIZE = 100;
private final String tableName;
- private final String needsReconciliationIndexName;
- public DeletedAccounts(final DynamoDbClient dynamoDb, final String tableName, final String needsReconciliationIndexName) {
+ public DeletedAccounts(final DynamoDbClient dynamoDb, final String tableName) {
super(dynamoDb);
this.tableName = tableName;
- this.needsReconciliationIndexName = needsReconciliationIndexName;
}
- void put(UUID uuid, String e164, boolean needsReconciliation) {
+ void put(UUID uuid, String e164) {
db().putItem(PutItemRequest.builder()
.tableName(tableName)
.item(Map.of(
KEY_ACCOUNT_E164, AttributeValues.fromString(e164),
ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
- ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond()),
- ATTR_NEEDS_CDS_RECONCILIATION, AttributeValues.fromInt(needsReconciliation ? 1 : 0)))
+ ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond())))
.build());
}
@@ -108,72 +88,4 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
.key(Map.of(KEY_ACCOUNT_E164, AttributeValues.fromString(e164)))
.build());
}
-
- List> listAccountsToReconcile(final int max) {
-
- final ScanRequest scanRequest = ScanRequest.builder()
- .tableName(tableName)
- .indexName(needsReconciliationIndexName)
- .limit(max)
- .build();
-
- return scan(scanRequest, max)
- .stream()
- .map(item -> new Pair<>(
- AttributeValues.getUUID(item, ATTR_ACCOUNT_UUID, null),
- AttributeValues.getString(item, KEY_ACCOUNT_E164, null)))
- .collect(Collectors.toList());
- }
-
- Set getAccountsNeedingReconciliation(final Collection e164s) {
- final Queue