diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java index af35ba872..03e9f99f8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java @@ -357,7 +357,7 @@ public class AccountController { accounts.update(account); if (!wasAccountEnabled && account.isEnabled()) { - directoryQueue.addRegisteredUser(account.getNumber()); + directoryQueue.addRegisteredUser(account.getUuid(), account.getNumber()); } } @@ -373,7 +373,7 @@ public class AccountController { accounts.update(account); if (!account.isEnabled()) { - directoryQueue.deleteRegisteredUser(account.getNumber()); + directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber()); } } @@ -393,7 +393,7 @@ public class AccountController { accounts.update(account); if (!wasAccountEnabled && account.isEnabled()) { - directoryQueue.addRegisteredUser(account.getNumber()); + directoryQueue.addRegisteredUser(account.getUuid(), account.getNumber()); } } @@ -409,7 +409,7 @@ public class AccountController { accounts.update(account); if (!account.isEnabled()) { - directoryQueue.deleteRegisteredUser(account.getNumber()); + directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber()); } } @@ -617,9 +617,9 @@ public class AccountController { } if (account.isEnabled()) { - directoryQueue.addRegisteredUser(number); + directoryQueue.addRegisteredUser(account.getUuid(), number); } else { - directoryQueue.deleteRegisteredUser(number); + directoryQueue.deleteRegisteredUser(account.getUuid(), number); } messagesManager.clear(number); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java index 807bce9a8..5746f5f45 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java @@ -113,7 +113,7 @@ public class DeviceController { accounts.update(account); if (!account.isEnabled()) { - directoryQueue.deleteRegisteredUser(account.getNumber()); + directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber()); } messages.clear(account.getNumber(), deviceId); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeysController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeysController.java index 6622f5b86..4ce3be4d1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeysController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeysController.java @@ -107,7 +107,7 @@ public class KeysController { accounts.update(account); if (!wasAccountEnabled && account.isEnabled()) { - directoryQueue.addRegisteredUser(account.getNumber()); + directoryQueue.addRegisteredUser(account.getUuid(), account.getNumber()); } } @@ -173,7 +173,7 @@ public class KeysController { accounts.update(account); if (!wasAccountEnabled && account.isEnabled()) { - directoryQueue.addRegisteredUser(account.getNumber()); + directoryQueue.addRegisteredUser(account.getUuid(), account.getNumber()); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationRequest.java b/service/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationRequest.java index 9eeb6d565..bea0b34d0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationRequest.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationRequest.java @@ -29,15 +29,19 @@ public class DirectoryReconciliationRequest { @JsonProperty private UUID toUuid; + @JsonProperty + private List uuids; + @JsonProperty private List numbers; public DirectoryReconciliationRequest() { } - public DirectoryReconciliationRequest(UUID fromUuid, UUID toUuid, List numbers) { + public DirectoryReconciliationRequest(UUID fromUuid, UUID toUuid, List uuids, List numbers) { this.fromUuid = fromUuid; this.toUuid = toUuid; + this.uuids = uuids; this.numbers = numbers; } @@ -49,6 +53,10 @@ public class DirectoryReconciliationRequest { return toUuid; } + public List getUuids() { + return uuids; + } + public List getNumbers() { return numbers; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java b/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java index bd05995df..af8b89a08 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java @@ -35,6 +35,7 @@ import org.whispersystems.textsecuregcm.util.Constants; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import static com.codahale.metrics.MetricRegistry.name; @@ -52,28 +53,29 @@ public class DirectoryQueue { public DirectoryQueue(SqsConfiguration sqsConfig) { final AWSCredentials credentials = new BasicAWSCredentials(sqsConfig.getAccessKey(), sqsConfig.getAccessSecret()); final AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); - + this.queueUrl = sqsConfig.getQueueUrl(); this.sqs = AmazonSQSClientBuilder.standard().withRegion(sqsConfig.getRegion()).withCredentials(credentialsProvider).build(); } - public void addRegisteredUser(String user) { - sendMessage("add", user); + public void addRegisteredUser(UUID uuid, String number) { + sendMessage("add", uuid, number); } - public void deleteRegisteredUser(String user) { - sendMessage("delete", user); + public void deleteRegisteredUser(UUID uuid, String number) { + sendMessage("delete", uuid, number); } - private void sendMessage(String action, String user) { + private void sendMessage(String action, UUID uuid, String number) { final Map messageAttributes = new HashMap<>(); - messageAttributes.put("id", new MessageAttributeValue().withDataType("String").withStringValue(user)); + messageAttributes.put("id", new MessageAttributeValue().withDataType("String").withStringValue(number)); + messageAttributes.put("uuid", new MessageAttributeValue().withDataType("String").withStringValue(uuid.toString())); messageAttributes.put("action", new MessageAttributeValue().withDataType("String").withStringValue(action)); SendMessageRequest sendMessageRequest = new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody("-") - .withMessageDeduplicationId(user + action) - .withMessageGroupId(user) + .withMessageDeduplicationId(UUID.randomUUID().toString()) + .withMessageGroupId(number) .withMessageAttributes(messageAttributes); try { sqs.sendMessage(sendMessageRequest); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java index 53cd74f74..cf68231aa 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java @@ -68,7 +68,7 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener { accountUpdateCount++; accountsManager.update(account); - directoryQueue.deleteRegisteredUser(account.getNumber()); + directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber()); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java index f9718f02b..18b57c0e9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -30,6 +30,8 @@ import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Util; import javax.ws.rs.ProcessingException; + +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -57,7 +59,7 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener { public void onCrawlStart() { } public void onCrawlEnd(Optional fromUuid) { - DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList()); + DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList(), Collections.emptyList()); DirectoryReconciliationResponse response = sendChunk(request); } @@ -93,10 +95,14 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener { @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private DirectoryReconciliationRequest createChunkRequest(Optional fromUuid, List accounts) { - List numbers = accounts.stream() - .filter(Account::isEnabled) - .map(Account::getNumber) - .collect(Collectors.toList()); + List uuids = new ArrayList<>(accounts.size()); + List numbers = new ArrayList<>(accounts.size()); + for (Account account : accounts) { + if (account.isEnabled()) { + uuids.add(account.getUuid()); + numbers.add(account.getNumber()); + } + } Optional toUuid = Optional.empty(); @@ -104,7 +110,7 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener { toUuid = Optional.of(accounts.get(accounts.size() - 1).getUuid()); } - return new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), numbers); + return new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), uuids, numbers); } private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java index c656f1c95..fd3d6ceb8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java @@ -59,7 +59,7 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener { accountsManager.update(account); if (!account.isEnabled()) { - directoryQueue.deleteRegisteredUser(account.getNumber()); + directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber()); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index 119c49987..e7864660a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -88,7 +88,7 @@ public class DeleteUserCommand extends EnvironmentCommand request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); verify(reconciliationClient, times(1)).sendChunk(request.capture());