From 10575d80ad29d27a67fe1e0d4a7e6110e925505f Mon Sep 17 00:00:00 2001 From: Brian Acton Date: Tue, 18 Sep 2018 11:17:12 -0700 Subject: [PATCH] Contact Discovery Service --- config/sample.yml | 19 +- pom.xml | 9 +- .../WhisperServerConfiguration.java | 5 +- .../textsecuregcm/WhisperServerService.java | 22 +- .../auth/AuthorizationToken.java | 29 -- .../auth/AuthorizationTokenGenerator.java | 90 ------ .../auth/DirectoryCredentials.java | 28 ++ .../auth/DirectoryCredentialsGenerator.java | 105 +++++++ .../DirectoryClientConfiguration.java | 42 +++ .../configuration/DirectoryConfiguration.java | 62 ++++ .../DirectoryServerConfiguration.java | 47 +++ .../configuration/SqsConfiguration.java | 48 ++++ .../controllers/AccountController.java | 5 + .../controllers/DeviceController.java | 8 + .../controllers/DirectoryController.java | 56 +++- .../DirectoryReconciliationRequest.java | 55 ++++ .../DirectoryReconciliationResponse.java | 45 +++ .../sqs/ContactDiscoveryQueueSender.java | 93 ++++++ .../textsecuregcm/storage/Accounts.java | 10 +- .../storage/DirectoryReconciler.java | 269 ++++++++++++++++++ .../storage/DirectoryReconciliationCache.java | 118 ++++++++ .../DirectoryReconciliationClient.java | 95 +++++++ .../textsecuregcm/util/Util.java | 8 + .../workers/DeleteUserCommand.java | 2 +- .../workers/DirectoryCommand.java | 2 +- src/main/resources/lua/unlock.lua | 8 + .../controllers/AccountControllerTest.java | 6 +- .../controllers/DeviceControllerTest.java | 28 +- .../controllers/DirectoryControllerTest.java | 31 +- .../storage/DirectoryReconcilerTest.java | 242 ++++++++++++++++ 30 files changed, 1439 insertions(+), 148 deletions(-) delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/auth/AuthorizationToken.java delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/auth/AuthorizationTokenGenerator.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/auth/DirectoryCredentials.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/auth/DirectoryCredentialsGenerator.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryClientConfiguration.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/SqsConfiguration.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationRequest.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationResponse.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/sqs/ContactDiscoveryQueueSender.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java create mode 100644 src/main/resources/lua/unlock.lua create mode 100644 src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java diff --git a/config/sample.yml b/config/sample.yml index 572bde117..bae920a13 100644 --- a/config/sample.yml +++ b/config/sample.yml @@ -28,9 +28,22 @@ cache: # Redis server configuration for cache cluster url: replicaUrls: -directory: # Redis server configuration for directory cluster - url: - replicaUrls: +directory: + redis: # Redis server configuration for directory cluster + url: + replicaUrls: + client: # Configuration for interfacing with Contact Discovery Service cluster + userAuthenticationTokenSharedSecret: # hex-encoded secret shared with CDS used to generate auth tokens for Signal users + userAuthenticationTokenUserIdSecret: # hex-encoded secret shared among Signal-Servers to obscure user phone numbers from CDS + sqs: + accessKey: # AWS SQS accessKey + accessSecret: # AWS SQS accessSecret + queueUrl: # AWS SQS queue url + server: + replicationUrl: # CDS replication endpoint base url + replicationPassword: # CDS replication endpoint password + replicationCaCertificate: # CDS replication endpoint TLS certificate trust root + messageCache: # Redis server configuration for message store cache url: diff --git a/pom.xml b/pom.xml index f624ca358..70c6e28cc 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.whispersystems.textsecure TextSecureServer - 1.88 + 1.89-RC2 1.3.1 @@ -68,7 +68,12 @@ com.amazonaws aws-java-sdk-s3 - 1.11.115 + 1.11.366 + + + com.amazonaws + aws-java-sdk-sqs + 1.11.362 com.google.protobuf diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index e213f1f4b..a31024cfe 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -28,6 +28,7 @@ import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; import org.whispersystems.textsecuregcm.configuration.RedPhoneConfiguration; import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; import org.whispersystems.textsecuregcm.configuration.AttachmentsConfiguration; +import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; import org.whispersystems.textsecuregcm.configuration.TestDeviceConfiguration; import org.whispersystems.textsecuregcm.configuration.TurnConfiguration; import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration; @@ -74,7 +75,7 @@ public class WhisperServerConfiguration extends Configuration { @NotNull @Valid @JsonProperty - private RedisConfiguration directory; + private DirectoryConfiguration directory; @NotNull @Valid @@ -167,7 +168,7 @@ public class WhisperServerConfiguration extends Configuration { return cache; } - public RedisConfiguration getDirectoryConfiguration() { + public DirectoryConfiguration getDirectoryConfiguration() { return directory; } diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 57a592036..a1c264d18 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -29,6 +29,7 @@ import org.whispersystems.dropwizard.simpleauth.AuthDynamicFeature; import org.whispersystems.dropwizard.simpleauth.AuthValueFactoryProvider; import org.whispersystems.dropwizard.simpleauth.BasicCredentialAuthFilter; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; +import org.whispersystems.textsecuregcm.auth.DirectoryCredentialsGenerator; import org.whispersystems.textsecuregcm.auth.FederatedPeerAuthenticator; import org.whispersystems.textsecuregcm.auth.TurnTokenGenerator; import org.whispersystems.textsecuregcm.controllers.AccountController; @@ -67,10 +68,14 @@ import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.s3.UrlSigner; import org.whispersystems.textsecuregcm.sms.SmsSender; import org.whispersystems.textsecuregcm.sms.TwilioSmsSender; +import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; import org.whispersystems.textsecuregcm.storage.DirectoryManager; +import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; +import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationCache; import org.whispersystems.textsecuregcm.storage.Keys; import org.whispersystems.textsecuregcm.storage.Messages; import org.whispersystems.textsecuregcm.storage.MessagesCache; @@ -159,7 +164,7 @@ public class WhisperServerService extends Application. + */ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; +import org.hibernate.validator.constraints.NotEmpty; + +public class DirectoryClientConfiguration { + + @NotEmpty + @JsonProperty + private String userAuthenticationTokenSharedSecret; + + @NotEmpty + @JsonProperty + private String userAuthenticationTokenUserIdSecret; + + public byte[] getUserAuthenticationTokenSharedSecret() throws DecoderException { + return Hex.decodeHex(userAuthenticationTokenSharedSecret.toCharArray()); + } + + public byte[] getUserAuthenticationTokenUserIdSecret() throws DecoderException { + return Hex.decodeHex(userAuthenticationTokenUserIdSecret.toCharArray()); + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java new file mode 100644 index 000000000..119d71427 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryConfiguration.java @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.Valid; +import javax.validation.constraints.NotNull; + +public class DirectoryConfiguration { + + @JsonProperty + @NotNull + @Valid + private RedisConfiguration redis; + + @JsonProperty + @NotNull + @Valid + private SqsConfiguration sqs; + + @JsonProperty + @NotNull + @Valid + private DirectoryClientConfiguration client; + + @JsonProperty + @NotNull + @Valid + private DirectoryServerConfiguration server; + + public RedisConfiguration getRedisConfiguration() { + return redis; + } + + public SqsConfiguration getSqsConfiguration() { + return sqs; + } + + public DirectoryClientConfiguration getDirectoryClientConfiguration() { + return client; + } + + public DirectoryServerConfiguration getDirectoryServerConfiguration() { + return server; + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java new file mode 100644 index 000000000..329526f15 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.hibernate.validator.constraints.NotEmpty; + +public class DirectoryServerConfiguration { + + @NotEmpty + @JsonProperty + private String replicationUrl; + + @NotEmpty + @JsonProperty + private String replicationPassword; + + @NotEmpty + @JsonProperty + private String replicationCaCertificate; + + public String getReplicationUrl() { + return replicationUrl; + } + + public String getReplicationPassword() { + return replicationPassword; + } + + public String getReplicationCaCertificate() { + return replicationCaCertificate; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/SqsConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/SqsConfiguration.java new file mode 100644 index 000000000..56c7ca7ab --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/SqsConfiguration.java @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.hibernate.validator.constraints.NotEmpty; + +public class SqsConfiguration { + @NotEmpty + @JsonProperty + private String accessKey; + + @NotEmpty + @JsonProperty + private String accessSecret; + + @NotEmpty + @JsonProperty + private String queueUrl; + + public String getAccessKey() { + return accessKey; + } + + public String getAccessSecret() { + return accessSecret; + } + + public String getQueueUrl() { + return queueUrl; + } +} + + diff --git a/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java b/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java index ccb7546dd..85e2fc8f3 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java +++ b/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java @@ -38,6 +38,7 @@ import org.whispersystems.textsecuregcm.entities.RegistrationLockFailure; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.sms.SmsSender; import org.whispersystems.textsecuregcm.sms.TwilioSmsSender; +import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -81,6 +82,7 @@ public class AccountController { private final AccountsManager accounts; private final RateLimiters rateLimiters; private final SmsSender smsSender; + private final ContactDiscoveryQueueSender cdsSender; private final MessagesManager messagesManager; private final TurnTokenGenerator turnTokenGenerator; private final Map testDevices; @@ -89,6 +91,7 @@ public class AccountController { AccountsManager accounts, RateLimiters rateLimiters, SmsSender smsSenderFactory, + ContactDiscoveryQueueSender cdsSender, MessagesManager messagesManager, TurnTokenGenerator turnTokenGenerator, Map testDevices) @@ -97,6 +100,7 @@ public class AccountController { this.accounts = accounts; this.rateLimiters = rateLimiters; this.smsSender = smsSenderFactory; + this.cdsSender = cdsSender; this.messagesManager = messagesManager; this.testDevices = testDevices; this.turnTokenGenerator = turnTokenGenerator; @@ -339,6 +343,7 @@ public class AccountController { if (accounts.create(account)) { newUserMeter.mark(); } + cdsSender.addRegisteredUser(number); messagesManager.clear(number); pendingAccounts.remove(number); diff --git a/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java b/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java index 76299470b..c32c7528a 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java +++ b/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceController.java @@ -30,6 +30,7 @@ import org.whispersystems.textsecuregcm.entities.DeviceInfo; import org.whispersystems.textsecuregcm.entities.DeviceInfoList; import org.whispersystems.textsecuregcm.entities.DeviceResponse; import org.whispersystems.textsecuregcm.limits.RateLimiters; +import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -71,15 +72,19 @@ public class DeviceController { private final RateLimiters rateLimiters; private final Map maxDeviceConfiguration; + private final ContactDiscoveryQueueSender cdsSender; + public DeviceController(PendingDevicesManager pendingDevices, AccountsManager accounts, MessagesManager messages, + ContactDiscoveryQueueSender cdsSender, RateLimiters rateLimiters, Map maxDeviceConfiguration) { this.pendingDevices = pendingDevices; this.accounts = accounts; this.messages = messages; + this.cdsSender = cdsSender; this.rateLimiters = rateLimiters; this.maxDeviceConfiguration = maxDeviceConfiguration; } @@ -108,6 +113,9 @@ public class DeviceController { account.removeDevice(deviceId); accounts.update(account); + if (!account.isActive()) { + cdsSender.deleteRegisteredUser(account.getNumber()); + } messages.clear(account.getNumber(), deviceId); } diff --git a/src/main/java/org/whispersystems/textsecuregcm/controllers/DirectoryController.java b/src/main/java/org/whispersystems/textsecuregcm/controllers/DirectoryController.java index 696d35783..cf5582ab3 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/controllers/DirectoryController.java +++ b/src/main/java/org/whispersystems/textsecuregcm/controllers/DirectoryController.java @@ -21,8 +21,12 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.annotation.Timed; import com.google.common.base.Optional; +import org.apache.commons.codec.DecoderException; +import org.hibernate.validator.constraints.NotEmpty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.auth.DirectoryCredentialsGenerator; +import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; import org.whispersystems.textsecuregcm.entities.ClientContact; import org.whispersystems.textsecuregcm.entities.ClientContactTokens; import org.whispersystems.textsecuregcm.entities.ClientContacts; @@ -56,14 +60,56 @@ public class DirectoryController { private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Histogram contactsHistogram = metricRegistry.histogram(name(getClass(), "contacts")); - private final RateLimiters rateLimiters; - private final DirectoryManager directory; + private final RateLimiters rateLimiters; + private final DirectoryManager directory; + private final DirectoryCredentialsGenerator userTokenGenerator; - public DirectoryController(RateLimiters rateLimiters, DirectoryManager directory) { - this.directory = directory; - this.rateLimiters = rateLimiters; + public DirectoryController(RateLimiters rateLimiters, + DirectoryManager directory, + DirectoryCredentialsGenerator userTokenGenerator) + { + this.directory = directory; + this.rateLimiters = rateLimiters; + this.userTokenGenerator = userTokenGenerator; } + @Timed + @GET + @Path("/auth") + @Produces(MediaType.APPLICATION_JSON) + public Response getAuthToken(@Auth Account account) { + return Response.ok().entity(userTokenGenerator.generateFor(account.getNumber())).build(); + } + + @Timed + @PUT + @Path("/feedback/ok") + public Response setFeedbackOk(@Auth Account account) { + return Response.ok().build(); + } + + @Timed + @PUT + @Path("/feedback/mismatch") + public Response setFeedbackMismatch(@Auth Account account) { + return Response.ok().build(); + } + + @Timed + @PUT + @Path("/feedback/attestation-error") + public Response setFeedbackAttestationError(@Auth Account account) { + return Response.ok().build(); + } + + @Timed + @PUT + @Path("/feedback/unexpected-error") + public Response setFeedbackUnexpectedError(@Auth Account account) { + return Response.ok().build(); + } + + @Timed @GET @Path("/{token}") diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationRequest.java b/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationRequest.java new file mode 100644 index 000000000..2731347eb --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationRequest.java @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public class DirectoryReconciliationRequest { + + @JsonProperty + private String fromNumber; + + @JsonProperty + private String toNumber; + + @JsonProperty + private List numbers; + + public DirectoryReconciliationRequest() { + } + + public DirectoryReconciliationRequest(String fromNumber, String toNumber, List numbers) { + this.fromNumber = fromNumber; + this.toNumber = toNumber; + this.numbers = numbers; + } + + public String getFromNumber() { + return fromNumber; + } + + public String getToNumber() { + return toNumber; + } + + public List getNumbers() { + return numbers; + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationResponse.java b/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationResponse.java new file mode 100644 index 000000000..144e790e9 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/DirectoryReconciliationResponse.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.whispersystems.textsecuregcm.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.hibernate.validator.constraints.NotEmpty; + +public class DirectoryReconciliationResponse { + + @JsonProperty + @NotEmpty + private Status status; + + public DirectoryReconciliationResponse() { + } + + public DirectoryReconciliationResponse(Status status) { + this.status = status; + } + + public Status getStatus() { + return status; + } + + public enum Status { + OK, + MISSING, + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/sqs/ContactDiscoveryQueueSender.java b/src/main/java/org/whispersystems/textsecuregcm/sqs/ContactDiscoveryQueueSender.java new file mode 100644 index 000000000..94f5f5c85 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/sqs/ContactDiscoveryQueueSender.java @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.sqs; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.MessageAttributeValue; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; +import org.whispersystems.textsecuregcm.configuration.SqsConfiguration; +import org.whispersystems.textsecuregcm.util.Constants; + +import java.util.HashMap; +import java.util.Map; + +import static com.codahale.metrics.MetricRegistry.name; + +public class ContactDiscoveryQueueSender { + + private static final Logger logger = LoggerFactory.getLogger(ContactDiscoveryQueueSender.class); + + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private final Meter serviceErrorMeter = metricRegistry.meter(name(ContactDiscoveryQueueSender.class, "serviceError")); + private final Meter clientErrorMeter = metricRegistry.meter(name(ContactDiscoveryQueueSender.class, "clientError")); + + private final String queueUrl; + private final AmazonSQS sqs; + + public ContactDiscoveryQueueSender(SqsConfiguration sqsConfig) { + final AWSCredentials credentials = new BasicAWSCredentials(sqsConfig.getAccessKey(), sqsConfig.getAccessSecret()); + final AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); + + this.queueUrl = sqsConfig.getQueueUrl(); + this.sqs = AmazonSQSClientBuilder.standard().withCredentials(credentialsProvider).build(); + } + + public void addRegisteredUser(String user) { + sendMessage("add", user); + } + + public void deleteRegisteredUser(String user) { + sendMessage("delete", user); + } + + private void sendMessage(String action, String user) { + final Map messageAttributes = new HashMap<>(); + messageAttributes.put("id", new MessageAttributeValue().withDataType("String").withStringValue(user)); + messageAttributes.put("action", new MessageAttributeValue().withDataType("String").withStringValue(action)); + SendMessageRequest sendMessageRequest = new SendMessageRequest() + .withQueueUrl(queueUrl) + .withMessageBody("-") + .withMessageDeduplicationId(user + action) + .withMessageGroupId(user) + .withMessageAttributes(messageAttributes); + try { + sqs.sendMessage(sendMessageRequest); + } catch (AmazonServiceException ex) { + serviceErrorMeter.mark(); + logger.warn("sqs service error: ", ex); + } catch (AmazonClientException ex) { + clientErrorMeter.mark(); + logger.warn("sqs client error: ", ex); + } catch (Throwable t) { + logger.warn("sqs unexpected error: ", t); + } + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index 43a243f76..067b2887b 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -66,7 +66,7 @@ public abstract class Accounts { abstract Account get(@Bind("number") String number); @SqlQuery("SELECT COUNT(DISTINCT " + NUMBER + ") from accounts") - abstract long getCount(); + public abstract long getCount(); @Mapper(AccountMapper.class) @SqlQuery("SELECT * FROM accounts OFFSET :offset LIMIT :limit") @@ -76,6 +76,14 @@ public abstract class Accounts { @SqlQuery("SELECT * FROM accounts") public abstract Iterator getAll(); + @Mapper(AccountMapper.class) + @SqlQuery("SELECT * FROM accounts ORDER BY " + NUMBER + " LIMIT :limit") + public abstract List getAllFrom(@Bind("limit") int length); + + @Mapper(AccountMapper.class) + @SqlQuery("SELECT * FROM accounts WHERE " + NUMBER + " > :from ORDER BY " + NUMBER + " LIMIT :limit") + public abstract List getAllFrom(@Bind("from") String from, @Bind("limit") int length); + @SqlQuery("SELECT COUNT(*) FROM accounts a, json_array_elements(a.data->'devices') devices WHERE devices->>'id' = '1' AND (devices->>'gcmId') is not null AND (devices->>'lastSeen')\\:\\:bigint >= :since") public abstract int getAndroidActiveSinceCount(@Bind("since") long since); diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java new file mode 100644 index 000000000..4f4c21f2e --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -0,0 +1,269 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import io.dropwizard.lifecycle.Managed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.entities.ClientContact; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; +import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle; +import org.whispersystems.textsecuregcm.util.Constants; +import org.whispersystems.textsecuregcm.util.Hex; +import org.whispersystems.textsecuregcm.util.Util; + +import javax.ws.rs.ProcessingException; +import java.security.SecureRandom; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static com.codahale.metrics.MetricRegistry.name; + +public class DirectoryReconciler implements Managed, Runnable { + + private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class); + + private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private static final Timer readChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "readChunk")); + private static final Timer sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "sendChunk")); + private static final Meter sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, "sendChunkError")); + + private static final long WORKER_TTL_MS = 120_000L; + private static final long PERIOD = 86400_000L; + private static final long MAXIMUM_CHUNK_INTERVAL = 30_000L; + private static final long DEFAULT_CHUNK_INTERVAL = 10_000L; + private static final long MINIMUM_CHUNK_INTERVAL = 500L; + private static final long ACCELERATED_CHUNK_INTERVAL = 10L; + private static final int CHUNK_SIZE = 1000; + private static final double JITTER_MAX = 0.20; + + private final Accounts readOnlyAccounts; + private final DirectoryManager directoryManager; + private final DirectoryReconciliationClient reconciliationClient; + private final DirectoryReconciliationCache reconciliationCache; + private final String workerId; + private final SecureRandom random; + + private boolean running; + private boolean finished; + + public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, + DirectoryReconciliationCache reconciliationCache, + DirectoryManager directoryManager, + Accounts readOnlyAccounts) { + this.readOnlyAccounts = readOnlyAccounts; + this.directoryManager = directoryManager; + this.reconciliationClient = reconciliationClient; + this.reconciliationCache = reconciliationCache; + this.random = new SecureRandom(); + this.workerId = generateWorkerId(random); + } + + private static String generateWorkerId(SecureRandom random) { + byte[] workerIdBytes = new byte[16]; + random.nextBytes(workerIdBytes); + return Hex.toString(workerIdBytes); + } + + @Override + public synchronized void start() { + running = true; + new Thread(this).start(); + } + + @Override + public synchronized void stop() { + running = false; + notifyAll(); + while (!finished) { + Util.wait(this); + } + } + + @Override + public void run() { + long delayMs = DEFAULT_CHUNK_INTERVAL; + + while (sleepWhileRunning(getDelayWithJitter(delayMs))) { + try { + delayMs = DEFAULT_CHUNK_INTERVAL; + delayMs = getBoundedChunkInterval(PERIOD * CHUNK_SIZE / getAccountCount()); + delayMs = doPeriodicWork(delayMs); + } catch (Throwable t) { + logger.warn("error in directory reconciliation: ", t); + } + } + + synchronized (this) { + finished = true; + notifyAll(); + } + } + + @VisibleForTesting + public long doPeriodicWork(long intervalMs) { + long nextIntervalTimeMs = System.currentTimeMillis() + intervalMs; + + if (reconciliationCache.claimActiveWork(workerId, WORKER_TTL_MS)) { + if (processChunk()) { + if (!reconciliationCache.isAccelerated()) { + long timeUntilNextIntervalMs = getTimeUntilNextInterval(nextIntervalTimeMs); + reconciliationCache.claimActiveWork(workerId, timeUntilNextIntervalMs); + return timeUntilNextIntervalMs; + } else { + return ACCELERATED_CHUNK_INTERVAL; + } + } + } + return intervalMs; + } + + @VisibleForTesting + public long getAccountCount() { + Optional cachedCount = reconciliationCache.getCachedAccountCount(); + + if (cachedCount.isPresent()) { + return cachedCount.get(); + } + + long count = readOnlyAccounts.getCount(); + reconciliationCache.setCachedAccountCount(count); + return count; + } + + private synchronized boolean sleepWhileRunning(long delayMs) { + long startTimeMs = System.currentTimeMillis(); + while (running && delayMs > 0) { + Util.wait(this, delayMs); + + long nowMs = System.currentTimeMillis(); + delayMs -= Math.abs(nowMs - startTimeMs); + } + return running; + } + + private long getTimeUntilNextInterval(long nextIntervalTimeMs) { + long nextIntervalMs = nextIntervalTimeMs - System.currentTimeMillis(); + return getBoundedChunkInterval(nextIntervalMs); + } + + private long getBoundedChunkInterval(long intervalMs) { + return Math.max(Math.min(intervalMs, MAXIMUM_CHUNK_INTERVAL), MINIMUM_CHUNK_INTERVAL); + } + + private long getDelayWithJitter(long delayMs) { + long randomJitterMs = (long) (random.nextDouble() * JITTER_MAX * delayMs); + return delayMs + randomJitterMs; + } + + private boolean processChunk() { + Optional fromNumber = reconciliationCache.getLastNumber(); + List chunkAccounts = readChunk(fromNumber, CHUNK_SIZE); + + writeChunktoDirectoryCache(chunkAccounts); + + DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts); + DirectoryReconciliationResponse sendChunkResponse = sendChunk(request); + + if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING || + request.getToNumber() == null) { + reconciliationCache.clearAccelerate(); + } + + if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.OK) { + reconciliationCache.setLastNumber(Optional.fromNullable(request.getToNumber())); + } else if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING) { + reconciliationCache.setLastNumber(Optional.absent()); + } + + return sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.OK; + } + + private List readChunk(Optional fromNumber, int chunkSize) { + try (Timer.Context timer = readChunkTimer.time()) { + Optional> chunkAccounts; + + if (fromNumber.isPresent()) { + chunkAccounts = Optional.fromNullable(readOnlyAccounts.getAllFrom(fromNumber.get(), chunkSize)); + } else { + chunkAccounts = Optional.fromNullable(readOnlyAccounts.getAllFrom(chunkSize)); + } + + return chunkAccounts.or(Collections::emptyList); + } + } + + private void writeChunktoDirectoryCache(List accounts) { + if (accounts.isEmpty()) { + return; + } + + BatchOperationHandle batchOperation = directoryManager.startBatchOperation(); + try { + for (Account account : accounts) { + if (account.isActive()) { + byte[] token = Util.getContactToken(account.getNumber()); + ClientContact clientContact = new ClientContact(token, null, account.isVoiceSupported(), account.isVideoSupported()); + + directoryManager.add(batchOperation, clientContact); + } else { + directoryManager.remove(batchOperation, account.getNumber()); + } + } + } finally { + directoryManager.stopBatchOperation(batchOperation); + } + } + + private DirectoryReconciliationRequest createChunkRequest(Optional fromNumber, List accounts) { + List numbers = accounts.stream() + .filter(Account::isActive) + .map(Account::getNumber) + .collect(Collectors.toList()); + + Optional toNumber = Optional.absent(); + if (!accounts.isEmpty()) { + toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber()); + } + + return new DirectoryReconciliationRequest(fromNumber.orNull(), toNumber.orNull(), numbers); + } + + private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) { + try (Timer.Context timer = sendChunkTimer.time()) { + DirectoryReconciliationResponse response = reconciliationClient.sendChunk(request); + if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) { + sendChunkErrorMeter.mark(); + logger.warn("reconciliation error: " + response.getStatus()); + } + return response; + } catch (ProcessingException ex) { + sendChunkErrorMeter.mark(); + logger.warn("request error: ", ex); + throw new ProcessingException(ex); + } + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java new file mode 100644 index 000000000..37cf30f60 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + *

+ * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import com.google.common.base.Optional; +import org.whispersystems.textsecuregcm.redis.LuaScript; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; +import redis.clients.jedis.Jedis; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class DirectoryReconciliationCache { + + private static final String ACTIVE_WORKER_KEY = "directory_reconciliation_active_worker"; + private static final String LAST_NUMBER_KEY = "directory_reconciliation_last_number"; + private static final String CACHED_COUNT_KEY = "directory_reconciliation_cached_count"; + private static final String ACCELERATE_KEY = "directory_reconciliation_accelerate"; + + private static final long CACHED_COUNT_TTL_MS = 21600_000L; + private static final long LAST_NUMBER_TTL_MS = 86400_000L; + + private final ReplicatedJedisPool jedisPool; + private final UnlockOperation unlockOperation; + + public DirectoryReconciliationCache(ReplicatedJedisPool jedisPool) throws IOException { + this.jedisPool = jedisPool; + this.unlockOperation = new UnlockOperation(jedisPool); + } + + public void clearAccelerate() { + try (Jedis jedis = jedisPool.getWriteResource()) { + jedis.del(ACCELERATE_KEY); + } + } + + public boolean isAccelerated() { + try (Jedis jedis = jedisPool.getWriteResource()) { + return "1".equals(jedis.get(ACCELERATE_KEY)); + } + } + + public boolean claimActiveWork(String workerId, long ttlMs) { + unlockOperation.unlock(ACTIVE_WORKER_KEY, workerId); + try (Jedis jedis = jedisPool.getWriteResource()) { + return "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs)); + } + } + + public Optional getLastNumber() { + try (Jedis jedis = jedisPool.getWriteResource()) { + return Optional.fromNullable(jedis.get(LAST_NUMBER_KEY)); + } + } + + public void setLastNumber(Optional lastNumber) { + try (Jedis jedis = jedisPool.getWriteResource()) { + if (lastNumber.isPresent()) { + jedis.psetex(LAST_NUMBER_KEY, LAST_NUMBER_TTL_MS, lastNumber.get()); + } else { + jedis.del(LAST_NUMBER_KEY); + } + } + } + + public Optional getCachedAccountCount() { + try (Jedis jedis = jedisPool.getWriteResource()) { + Optional cachedAccountCount = Optional.fromNullable(jedis.get(CACHED_COUNT_KEY)); + if (!cachedAccountCount.isPresent()) { + return Optional.absent(); + } + + try { + return Optional.of(Long.parseUnsignedLong(cachedAccountCount.get())); + } catch (NumberFormatException ex) { + return Optional.absent(); + } + } + } + + public void setCachedAccountCount(long accountCount) { + try (Jedis jedis = jedisPool.getWriteResource()) { + jedis.psetex(CACHED_COUNT_KEY, CACHED_COUNT_TTL_MS, Long.toString(accountCount)); + } + } + + public static class UnlockOperation { + + private final LuaScript luaScript; + + UnlockOperation(ReplicatedJedisPool jedisPool) throws IOException { + this.luaScript = LuaScript.fromResource(jedisPool, "lua/unlock.lua"); + } + + public boolean unlock(String key, String value) { + List keys = Arrays.asList(key.getBytes()); + List args = Arrays.asList(value.getBytes()); + + return ((long) luaScript.execute(keys, args)) > 0; + } + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java new file mode 100644 index 000000000..441dd4745 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + *

+ * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import org.bouncycastle.openssl.PEMReader; +import org.glassfish.jersey.SslConfigurator; +import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; +import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; + +import javax.net.ssl.SSLContext; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +public class DirectoryReconciliationClient { + + private final String replicationUrl; + private final Client client; + + public DirectoryReconciliationClient(DirectoryServerConfiguration directoryServerConfiguration) + throws CertificateException + { + this.replicationUrl = directoryServerConfiguration.getReplicationUrl(); + this.client = initializeClient(directoryServerConfiguration); + } + + public DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) { + return client.target(replicationUrl) + .path("/v1/directory/reconcile") + .request(MediaType.APPLICATION_JSON_TYPE) + .put(Entity.json(request), DirectoryReconciliationResponse.class); + } + + private static Client initializeClient(DirectoryServerConfiguration directoryServerConfiguration) + throws CertificateException + { + KeyStore trustStore = initializeKeyStore(directoryServerConfiguration.getReplicationCaCertificate()); + SSLContext sslContext = SslConfigurator.newInstance() + .securityProtocol("TLSv1.2") + .trustStore(trustStore) + .createSSLContext(); + return ClientBuilder.newBuilder() + .register(HttpAuthenticationFeature.basic("signal", directoryServerConfiguration.getReplicationPassword().getBytes())) + .sslContext(sslContext) + .build(); + } + + private static KeyStore initializeKeyStore(String caCertificatePem) + throws CertificateException + { + try { + PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(caCertificatePem.getBytes()))); + X509Certificate certificate = (X509Certificate) reader.readObject(); + + if (certificate == null) { + throw new CertificateException("No certificate found in parsing!"); + } + + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + keyStore.load(null); + keyStore.setCertificateEntry("ca", certificate); + return keyStore; + } catch (IOException | KeyStoreException ex) { + throw new CertificateException(ex); + } catch (NoSuchAlgorithmException ex) { + throw new AssertionError(ex); + } + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/util/Util.java b/src/main/java/org/whispersystems/textsecuregcm/util/Util.java index e07794e23..849e83083 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/util/Util.java +++ b/src/main/java/org/whispersystems/textsecuregcm/util/Util.java @@ -134,6 +134,14 @@ public class Util { } } + public static void wait(Object object, long timeoutMs) { + try { + object.wait(timeoutMs); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + public static int hashCode(Object... objects) { return Arrays.hashCode(objects); } diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index 09a23ac13..6c82cbf67 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -75,7 +75,7 @@ public class DeleteUserCommand extends EnvironmentCommand())) @@ -137,6 +140,7 @@ public class AccountControllerTest { assertThat(response.getStatus()).isEqualTo(204); verify(accountsManager, times(1)).create(isA(Account.class)); + verify(cdsSender, times(1)).addRegisteredUser(eq(SENDER)); } @Test @@ -282,4 +286,4 @@ public class AccountControllerTest { -} \ No newline at end of file +} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java index 73a7531cb..2aeafb7a3 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java @@ -29,10 +29,8 @@ import org.whispersystems.textsecuregcm.entities.DeviceResponse; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.mappers.DeviceLimitExceededExceptionMapper; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.MessagesManager; -import org.whispersystems.textsecuregcm.storage.PendingDevicesManager; +import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender; +import org.whispersystems.textsecuregcm.storage.*; import org.whispersystems.textsecuregcm.tests.util.AuthHelper; import org.whispersystems.textsecuregcm.util.VerificationCode; @@ -55,10 +53,11 @@ public class DeviceControllerTest { public DumbVerificationDeviceController(PendingDevicesManager pendingDevices, AccountsManager accounts, MessagesManager messages, + ContactDiscoveryQueueSender cdsSender, RateLimiters rateLimiters, Map deviceConfiguration) { - super(pendingDevices, accounts, messages, rateLimiters, deviceConfiguration); + super(pendingDevices, accounts, messages, cdsSender, rateLimiters, deviceConfiguration); } @Override @@ -70,10 +69,12 @@ public class DeviceControllerTest { private PendingDevicesManager pendingDevicesManager = mock(PendingDevicesManager.class); private AccountsManager accountsManager = mock(AccountsManager.class ); private MessagesManager messagesManager = mock(MessagesManager.class); + private ContactDiscoveryQueueSender cdsSender = mock(ContactDiscoveryQueueSender.class); private RateLimiters rateLimiters = mock(RateLimiters.class ); private RateLimiter rateLimiter = mock(RateLimiter.class ); private Account account = mock(Account.class ); private Account maxedAccount = mock(Account.class); + private Device masterDevice = mock(Device.class); private Map deviceConfiguration = new HashMap() {{ @@ -88,6 +89,7 @@ public class DeviceControllerTest { .addResource(new DumbVerificationDeviceController(pendingDevicesManager, accountsManager, messagesManager, + cdsSender, rateLimiters, deviceConfiguration)) .build(); @@ -101,9 +103,13 @@ public class DeviceControllerTest { when(rateLimiters.getAllocateDeviceLimiter()).thenReturn(rateLimiter); when(rateLimiters.getVerifyDeviceLimiter()).thenReturn(rateLimiter); + when(masterDevice.getId()).thenReturn(1L); + when(account.getNextDeviceId()).thenReturn(42L); when(account.getNumber()).thenReturn(AuthHelper.VALID_NUMBER); // when(maxedAccount.getActiveDeviceCount()).thenReturn(6); + when(account.getAuthenticatedDevice()).thenReturn(Optional.of(masterDevice)); + when(account.isActive()).thenReturn(false); when(pendingDevicesManager.getCodeForNumber(AuthHelper.VALID_NUMBER)).thenReturn(Optional.of(new StoredVerificationCode("5678901", System.currentTimeMillis()))); when(pendingDevicesManager.getCodeForNumber(AuthHelper.VALID_NUMBER_TWO)).thenReturn(Optional.of(new StoredVerificationCode("1112223", System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(31)))); @@ -195,4 +201,16 @@ public class DeviceControllerTest { assertEquals(response.getStatus(), 422); verifyNoMoreInteractions(messagesManager); } + + @Test + public void removeDeviceTest() throws Exception { + Response response = resources.getJerseyTest() + .target("/v1/devices/12345") + .request() + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD)) + .delete(); + + assertEquals(204, response.getStatus()); + verify(cdsSender).deleteRegisteredUser(eq(AuthHelper.VALID_NUMBER)); + } } diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DirectoryControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DirectoryControllerTest.java index e244d1e63..342d3e124 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DirectoryControllerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DirectoryControllerTest.java @@ -1,5 +1,6 @@ package org.whispersystems.textsecuregcm.tests.controllers; +import com.google.common.base.Optional; import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; import org.junit.Before; import org.junit.Rule; @@ -7,6 +8,10 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.whispersystems.dropwizard.simpleauth.AuthValueFactoryProvider; +import org.whispersystems.textsecuregcm.auth.DirectoryCredentials; +import org.whispersystems.textsecuregcm.auth.DirectoryCredentialsGenerator; +import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; +import org.whispersystems.textsecuregcm.configuration.DirectoryClientConfiguration; import org.whispersystems.textsecuregcm.controllers.DirectoryController; import org.whispersystems.textsecuregcm.entities.ClientContactTokens; import org.whispersystems.textsecuregcm.limits.RateLimiter; @@ -24,15 +29,19 @@ import java.util.List; import io.dropwizard.testing.junit.ResourceTestRule; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyListOf; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Matchers.anyList; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class DirectoryControllerTest { - private final RateLimiters rateLimiters = mock(RateLimiters.class ); - private final RateLimiter rateLimiter = mock(RateLimiter.class ); - private final DirectoryManager directoryManager = mock(DirectoryManager.class); + private final RateLimiters rateLimiters = mock(RateLimiters.class); + private final RateLimiter rateLimiter = mock(RateLimiter.class); + private final DirectoryManager directoryManager = mock(DirectoryManager.class); + private final DirectoryCredentialsGenerator directoryCredentialsGenerator = mock(DirectoryCredentialsGenerator.class); + + private final DirectoryCredentials validCredentials = new DirectoryCredentials("username", "password"); @Rule public final ResourceTestRule resources = ResourceTestRule.builder() @@ -40,7 +49,8 @@ public class DirectoryControllerTest { .addProvider(new AuthValueFactoryProvider.Binder()) .setTestContainerFactory(new GrizzlyWebTestContainerFactory()) .addResource(new DirectoryController(rateLimiters, - directoryManager)) + directoryManager, + directoryCredentialsGenerator)) .build(); @@ -56,6 +66,19 @@ public class DirectoryControllerTest { return response; } }); + when(directoryCredentialsGenerator.generateFor(eq(AuthHelper.VALID_NUMBER))).thenReturn(validCredentials); + } + + @Test + public void testGetAuthToken() { + DirectoryCredentials token = + resources.getJerseyTest() + .target("/v1/directory/auth") + .request() + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD)) + .get(DirectoryCredentials.class); + assertThat(token.getUsername()).isEqualTo(validCredentials.getUsername()); + assertThat(token.getPassword()).isEqualTo(validCredentials.getPassword()); } @Test diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java new file mode 100644 index 000000000..d90d225b8 --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java @@ -0,0 +1,242 @@ +package org.whispersystems.textsecuregcm.tests.storage; + +import com.google.common.base.Optional; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.whispersystems.textsecuregcm.entities.ClientContact; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.DirectoryManager; +import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle; +import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; +import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationCache; +import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; +import org.whispersystems.textsecuregcm.util.Util; + +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class DirectoryReconcilerTest { + + private static final String VALID_NUMBER = "valid"; + private static final String INACTIVE_NUMBER = "inactive"; + + private static final long ACCOUNT_COUNT = 0L; + private static final long INTERVAL_MS = 30_000L; + + private final Account account = mock(Account.class); + private final Account inactiveAccount = mock(Account.class); + private final Accounts accounts = mock(Accounts.class); + private final BatchOperationHandle batchOperationHandle = mock(BatchOperationHandle.class); + private final DirectoryManager directoryManager = mock(DirectoryManager.class); + private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class); + private final DirectoryReconciliationCache reconciliationCache = mock(DirectoryReconciliationCache.class); + private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, reconciliationCache, directoryManager, accounts); + + private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK); + private final DirectoryReconciliationResponse notFoundResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING); + + @Before + public void setup() { + when(account.getNumber()).thenReturn(VALID_NUMBER); + when(account.isActive()).thenReturn(true); + when(account.isVideoSupported()).thenReturn(true); + when(account.isVoiceSupported()).thenReturn(true); + when(inactiveAccount.getNumber()).thenReturn(INACTIVE_NUMBER); + when(inactiveAccount.isActive()).thenReturn(false); + + when(directoryManager.startBatchOperation()).thenReturn(batchOperationHandle); + + when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account, inactiveAccount)); + when(accounts.getAllFrom(eq(VALID_NUMBER), anyInt())).thenReturn(Arrays.asList(inactiveAccount)); + when(accounts.getAllFrom(eq(INACTIVE_NUMBER), anyInt())).thenReturn(Collections.emptyList()); + when(accounts.getCount()).thenReturn(ACCOUNT_COUNT); + + when(reconciliationClient.sendChunk(any())).thenReturn(successResponse); + + when(reconciliationCache.getLastNumber()).thenReturn(Optional.absent()); + when(reconciliationCache.claimActiveWork(any(), anyLong())).thenReturn(true); + when(reconciliationCache.isAccelerated()).thenReturn(false); + } + + @Test + public void testGetUncachedAccountCount() { + when(reconciliationCache.getCachedAccountCount()).thenReturn(Optional.absent()); + + long accountCount = directoryReconciler.getAccountCount(); + + assertThat(accountCount).isEqualTo(ACCOUNT_COUNT); + + verify(accounts, times(1)).getCount(); + + verify(reconciliationCache, times(1)).getCachedAccountCount(); + verify(reconciliationCache, times(1)).setCachedAccountCount(eq(ACCOUNT_COUNT)); + + verifyNoMoreInteractions(directoryManager); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(reconciliationClient); + verifyNoMoreInteractions(reconciliationCache); + } + + @Test + public void testGetCachedAccountCount() { + when(reconciliationCache.getCachedAccountCount()).thenReturn(Optional.of(ACCOUNT_COUNT)); + + long accountCount = directoryReconciler.getAccountCount(); + + assertThat(accountCount).isEqualTo(ACCOUNT_COUNT); + + verify(reconciliationCache, times(1)).getCachedAccountCount(); + + verifyNoMoreInteractions(directoryManager); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(reconciliationClient); + verifyNoMoreInteractions(reconciliationCache); + } + + @Test + public void testValid() { + long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); + + assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); + + verify(accounts, times(1)).getAllFrom(anyInt()); + + ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); + verify(reconciliationClient, times(1)).sendChunk(request.capture()); + + assertThat(request.getValue().getFromNumber()).isNull(); + assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); + assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER)); + + ArgumentCaptor addedContact = ArgumentCaptor.forClass(ClientContact.class); + verify(directoryManager, times(1)).startBatchOperation(); + verify(directoryManager, times(1)).add(eq(batchOperationHandle), addedContact.capture()); + verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER)); + verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle)); + + assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER)); + + verify(reconciliationCache, times(1)).getLastNumber(); + verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER))); + verify(reconciliationCache, times(1)).isAccelerated(); + verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); + + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(directoryManager); + verifyNoMoreInteractions(reconciliationClient); + verifyNoMoreInteractions(reconciliationCache); + } + + @Test + public void testInProgress() { + when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(VALID_NUMBER)); + + long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); + + assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); + + verify(accounts, times(1)).getAllFrom(eq(VALID_NUMBER), anyInt()); + + ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); + verify(reconciliationClient, times(1)).sendChunk(request.capture()); + + assertThat(request.getValue().getFromNumber()).isEqualTo(VALID_NUMBER); + assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); + assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList()); + + verify(directoryManager, times(1)).startBatchOperation(); + verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER)); + verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle)); + + verify(reconciliationCache, times(1)).getLastNumber(); + verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER))); + verify(reconciliationCache, times(1)).isAccelerated(); + verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); + + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(directoryManager); + verifyNoMoreInteractions(reconciliationClient); + verifyNoMoreInteractions(reconciliationCache); + } + + @Test + public void testLastChunk() { + when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(INACTIVE_NUMBER)); + + long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); + + assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); + + verify(accounts, times(1)).getAllFrom(eq(INACTIVE_NUMBER), anyInt()); + + ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); + verify(reconciliationClient, times(1)).sendChunk(request.capture()); + + assertThat(request.getValue().getFromNumber()).isEqualTo(INACTIVE_NUMBER); + assertThat(request.getValue().getToNumber()).isNull(); + assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList()); + + verify(reconciliationCache, times(1)).getLastNumber(); + verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.absent())); + verify(reconciliationCache, times(1)).clearAccelerate(); + verify(reconciliationCache, times(1)).isAccelerated(); + verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); + + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(directoryManager); + verifyNoMoreInteractions(reconciliationClient); + verifyNoMoreInteractions(reconciliationCache); + } + + @Test + public void testNotFound() { + when(reconciliationClient.sendChunk(any())).thenReturn(notFoundResponse); + + long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); + + assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); + + verify(accounts, times(1)).getAllFrom(anyInt()); + + ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); + verify(reconciliationClient, times(1)).sendChunk(request.capture()); + + assertThat(request.getValue().getFromNumber()).isNull(); + assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); + assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER)); + + ArgumentCaptor addedContact = ArgumentCaptor.forClass(ClientContact.class); + verify(directoryManager, times(1)).startBatchOperation(); + verify(directoryManager, times(1)).add(eq(batchOperationHandle), addedContact.capture()); + verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER)); + verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle)); + + assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER)); + + verify(reconciliationCache, times(1)).getLastNumber(); + verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.absent())); + verify(reconciliationCache, times(1)).clearAccelerate(); + verify(reconciliationCache, times(1)).claimActiveWork(any(), anyLong()); + + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(directoryManager); + verifyNoMoreInteractions(reconciliationClient); + verifyNoMoreInteractions(reconciliationCache); + } + +}