diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 1d294a2a6..67b1dc6d6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -7,7 +7,15 @@ package org.whispersystems.textsecuregcm; import com.fasterxml.jackson.annotation.JsonProperty; import io.dropwizard.Configuration; import io.dropwizard.client.JerseyClientConfiguration; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; import org.whispersystems.textsecuregcm.configuration.AccountDatabaseCrawlerConfiguration; +import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration; +import org.whispersystems.textsecuregcm.configuration.AccountsDynamoDbConfiguration; import org.whispersystems.textsecuregcm.configuration.ApnConfiguration; import org.whispersystems.textsecuregcm.configuration.AppConfigConfiguration; import org.whispersystems.textsecuregcm.configuration.AwsAttachmentsConfiguration; @@ -17,7 +25,6 @@ import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; import org.whispersystems.textsecuregcm.configuration.DynamoDbConfiguration; import org.whispersystems.textsecuregcm.configuration.GcmConfiguration; import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration; -import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration; import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration; import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration; import org.whispersystems.textsecuregcm.configuration.MessageDynamoDbConfiguration; @@ -39,13 +46,6 @@ import org.whispersystems.textsecuregcm.configuration.VoiceVerificationConfigura import org.whispersystems.textsecuregcm.configuration.ZkConfig; import org.whispersystems.websocket.configuration.WebSocketConfiguration; -import javax.validation.Valid; -import javax.validation.constraints.NotNull; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - /** @noinspection MismatchedQueryAndUpdateOfCollection, WeakerAccess */ public class WhisperServerConfiguration extends Configuration { @@ -129,6 +129,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private DynamoDbConfiguration keysDynamoDb; + @Valid + @NotNull + @JsonProperty + private AccountsDynamoDbConfiguration accountsDynamoDb; + @Valid @NotNull @JsonProperty @@ -302,6 +307,10 @@ public class WhisperServerConfiguration extends Configuration { return keysDynamoDb; } + public AccountsDynamoDbConfiguration getAccountsDynamoDbConfiguration() { + return accountsDynamoDb; + } + public DatabaseConfiguration getAbuseDatabaseConfiguration() { return abuseDatabase; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 2cb99e290..9925830e6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -12,6 +12,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.s3.AmazonS3; @@ -141,6 +142,8 @@ import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb; +import org.whispersystems.textsecuregcm.storage.AccountsDynamoDbMigrator; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; @@ -276,10 +279,20 @@ public class WhisperServerService extends Application getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -86,4 +89,8 @@ public class DynamicConfiguration { public DynamicSignupCaptchaConfiguration getSignupCaptchaConfiguration() { return signupCaptcha; } + + public DynamicAccountsDynamoDbMigrationConfiguration getAccountsDynamoDbMigrationConfiguration() { + return accountsDynamoDbMigration; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Account.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Account.java index 3860a07a3..c9aa46f71 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Account.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Account.java @@ -58,6 +58,9 @@ public class Account implements Principal { @JsonProperty("inCds") private boolean discoverableByPhoneNumber = true; + @JsonProperty("_ddbV") + private int dynamoDbMigrationVersion; + @JsonIgnore private Device authenticatedDevice; @@ -265,6 +268,14 @@ public class Account implements Principal { this.discoverableByPhoneNumber = discoverableByPhoneNumber; } + public int getDynamoDbMigrationVersion() { + return dynamoDbMigrationVersion; + } + + public void setDynamoDbMigrationVersion(int dynamoDbMigrationVersion) { + this.dynamoDbMigrationVersion = dynamoDbMigrationVersion; + } + // Principal implementation @Override diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountStore.java new file mode 100644 index 000000000..8b9e733c2 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountStore.java @@ -0,0 +1,17 @@ +package org.whispersystems.textsecuregcm.storage; + +import java.util.Optional; +import java.util.UUID; + +public interface AccountStore { + + boolean create(Account account); + + void update(Account account); + + Optional get(String number); + + Optional get(UUID uuid); + + void delete(final UUID uuid); +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index 3d51eb705..5ff498a2f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -4,23 +4,22 @@ */ package org.whispersystems.textsecuregcm.storage; +import static com.codahale.metrics.MetricRegistry.name; + import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; +import java.util.Optional; +import java.util.UUID; import org.jdbi.v3.core.transaction.TransactionIsolationLevel; import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.SystemMapper; -import java.util.List; -import java.util.Optional; -import java.util.UUID; - -import static com.codahale.metrics.MetricRegistry.name; - -public class Accounts { +public class Accounts implements AccountStore { public static final String ID = "id"; public static final String UID = "uuid"; @@ -46,6 +45,7 @@ public class Accounts { this.database.getDatabase().registerRowMapper(new AccountRowMapper()); } + @Override public boolean create(Account account) { return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { try (Timer.Context ignored = createTimer.time()) { @@ -65,6 +65,7 @@ public class Accounts { })); } + @Override public void update(Account account) { database.use(jdbi -> jdbi.useHandle(handle -> { try (Timer.Context ignored = updateTimer.time()) { @@ -78,6 +79,7 @@ public class Accounts { })); } + @Override public Optional get(String number) { return database.with(jdbi -> jdbi.withHandle(handle -> { try (Timer.Context ignored = getByNumberTimer.time()) { @@ -89,6 +91,7 @@ public class Accounts { })); } + @Override public Optional get(UUID uuid) { return database.with(jdbi -> jdbi.withHandle(handle -> { try (Timer.Context ignored = getByUuidTimer.time()) { @@ -123,6 +126,7 @@ public class Accounts { })); } + @Override public void delete(final UUID uuid) { database.use(jdbi -> jdbi.useHandle(handle -> { try (Timer.Context ignored = deleteTimer.time()) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java new file mode 100644 index 000000000..441c3d1ab --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java @@ -0,0 +1,262 @@ +package org.whispersystems.textsecuregcm.storage; + +import static com.codahale.metrics.MetricRegistry.name; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.document.AttributeUpdate; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.PrimaryKey; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec; +import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.CancellationReason; +import com.amazonaws.services.dynamodbv2.model.Delete; +import com.amazonaws.services.dynamodbv2.model.GetItemResult; +import com.amazonaws.services.dynamodbv2.model.Put; +import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailure; +import com.amazonaws.services.dynamodbv2.model.TransactWriteItem; +import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest; +import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.whispersystems.textsecuregcm.util.SystemMapper; +import org.whispersystems.textsecuregcm.util.UUIDUtil; + +public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountStore { + + // uuid, primary key + static final String KEY_ACCOUNT_UUID = "U"; + // phone number + static final String ATTR_ACCOUNT_E164 = "P"; + // account, serialized to JSON + static final String ATTR_ACCOUNT_DATA = "D"; + + static final String ATTR_MIGRATION_VERSION = "V"; + + private final AmazonDynamoDB client; + private final Table accountsTable; + + private final String phoneNumbersTableName; + + private static final Timer CREATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "create")); + private static final Timer UPDATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "update")); + private static final Timer GET_BY_NUMBER_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByNumber")); + private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByUuid")); + private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete")); + + public AccountsDynamoDb(AmazonDynamoDB client, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName) { + super(dynamoDb); + + this.client = client; + this.accountsTable = dynamoDb.getTable(accountsTableName); + this.phoneNumbersTableName = phoneNumbersTableName; + } + + @Override + public boolean create(Account account) { + + return CREATE_TIMER.record(() -> { + + try { + TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid()); + + TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid()); + + final TransactWriteItemsRequest request = new TransactWriteItemsRequest() + .withTransactItems(phoneNumberConstraintPut, accountPut); + + try { + client.transactWriteItems(request); + } catch (TransactionCanceledException e) { + + final CancellationReason accountCancellationReason = e.getCancellationReasons().get(1); + + if ("ConditionalCheckFailed".equals(accountCancellationReason.getCode())) { + throw new IllegalArgumentException("uuid present with different phone number"); + } + + final CancellationReason phoneNumberConstraintCancellationReason = e.getCancellationReasons().get(0); + + if ("ConditionalCheckFailed".equals(phoneNumberConstraintCancellationReason.getCode())) { + + ByteBuffer actualAccountUuid = phoneNumberConstraintCancellationReason.getItem().get(KEY_ACCOUNT_UUID).getB(); + account.setUuid(UUIDUtil.fromByteBuffer(actualAccountUuid)); + + update(account); + + return false; + } + + // this shouldn’t happen + throw new RuntimeException("could not create account"); + } + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } + + return true; + }); + } + + private TransactWriteItem buildPutWriteItemForAccount(Account account, UUID uuid) throws JsonProcessingException { + return new TransactWriteItem() + .withPut( + new Put() + .withTableName(accountsTable.getTableName()) + .withItem(Map.of( + KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)), + ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()), + ATTR_ACCOUNT_DATA, new AttributeValue() + .withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account))), + ATTR_MIGRATION_VERSION, new AttributeValue().withN( + String.valueOf(account.getDynamoDbMigrationVersion())))) + .withConditionExpression("attribute_not_exists(#number) OR #number = :number") + .withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164)) + .withExpressionAttributeValues(Map.of(":number", new AttributeValue(account.getNumber())))); + } + + private TransactWriteItem buildPutWriteItemForPhoneNumberConstraint(Account account, UUID uuid) { + return new TransactWriteItem() + .withPut( + new Put() + .withTableName(phoneNumbersTableName) + .withItem(Map.of( + ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()), + KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))) + .withConditionExpression( + "attribute_not_exists(#number) OR (attribute_exists(#number) AND #uuid = :uuid)") + .withExpressionAttributeNames( + Map.of("#uuid", KEY_ACCOUNT_UUID, + "#number", ATTR_ACCOUNT_E164)) + .withExpressionAttributeValues( + Map.of(":uuid", new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))) + .withReturnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)); + } + + @Override + public void update(Account account) { + UPDATE_TIMER.record(() -> { + UpdateItemSpec updateItemSpec; + try { + updateItemSpec = new UpdateItemSpec() + .withPrimaryKey( + new PrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(account.getUuid()))) + .withAttributeUpdate( + new AttributeUpdate(ATTR_ACCOUNT_DATA).put(SystemMapper.getMapper().writeValueAsBytes(account)), + new AttributeUpdate(ATTR_MIGRATION_VERSION).put(String.valueOf(account.getDynamoDbMigrationVersion()))); + + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } + + accountsTable.updateItem(updateItemSpec); + }); + } + + @Override + public Optional get(String number) { + + return GET_BY_NUMBER_TIMER.record(() -> { + + final GetItemResult phoneNumberAndUuid = client.getItem(phoneNumbersTableName, + Map.of(ATTR_ACCOUNT_E164, new AttributeValue(number)), true); + + return Optional.ofNullable(phoneNumberAndUuid.getItem()) + .map(item -> item.get(KEY_ACCOUNT_UUID).getB()) + .map(uuid -> accountsTable.getItem(new GetItemSpec() + .withPrimaryKey(KEY_ACCOUNT_UUID, uuid.array()) + .withConsistentRead(true))) + .map(AccountsDynamoDb::fromItem); + }); + } + + @Override + public Optional get(UUID uuid) { + Optional maybeItem = GET_BY_UUID_TIMER.record(() -> + Optional.ofNullable(accountsTable.getItem(new GetItemSpec(). + withPrimaryKey(new PrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(uuid))) + .withConsistentRead(true)))); + + return maybeItem.map(AccountsDynamoDb::fromItem); + } + + @Override + public void delete(UUID uuid) { + DELETE_TIMER.record(() -> { + + Optional maybeAccount = get(uuid); + + maybeAccount.ifPresent(account -> { + + TransactWriteItem phoneNumberDelete = new TransactWriteItem() + .withDelete(new Delete() + .withTableName(phoneNumbersTableName) + .withKey(Map.of(ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber())))); + + TransactWriteItem accountDelete = new TransactWriteItem().withDelete( + new Delete() + .withTableName(accountsTable.getTableName()) + .withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid))))); + + TransactWriteItemsRequest request = new TransactWriteItemsRequest() + .withTransactItems(phoneNumberDelete, accountDelete); + + client.transactWriteItems(request); + }); + }); + } + + public boolean migrate(Account account) { + try { + TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid()); + + TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid()); + accountPut.getPut() + .setConditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)"); + accountPut.getPut() + .setExpressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID, + "#version", ATTR_MIGRATION_VERSION)); + accountPut.getPut() + .setExpressionAttributeValues( + Map.of(":version", new AttributeValue().withN(String.valueOf(account.getDynamoDbMigrationVersion())))); + + final TransactWriteItemsRequest request = new TransactWriteItemsRequest() + .withTransactItems(phoneNumberConstraintPut, accountPut); + + client.transactWriteItems(request); + + return true; + + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } catch (TransactionCanceledException ignored) { + // account is already migrated + } + + return false; + } + + @VisibleForTesting + static Account fromItem(Item item) { + try { + Account account = SystemMapper.getMapper().readValue(item.getBinary(ATTR_ACCOUNT_DATA), Account.class); + + account.setNumber(item.getString(ATTR_ACCOUNT_E164)); + account.setUuid(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_ACCOUNT_UUID))); + + return account; + + } catch (IOException e) { + throw new RuntimeException("Could not read stored account data", e); + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrator.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrator.java new file mode 100644 index 000000000..57bd951a5 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrator.java @@ -0,0 +1,53 @@ +package org.whispersystems.textsecuregcm.storage; + +import static com.codahale.metrics.MetricRegistry.name; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +public class AccountsDynamoDbMigrator extends AccountDatabaseCrawlerListener { + + private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDbMigrator.class, "migrated")); + private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDbMigrator.class, "error")); + + private final AccountsDynamoDb accountsDynamoDb; + private final DynamicConfigurationManager dynamicConfigurationManager; + + public AccountsDynamoDbMigrator(final AccountsDynamoDb accountsDynamoDb, final DynamicConfigurationManager dynamicConfigurationManager) { + this.accountsDynamoDb = accountsDynamoDb; + this.dynamicConfigurationManager = dynamicConfigurationManager; + } + + @Override + public void onCrawlStart() { + + } + + @Override + public void onCrawlEnd(Optional fromUuid) { + + } + + @Override + protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) { + + if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isBackgroundMigrationEnabled()) { + return; + } + + for (Account account : chunkAccounts) { + try { + final boolean migrated = accountsDynamoDb.migrate(account); + if (migrated) { + MIGRATED_COUNTER.increment(); + } + } catch (final Exception e) { + + ERROR_COUNTER.increment(); + } + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index a4b759fb9..f5a57d27f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -14,15 +14,20 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.lettuce.core.RedisException; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; @@ -50,10 +55,14 @@ public class AccountsManager { private static final String COUNTRY_CODE_TAG_NAME = "country"; private static final String DELETION_REASON_TAG_NAME = "reason"; + private static final String DYNAMO_MIGRATION_ERROR_COUNTER = name(AccountsManager.class, "migration", "error"); + private static final Counter DYNAMO_MIGRATION_COMPARISON_COUNTER = Metrics.counter(name(AccountsManager.class, "migration", "comparisons")); + private static final Counter DYNAMO_MIGRATION_MISMATCH_COUNTER = Metrics.counter(name(AccountsManager.class, "migration", "mismatches")); private final Logger logger = LoggerFactory.getLogger(AccountsManager.class); private final Accounts accounts; + private final AccountsDynamoDb accountsDynamoDb; private final FaultTolerantRedisCluster cacheCluster; private final DirectoryQueue directoryQueue; private final KeysDynamoDb keysDynamoDb; @@ -64,6 +73,9 @@ public class AccountsManager { private final SecureBackupClient secureBackupClient; private final ObjectMapper mapper; + private final DynamicConfigurationManager dynamicConfigurationManager; + private final ExperimentEnrollmentManager experimentEnrollmentManager; + public enum DeletionReason { ADMIN_DELETED("admin"), EXPIRED ("expired"), @@ -76,11 +88,13 @@ public class AccountsManager { } } - public AccountsManager(Accounts accounts, FaultTolerantRedisCluster cacheCluster, final DirectoryQueue directoryQueue, + public AccountsManager(Accounts accounts, AccountsDynamoDb accountsDynamoDb, FaultTolerantRedisCluster cacheCluster, final DirectoryQueue directoryQueue, final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager, final UsernamesManager usernamesManager, final ProfilesManager profilesManager, final SecureStorageClient secureStorageClient, - final SecureBackupClient secureBackupClient) { + final SecureBackupClient secureBackupClient, + final ExperimentEnrollmentManager experimentEnrollmentManager, final DynamicConfigurationManager dynamicConfigurationManager) { this.accounts = accounts; + this.accountsDynamoDb = accountsDynamoDb; this.cacheCluster = cacheCluster; this.directoryQueue = directoryQueue; this.keysDynamoDb = keysDynamoDb; @@ -90,6 +104,9 @@ public class AccountsManager { this.secureStorageClient = secureStorageClient; this.secureBackupClient = secureBackupClient; this.mapper = SystemMapper.getMapper(); + + this.dynamicConfigurationManager = dynamicConfigurationManager; + this.experimentEnrollmentManager = experimentEnrollmentManager; } public boolean create(Account account) { @@ -97,14 +114,26 @@ public class AccountsManager { boolean freshUser = databaseCreate(account); redisSet(account); + if (dynamoWriteEnabled()) { + runSafelyAndRecordMetrics(() -> dynamoCreate(account), Optional.of(account.getUuid()), freshUser, + Boolean::compareTo, "create"); + } return freshUser; } } public void update(Account account) { try (Timer.Context ignored = updateTimer.time()) { + account.setDynamoDbMigrationVersion(account.getDynamoDbMigrationVersion() + 1); redisSet(account); databaseUpdate(account); + + if (dynamoWriteEnabled()) { + runSafelyAndRecordMetrics(() -> { + dynamoUpdate(account); + return true; + }, Optional.of(account.getUuid()), true, Boolean::compareTo, "update"); + } } } @@ -121,6 +150,11 @@ public class AccountsManager { if (!account.isPresent()) { account = databaseGet(number); account.ifPresent(value -> redisSet(value)); + + if (dynamoReadEnabled()) { + runSafelyAndRecordMetrics(() -> dynamoGet(number), Optional.empty(), account, this::compareAccounts, + "getByNumber"); + } } return account; @@ -134,6 +168,11 @@ public class AccountsManager { if (!account.isPresent()) { account = databaseGet(uuid); account.ifPresent(value -> redisSet(value)); + + if (dynamoReadEnabled()) { + runSafelyAndRecordMetrics(() -> dynamoGet(uuid), Optional.of(uuid), account, this::compareAccounts, + "getByUuid"); + } } return account; @@ -165,6 +204,16 @@ public class AccountsManager { redisDelete(account); databaseDelete(account); + + if (dynamoDeleteEnabled()) { + try { + dynamoDelete(account); + } catch (final Exception e) { + logger.error("Could not delete account {} from dynamo", account.getUuid().toString()); + Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER, "action", "delete"); + } + } + } catch (final Exception e) { logger.warn("Failed to delete account", e); @@ -265,4 +314,107 @@ public class AccountsManager { private void databaseDelete(final Account account) { accounts.delete(account.getUuid()); } + + private Optional dynamoGet(String number) { + return accountsDynamoDb.get(number); + } + + private Optional dynamoGet(UUID uuid) { + return accountsDynamoDb.get(uuid); + } + + private boolean dynamoCreate(Account account) { + return accountsDynamoDb.create(account); + } + + private void dynamoUpdate(Account account) { + accountsDynamoDb.update(account); + } + + private void dynamoDelete(final Account account) { + accountsDynamoDb.delete(account.getUuid()); + } + + private boolean dynamoDeleteEnabled() { + return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDeleteEnabled(); + } + + private boolean dynamoReadEnabled() { + return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isReadEnabled(); + } + + private boolean dynamoWriteEnabled() { + return dynamoDeleteEnabled() + && dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isWriteEnabled(); + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public int compareAccounts(final Optional maybeDatabaseAccount, final Optional maybeDynamoAccount) { + + if (maybeDatabaseAccount.isEmpty() && maybeDynamoAccount.isEmpty()) { + return 0; + } + + if (maybeDatabaseAccount.isEmpty() || maybeDynamoAccount.isEmpty()) { + return 1; + } + + final Account databaseAccount = maybeDatabaseAccount.get(); + final Account dynamoAccount = maybeDynamoAccount.get(); + + final int uuidCompare = databaseAccount.getUuid().compareTo(dynamoAccount.getUuid()); + + if (uuidCompare != 0) { + return uuidCompare; + } + + final int numberCompare = databaseAccount.getNumber().compareTo(dynamoAccount.getNumber()); + + if (numberCompare != 0) { + return numberCompare; + } + + try { + final byte[] databaseSerialized = mapper.writeValueAsBytes(databaseAccount); + final byte[] dynamoSerialized = mapper.writeValueAsBytes(dynamoAccount); + + return Arrays.compare(databaseSerialized, dynamoSerialized); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private void runSafelyAndRecordMetrics(Callable callable, Optional maybeUuid, final T databaseResult, final Comparator comparator, final String action) { + + if (maybeUuid.isPresent()) { + // the only time we don’t have a UUID is in getByNumber, which is sufficiently low volume to not be a concern, and + // it will also be gated by the global readEnabled configuration + final boolean enrolled = experimentEnrollmentManager.isEnrolled(maybeUuid.get(), "accountsDynamoDbMigration"); + + if (!enrolled) { + return; + } + } + + try { + + final T dynamoResult = callable.call(); + compare(databaseResult, dynamoResult, comparator); + + } catch (final Exception e) { + logger.error("Error running " + action + " ih Dynamo", e); + + Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER, "action", action).increment(); + } + } + + private void compare(final T databaseResult, final T dynamoResult, final Comparator comparator) { + DYNAMO_MIGRATION_COMPARISON_COUNTER.increment(); + + if (comparator.compare(databaseResult, dynamoResult) != 0) { + DYNAMO_MIGRATION_MISMATCH_COUNTER.increment(); + } + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/UUIDUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/UUIDUtil.java index 40bc5501f..ce368df77 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/UUIDUtil.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/UUIDUtil.java @@ -11,20 +11,27 @@ import java.util.UUID; public class UUIDUtil { public static byte[] toBytes(final UUID uuid) { - final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); - byteBuffer.putLong(uuid.getMostSignificantBits()); - byteBuffer.putLong(uuid.getLeastSignificantBits()); - return byteBuffer.array(); + return toByteBuffer(uuid).array(); + } + + public static ByteBuffer toByteBuffer(final UUID uuid) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); + byteBuffer.putLong(uuid.getMostSignificantBits()); + byteBuffer.putLong(uuid.getLeastSignificantBits()); + return byteBuffer.flip(); } public static UUID fromBytes(final byte[] bytes) { - if (bytes.length != 16) { - throw new IllegalArgumentException("unexpected byte array length; was " + bytes.length + " but expected 16"); - } - - final ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - final long mostSigBits = byteBuffer.getLong(); - final long leastSigBits = byteBuffer.getLong(); - return new UUID(mostSigBits, leastSigBits); + return fromByteBuffer(ByteBuffer.wrap(bytes)); } + + public static UUID fromByteBuffer(final ByteBuffer byteBuffer) { + if (byteBuffer.array().length != 16) { + throw new IllegalArgumentException("unexpected byte array length; was " + byteBuffer.array().length + " but expected 16"); + } + + final long mostSigBits = byteBuffer.getLong(); + final long leastSigBits = byteBuffer.getLong(); + return new UUID(mostSigBits, leastSigBits); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index 14f0b2d96..494a12dde 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -9,6 +9,7 @@ import static com.codahale.metrics.MetricRegistry.name; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -26,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; @@ -33,7 +35,9 @@ import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb; import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.KeysDynamoDb; @@ -100,9 +104,19 @@ public class DeleteUserCommand extends EnvironmentCommand account = accountsManager.get(user); if (account.isPresent()) { - accountsManager.delete(account.get(), AccountsManager.DeletionReason.ADMIN_DELETED); + accountsManager.delete(account.get(), DeletionReason.ADMIN_DELETED); logger.warn("Removed " + account.get().getNumber()); } else { logger.warn("Account not found"); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfigurationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfigurationTest.java index daf088a4e..b34d1e0cd 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfigurationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfigurationTest.java @@ -229,7 +229,7 @@ class DynamicConfigurationTest { } @Test - public void testParseTwilioConfiguration() throws JsonProcessingException { + void testParseTwilioConfiguration() throws JsonProcessingException { { final String emptyConfigYaml = "test: true"; final DynamicConfiguration emptyConfig = DynamicConfigurationManager.OBJECT_MAPPER @@ -254,7 +254,7 @@ class DynamicConfigurationTest { } @Test - public void testParsePaymentsConfiguration() throws JsonProcessingException { + void testParsePaymentsConfiguration() throws JsonProcessingException { { final String emptyConfigYaml = "test: true"; final DynamicConfiguration emptyConfig = DynamicConfigurationManager.OBJECT_MAPPER @@ -278,7 +278,7 @@ class DynamicConfigurationTest { } @Test - public void testParseSignupCaptchaConfiguration() throws JsonProcessingException { + void testParseSignupCaptchaConfiguration() throws JsonProcessingException { { final String emptyConfigYaml = "test: true"; final DynamicConfiguration emptyConfig = DynamicConfigurationManager.OBJECT_MAPPER @@ -300,4 +300,36 @@ class DynamicConfigurationTest { assertEquals(Set.of("1"), config.getCountryCodes()); } } + + @Test + void testParseAccountsDynamoDbMigrationConfiguration() throws JsonProcessingException { + { + final String emptyConfigYaml = "test: true"; + final DynamicConfiguration emptyConfig = DynamicConfigurationManager.OBJECT_MAPPER + .readValue(emptyConfigYaml, DynamicConfiguration.class); + + assertFalse(emptyConfig.getAccountsDynamoDbMigrationConfiguration().isBackgroundMigrationEnabled()); + assertFalse(emptyConfig.getAccountsDynamoDbMigrationConfiguration().isDeleteEnabled()); + assertFalse(emptyConfig.getAccountsDynamoDbMigrationConfiguration().isWriteEnabled()); + assertFalse(emptyConfig.getAccountsDynamoDbMigrationConfiguration().isReadEnabled()); + } + + { + final String accountsDynamoDbMigrationConfig = + "accountsDynamoDbMigration:\n" + + " backgroundMigrationEnabled: true\n" + + " deleteEnabled: true\n" + + " readEnabled: true\n" + + " writeEnabled: true"; + + final DynamicAccountsDynamoDbMigrationConfiguration config = DynamicConfigurationManager.OBJECT_MAPPER + .readValue(accountsDynamoDbMigrationConfig, DynamicConfiguration.class) + .getAccountsDynamoDbMigrationConfiguration(); + + assertTrue(config.isBackgroundMigrationEnabled()); + assertTrue(config.isDeleteEnabled()); + assertTrue(config.isWriteEnabled()); + assertTrue(config.isReadEnabled()); + } + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java new file mode 100644 index 000000000..8f99f5c3b --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java @@ -0,0 +1,370 @@ +/* + * Copyright 2013-2020 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec; +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import org.jdbi.v3.core.transaction.TransactionException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.entities.SignedPreKey; +import org.whispersystems.textsecuregcm.util.UUIDUtil; + +class AccountsDynamoDbTest { + + private static final String ACCOUNTS_TABLE_NAME = "accounts_test"; + private static final String NUMBERS_TABLE_NAME = "numbers_test"; + + @RegisterExtension + static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder() + .tableName(ACCOUNTS_TABLE_NAME) + .hashKey(AccountsDynamoDb.KEY_ACCOUNT_UUID) + .attributeDefinition(new AttributeDefinition(AccountsDynamoDb.KEY_ACCOUNT_UUID, ScalarAttributeType.B)) + .build(); + + private AccountsDynamoDb accountsDynamoDb; + + @BeforeEach + void setupAccountsDao() { + + CreateTableRequest createNumbersTableRequest = new CreateTableRequest() + .withTableName(NUMBERS_TABLE_NAME) + .withKeySchema(new KeySchemaElement(AccountsDynamoDb.ATTR_ACCOUNT_E164, KeyType.HASH)) + .withAttributeDefinitions(new AttributeDefinition(AccountsDynamoDb.ATTR_ACCOUNT_E164, ScalarAttributeType.S)) + .withProvisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT); + + final Table numbersTable = dynamoDbExtension.getDynamoDB().createTable(createNumbersTableRequest); + + this.accountsDynamoDb = new AccountsDynamoDb(dynamoDbExtension.getClient(), dynamoDbExtension.getDynamoDB(), dynamoDbExtension.getTableName(), numbersTable.getTableName()); + } + + @Test + void testStore() { + Device device = generateDevice (1 ); + Account account = generateAccount("+14151112222", UUID.randomUUID(), Collections.singleton(device)); + + accountsDynamoDb.create(account); + + verifyStoredState("+14151112222", account.getUuid(), account); + } + + @Test + void testStoreMulti() { + Set devices = new HashSet<>(); + devices.add(generateDevice(1)); + devices.add(generateDevice(2)); + + Account account = generateAccount("+14151112222", UUID.randomUUID(), devices); + + accountsDynamoDb.create(account); + + verifyStoredState("+14151112222", account.getUuid(), account); + } + + @Test + void testRetrieve() { + Set devicesFirst = new HashSet<>(); + devicesFirst.add(generateDevice(1)); + devicesFirst.add(generateDevice(2)); + + UUID uuidFirst = UUID.randomUUID(); + Account accountFirst = generateAccount("+14151112222", uuidFirst, devicesFirst); + + Set devicesSecond = new HashSet<>(); + devicesSecond.add(generateDevice(1)); + devicesSecond.add(generateDevice(2)); + + UUID uuidSecond = UUID.randomUUID(); + Account accountSecond = generateAccount("+14152221111", uuidSecond, devicesSecond); + + accountsDynamoDb.create(accountFirst); + accountsDynamoDb.create(accountSecond); + + Optional retrievedFirst = accountsDynamoDb.get("+14151112222"); + Optional retrievedSecond = accountsDynamoDb.get("+14152221111"); + + assertThat(retrievedFirst.isPresent()).isTrue(); + assertThat(retrievedSecond.isPresent()).isTrue(); + + verifyStoredState("+14151112222", uuidFirst, retrievedFirst.get(), accountFirst); + verifyStoredState("+14152221111", uuidSecond, retrievedSecond.get(), accountSecond); + + retrievedFirst = accountsDynamoDb.get(uuidFirst); + retrievedSecond = accountsDynamoDb.get(uuidSecond); + + assertThat(retrievedFirst.isPresent()).isTrue(); + assertThat(retrievedSecond.isPresent()).isTrue(); + + verifyStoredState("+14151112222", uuidFirst, retrievedFirst.get(), accountFirst); + verifyStoredState("+14152221111", uuidSecond, retrievedSecond.get(), accountSecond); + } + + @Test + void testOverwrite() { + Device device = generateDevice (1 ); + UUID firstUuid = UUID.randomUUID(); + Account account = generateAccount("+14151112222", firstUuid, Collections.singleton(device)); + + accountsDynamoDb.create(account); + + verifyStoredState("+14151112222", account.getUuid(), account); + + UUID secondUuid = UUID.randomUUID(); + + device = generateDevice(1); + account = generateAccount("+14151112222", secondUuid, Collections.singleton(device)); + + accountsDynamoDb.create(account); + verifyStoredState("+14151112222", firstUuid, account); + + device = generateDevice(1); + Account invalidAccount = generateAccount("+14151113333", firstUuid, Collections.singleton(device)); + + assertThatThrownBy(() -> accountsDynamoDb.create(invalidAccount)); + } + + @Test + void testUpdate() { + Device device = generateDevice (1 ); + Account account = generateAccount("+14151112222", UUID.randomUUID(), Collections.singleton(device)); + + accountsDynamoDb.create(account); + + device.setName("foobar"); + + accountsDynamoDb.update(account); + + Optional retrieved = accountsDynamoDb.get("+14151112222"); + + assertThat(retrieved.isPresent()).isTrue(); + verifyStoredState("+14151112222", account.getUuid(), retrieved.get(), account); + + retrieved = accountsDynamoDb.get(account.getUuid()); + + assertThat(retrieved.isPresent()).isTrue(); + verifyStoredState("+14151112222", account.getUuid(), retrieved.get(), account); + } + + @Test + void testDelete() { + final Device deletedDevice = generateDevice (1); + final Account deletedAccount = generateAccount("+14151112222", UUID.randomUUID(), Collections.singleton(deletedDevice)); + final Device retainedDevice = generateDevice (1); + final Account retainedAccount = generateAccount("+14151112345", UUID.randomUUID(), Collections.singleton(retainedDevice)); + + accountsDynamoDb.create(deletedAccount); + accountsDynamoDb.create(retainedAccount); + + assertThat(accountsDynamoDb.get(deletedAccount.getUuid())).isPresent(); + assertThat(accountsDynamoDb.get(retainedAccount.getUuid())).isPresent(); + + accountsDynamoDb.delete(deletedAccount.getUuid()); + + assertThat(accountsDynamoDb.get(deletedAccount.getUuid())).isNotPresent(); + + verifyStoredState(retainedAccount.getNumber(), retainedAccount.getUuid(), accountsDynamoDb.get(retainedAccount.getUuid()).get(), retainedAccount); + + { + final Account recreatedAccount = generateAccount(deletedAccount.getNumber(), UUID.randomUUID(), + Collections.singleton(generateDevice(1))); + + accountsDynamoDb.create(recreatedAccount); + + assertThat(accountsDynamoDb.get(recreatedAccount.getUuid())).isPresent(); + verifyStoredState(recreatedAccount.getNumber(), recreatedAccount.getUuid(), + accountsDynamoDb.get(recreatedAccount.getUuid()).get(), recreatedAccount); + } + } + + @Test + void testMissing() { + Device device = generateDevice (1 ); + Account account = generateAccount("+14151112222", UUID.randomUUID(), Collections.singleton(device)); + + accountsDynamoDb.create(account); + + Optional retrieved = accountsDynamoDb.get("+11111111"); + assertThat(retrieved.isPresent()).isFalse(); + + retrieved = accountsDynamoDb.get(UUID.randomUUID()); + assertThat(retrieved.isPresent()).isFalse(); + } + + @Test + @Disabled("Need fault tolerant dynamodb") + void testBreaker() throws InterruptedException { + + CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration(); + configuration.setWaitDurationInOpenStateInSeconds(1); + configuration.setRingBufferSizeInHalfOpenState(1); + configuration.setRingBufferSizeInClosedState(2); + configuration.setFailureRateThreshold(50); + + final AmazonDynamoDB client = mock(AmazonDynamoDB.class); + final DynamoDB dynamoDB = new DynamoDB(client); + + when(client.transactWriteItems(any())) + .thenThrow(RuntimeException.class); + + when(client.updateItem(any())) + .thenThrow(RuntimeException.class); + + AccountsDynamoDb accounts = new AccountsDynamoDb(client, dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME); + Account account = generateAccount("+14151112222", UUID.randomUUID()); + + try { + accounts.update(account); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + + try { + accounts.update(account); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + + try { + accounts.update(account); + throw new AssertionError(); + } catch (CallNotPermittedException e) { + // good + } + + Thread.sleep(1100); + + try { + accounts.update(account); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + } + + @Test + void testMigrate() { + + Device device = generateDevice (1 ); + UUID firstUuid = UUID.randomUUID(); + Account account = generateAccount("+14151112222", firstUuid, Collections.singleton(device)); + + boolean migrated = accountsDynamoDb.migrate(account); + + assertThat(migrated).isTrue(); + + verifyStoredState("+14151112222", account.getUuid(), account); + + migrated = accountsDynamoDb.migrate(account); + + assertThat(migrated).isFalse(); + + verifyStoredState("+14151112222", account.getUuid(), account); + + UUID secondUuid = UUID.randomUUID(); + + device = generateDevice(1); + Account accountRemigrationWithDifferentUuid = generateAccount("+14151112222", secondUuid, Collections.singleton(device)); + + migrated = accountsDynamoDb.migrate(account); + + assertThat(migrated).isFalse(); + verifyStoredState("+14151112222", firstUuid, account); + + + account.setDynamoDbMigrationVersion(account.getDynamoDbMigrationVersion() + 1); + + migrated = accountsDynamoDb.migrate(account); + + assertThat(migrated).isTrue(); + } + + private Device generateDevice(long id) { + Random random = new Random(System.currentTimeMillis()); + SignedPreKey signedPreKey = new SignedPreKey(random.nextInt(), "testPublicKey-" + random.nextInt(), "testSignature-" + random.nextInt()); + return new Device(id, "testName-" + random.nextInt(), "testAuthToken-" + random.nextInt(), "testSalt-" + random.nextInt(), + "testGcmId-" + random.nextInt(), "testApnId-" + random.nextInt(), "testVoipApnId-" + random.nextInt(), random.nextBoolean(), random.nextInt(), signedPreKey, random.nextInt(), random.nextInt(), "testUserAgent-" + random.nextInt() , 0, new Device.DeviceCapabilities(random.nextBoolean(), random.nextBoolean(), random.nextBoolean(), random.nextBoolean(), random.nextBoolean(), random.nextBoolean())); + } + + private Account generateAccount(String number, UUID uuid) { + Device device = generateDevice(1); + return generateAccount(number, uuid, Collections.singleton(device)); + } + + private Account generateAccount(String number, UUID uuid, Set devices) { + byte[] unidentifiedAccessKey = new byte[16]; + Random random = new Random(System.currentTimeMillis()); + Arrays.fill(unidentifiedAccessKey, (byte)random.nextInt(255)); + + return new Account(number, uuid, devices, unidentifiedAccessKey); + } + + private void verifyStoredState(String number, UUID uuid, Account expecting) { + final Table accounts = dynamoDbExtension.getDynamoDB().getTable(dynamoDbExtension.getTableName()); + + Item item = accounts.getItem(new GetItemSpec() + .withPrimaryKey(AccountsDynamoDb.KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(uuid)) + .withConsistentRead(true)); + + if (item != null) { + String data = new String(item.getBinary(AccountsDynamoDb.ATTR_ACCOUNT_DATA), StandardCharsets.UTF_8); + assertThat(data).isNotEmpty(); + + Account result = AccountsDynamoDb.fromItem(item); + verifyStoredState(number, uuid, result, expecting); + } else { + throw new AssertionError("No data"); + } + } + + private void verifyStoredState(String number, UUID uuid, Account result, Account expecting) { + assertThat(result.getNumber()).isEqualTo(number); + assertThat(result.getLastSeen()).isEqualTo(expecting.getLastSeen()); + assertThat(result.getUuid()).isEqualTo(uuid); + assertThat(Arrays.equals(result.getUnidentifiedAccessKey().get(), expecting.getUnidentifiedAccessKey().get())).isTrue(); + + for (Device expectingDevice : expecting.getDevices()) { + Device resultDevice = result.getDevice(expectingDevice.getId()).get(); + assertThat(resultDevice.getApnId()).isEqualTo(expectingDevice.getApnId()); + assertThat(resultDevice.getGcmId()).isEqualTo(expectingDevice.getGcmId()); + assertThat(resultDevice.getLastSeen()).isEqualTo(expectingDevice.getLastSeen()); + assertThat(resultDevice.getSignedPreKey().getPublicKey()).isEqualTo(expectingDevice.getSignedPreKey().getPublicKey()); + assertThat(resultDevice.getSignedPreKey().getKeyId()).isEqualTo(expectingDevice.getSignedPreKey().getKeyId()); + assertThat(resultDevice.getSignedPreKey().getSignature()).isEqualTo(expectingDevice.getSignedPreKey().getSignature()); + assertThat(resultDevice.getFetchesMessages()).isEqualTo(expectingDevice.getFetchesMessages()); + assertThat(resultDevice.getUserAgent()).isEqualTo(expectingDevice.getUserAgent()); + assertThat(resultDevice.getName()).isEqualTo(expectingDevice.getName()); + assertThat(resultDevice.getCreated()).isEqualTo(expectingDevice.getCreated()); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java new file mode 100644 index 000000000..97b2b8fa2 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java @@ -0,0 +1,184 @@ +package org.whispersystems.textsecuregcm.storage; + +import com.almworks.sqlite4java.SQLite; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.local.main.ServerRunner; +import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer; +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback { + + static final String DEFAULT_TABLE_NAME = "test_table"; + + static final ProvisionedThroughput DEFAULT_PROVISIONED_THROUGHPUT = new ProvisionedThroughput(20L, 20L); + + private DynamoDBProxyServer server; + private int port; + + private final String tableName; + private final String hashKeyName; + private final String rangeKeyName; + + private final List attributeDefinitions; + private final List globalSecondaryIndexes; + + private final long readCapacityUnits; + private final long writeCapacityUnits; + + private AmazonDynamoDB client; + private DynamoDB dynamoDB; + + private DynamoDbExtension(String tableName, String hashKey, String rangeKey, List attributeDefinitions, List globalSecondaryIndexes, long readCapacityUnits, + long writeCapacityUnits) { + + this.tableName = tableName; + this.hashKeyName = hashKey; + this.rangeKeyName = rangeKey; + + this.readCapacityUnits = readCapacityUnits; + this.writeCapacityUnits = writeCapacityUnits; + + this.attributeDefinitions = attributeDefinitions; + this.globalSecondaryIndexes = globalSecondaryIndexes; + } + + public static DynamoDbExtensionBuilder builder() { + return new DynamoDbExtensionBuilder(); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + try { + server.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + + startServer(); + + initializeClient(); + + createTable(); + } + + private void createTable() { + KeySchemaElement[] keySchemaElements; + if (rangeKeyName == null) { + keySchemaElements = new KeySchemaElement[] { + new KeySchemaElement(hashKeyName, "HASH"), + }; + } else { + keySchemaElements = new KeySchemaElement[] { + new KeySchemaElement(hashKeyName, "HASH"), + new KeySchemaElement(rangeKeyName, "RANGE") + }; + } + + final CreateTableRequest createTableRequest = new CreateTableRequest() + .withTableName(tableName) + .withKeySchema(keySchemaElements) + .withAttributeDefinitions(attributeDefinitions.isEmpty() ? null : attributeDefinitions) + .withGlobalSecondaryIndexes(globalSecondaryIndexes.isEmpty() ? null : globalSecondaryIndexes) + .withProvisionedThroughput(new ProvisionedThroughput(readCapacityUnits, writeCapacityUnits)); + + getDynamoDB().createTable(createTableRequest); + } + + private void startServer() throws Exception { + SQLite.setLibraryPath("target/lib"); // if you see a library failed to load error, you need to run mvn test-compile at least once first + ServerSocket serverSocket = new ServerSocket(0); + serverSocket.setReuseAddress(false); + port = serverSocket.getLocalPort(); + serverSocket.close(); + server = ServerRunner.createServerFromCommandLineArgs(new String[]{"-inMemory", "-port", String.valueOf(port)}); + server.start(); + } + + private void initializeClient() { + client = AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) + .build(); + + dynamoDB = new DynamoDB(client); + } + + static class DynamoDbExtensionBuilder { + private String tableName = DEFAULT_TABLE_NAME; + + private String hashKey; + private String rangeKey; + + private List attributeDefinitions = new ArrayList<>(); + private List globalSecondaryIndexes = new ArrayList<>(); + + private long readCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.getReadCapacityUnits(); + private long writeCapacityUnits = DEFAULT_PROVISIONED_THROUGHPUT.getWriteCapacityUnits(); + + private DynamoDbExtensionBuilder() { + + } + + DynamoDbExtensionBuilder tableName(String databaseName) { + this.tableName = databaseName; + return this; + } + + DynamoDbExtensionBuilder hashKey(String hashKey) { + this.hashKey = hashKey; + return this; + } + + DynamoDbExtensionBuilder rangeKey(String rangeKey) { + this.rangeKey = rangeKey; + return this; + } + + DynamoDbExtensionBuilder attributeDefinition(AttributeDefinition attributeDefinition) { + attributeDefinitions.add(attributeDefinition); + return this; + } + + public DynamoDbExtensionBuilder globalSecondaryIndex(GlobalSecondaryIndex index) { + globalSecondaryIndexes.add(index); + return this; + } + + DynamoDbExtension build() { + return new DynamoDbExtension(tableName, hashKey, rangeKey, + attributeDefinitions, globalSecondaryIndexes, readCapacityUnits, writeCapacityUnits); + } + } + + public AmazonDynamoDB getClient() { + return client; + } + + public DynamoDB getDynamoDB() { + return dynamoDB; + } + + public String getTableName() { + return tableName; + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java index 214a2c597..ae9cb5fad 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java @@ -5,44 +5,67 @@ package org.whispersystems.textsecuregcm.tests.storage; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + import io.lettuce.core.RedisException; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; -import org.junit.Test; +import java.util.HashSet; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb; import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.KeysDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.UsernamesManager; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; -import java.util.HashSet; -import java.util.Optional; -import java.util.UUID; +class AccountsManagerTest { -import static junit.framework.TestCase.assertSame; -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; + private DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); + private ExperimentEnrollmentManager experimentEnrollmentManager = mock(ExperimentEnrollmentManager.class); -public class AccountsManagerTest { + @BeforeEach + void setup() { - @Test - public void testGetAccountByNumberInCache() { + DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(); + + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGetAccountByNumberInCache(final boolean dynamoEnabled) { RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); + AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -53,10 +76,12 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); + enableDynamo(dynamoEnabled); + when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(uuid.toString()); when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); - AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient); + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); Optional account = accountsManager.get("+14152222222"); assertTrue(account.isPresent()); @@ -67,13 +92,30 @@ public class AccountsManagerTest { verify(commands, times(1)).get(eq("Account3::" + uuid.toString())); verifyNoMoreInteractions(commands); verifyNoMoreInteractions(accounts); + + verifyZeroInteractions(accountsDynamoDb); } - @Test - public void testGetAccountByUuidInCache() { + private void enableDynamo(boolean dynamoEnabled) { + final DynamicAccountsDynamoDbMigrationConfiguration config = dynamicConfigurationManager.getConfiguration() + .getAccountsDynamoDbMigrationConfiguration(); + + config.setDeleteEnabled(dynamoEnabled); + config.setReadEnabled(dynamoEnabled); + config.setWriteEnabled(dynamoEnabled); + + when(experimentEnrollmentManager.isEnrolled(any(UUID.class), anyString())) + .thenReturn(dynamoEnabled); + + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGetAccountByUuidInCache(boolean dynamoEnabled) { RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); + AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -84,9 +126,11 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); + enableDynamo(dynamoEnabled); + when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); - AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient); + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); Optional account = accountsManager.get(uuid); assertTrue(account.isPresent()); @@ -97,14 +141,18 @@ public class AccountsManagerTest { verify(commands, times(1)).get(eq("Account3::" + uuid.toString())); verifyNoMoreInteractions(commands); verifyNoMoreInteractions(accounts); + + verifyZeroInteractions(accountsDynamoDb); } - @Test - public void testGetAccountByNumberNotInCache() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGetAccountByNumberNotInCache(boolean dynamoEnabled) { RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); + AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -115,10 +163,12 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + enableDynamo(dynamoEnabled); + when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(null); when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient); + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); Optional retrieved = accountsManager.get("+14152222222"); assertTrue(retrieved.isPresent()); @@ -131,13 +181,19 @@ public class AccountsManagerTest { verify(accounts, times(1)).get(eq("+14152222222")); verifyNoMoreInteractions(accounts); + + verify(accountsDynamoDb, dynamoEnabled ? times(1) : never()) + .get(eq("+14152222222")); + verifyNoMoreInteractions(accountsDynamoDb); } - @Test - public void testGetAccountByUuidNotInCache() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGetAccountByUuidNotInCache(boolean dynamoEnabled) { RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); + AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -148,10 +204,12 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + enableDynamo(dynamoEnabled); + when(commands.get(eq("Account3::" + uuid))).thenReturn(null); when(accounts.get(eq(uuid))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient); + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); Optional retrieved = accountsManager.get(uuid); assertTrue(retrieved.isPresent()); @@ -164,13 +222,18 @@ public class AccountsManagerTest { verify(accounts, times(1)).get(eq(uuid)); verifyNoMoreInteractions(accounts); + + verify(accountsDynamoDb, dynamoEnabled ? times(1) : never()).get(eq(uuid)); + verifyNoMoreInteractions(accountsDynamoDb); } - @Test - public void testGetAccountByNumberBrokenCache() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGetAccountByNumberBrokenCache(boolean dynamoEnabled) { RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); + AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -181,10 +244,12 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + enableDynamo(dynamoEnabled); + when(commands.get(eq("AccountMap::+14152222222"))).thenThrow(new RedisException("Connection lost!")); when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient); + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); Optional retrieved = accountsManager.get("+14152222222"); assertTrue(retrieved.isPresent()); @@ -197,13 +262,18 @@ public class AccountsManagerTest { verify(accounts, times(1)).get(eq("+14152222222")); verifyNoMoreInteractions(accounts); + + verify(accountsDynamoDb, dynamoEnabled ? times(1) : never()).get(eq("+14152222222")); + verifyNoMoreInteractions(accountsDynamoDb); } - @Test - public void testGetAccountByUuidBrokenCache() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGetAccountByUuidBrokenCache(boolean dynamoEnabled) { RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Accounts accounts = mock(Accounts.class); + AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); DirectoryQueue directoryQueue = mock(DirectoryQueue.class); KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); MessagesManager messagesManager = mock(MessagesManager.class); @@ -214,10 +284,12 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + enableDynamo(dynamoEnabled); + when(commands.get(eq("Account3::" + uuid))).thenThrow(new RedisException("Connection lost!")); when(accounts.get(eq(uuid))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient); + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); Optional retrieved = accountsManager.get(uuid); assertTrue(retrieved.isPresent()); @@ -230,7 +302,77 @@ public class AccountsManagerTest { verify(accounts, times(1)).get(eq(uuid)); verifyNoMoreInteractions(accounts); + + verify(accountsDynamoDb, dynamoEnabled ? times(1) : never()).get(eq(uuid)); + verifyNoMoreInteractions(accountsDynamoDb); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testUpdate_dynamoDbMigration(boolean dynamoEnabled) { + RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); + FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); + Accounts accounts = mock(Accounts.class); + AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); + DirectoryQueue directoryQueue = mock(DirectoryQueue.class); + KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); + MessagesManager messagesManager = mock(MessagesManager.class); + UsernamesManager usernamesManager = mock(UsernamesManager.class); + ProfilesManager profilesManager = mock(ProfilesManager.class); + SecureBackupClient secureBackupClient = mock(SecureBackupClient.class); + SecureStorageClient secureStorageClient = mock(SecureStorageClient.class); + UUID uuid = UUID.randomUUID(); + Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + + enableDynamo(dynamoEnabled); + + when(commands.get(eq("Account3::" + uuid))).thenReturn(null); + + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); + + assertEquals(0, account.getDynamoDbMigrationVersion()); + + accountsManager.update(account); + + assertEquals(1, account.getDynamoDbMigrationVersion()); + + verify(accounts, times(1)).update(account); + verifyNoMoreInteractions(accounts); + + verify(accountsDynamoDb, dynamoEnabled ? times(1) : never()).update(account); + verifyNoMoreInteractions(accountsDynamoDb); } + @Test + void testCompareAccounts() { + RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); + FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); + Accounts accounts = mock(Accounts.class); + AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class); + DirectoryQueue directoryQueue = mock(DirectoryQueue.class); + KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class); + MessagesManager messagesManager = mock(MessagesManager.class); + UsernamesManager usernamesManager = mock(UsernamesManager.class); + ProfilesManager profilesManager = mock(ProfilesManager.class); + SecureBackupClient secureBackupClient = mock(SecureBackupClient.class); + SecureStorageClient secureStorageClient = mock(SecureStorageClient.class); + + AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager); + + assertEquals(0, accountsManager.compareAccounts(Optional.empty(), Optional.empty())); + + final UUID uuidA = UUID.randomUUID(); + final Account a1 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]); + + assertEquals(1, accountsManager.compareAccounts(Optional.empty(), Optional.of(a1))); + + final Account a2 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]); + + assertEquals(0, accountsManager.compareAccounts(Optional.of(a1), Optional.of(a2))); + + a2.setProfileName("name"); + + assertTrue(0 < accountsManager.compareAccounts(Optional.of(a1), Optional.of(a2))); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java index 0a6924a52..8e4fb94fb 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java @@ -5,25 +5,17 @@ package org.whispersystems.textsecuregcm.tests.storage; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + import com.fasterxml.uuid.UUIDComparator; import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.junit.EmbeddedPostgresRules; import com.opentable.db.postgres.junit.PreparedDbRule; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; -import org.jdbi.v3.core.HandleConsumer; -import org.jdbi.v3.core.Jdbi; -import org.jdbi.v3.core.transaction.TransactionException; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.entities.SignedPreKey; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.Accounts; -import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; -import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper; - import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -37,11 +29,19 @@ import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; - -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; +import org.jdbi.v3.core.HandleConsumer; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.transaction.TransactionException; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.entities.SignedPreKey; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; +import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper; public class AccountsTest { @@ -140,6 +140,11 @@ public class AccountsTest { accounts.create(account); verifyStoredState(statement, "+14151112222", firstUuid, account); + + device = generateDevice(1); + Account invalidAccount = generateAccount("+14151113333", firstUuid, Collections.singleton(device)); + + assertThatThrownBy(() -> accounts.create(invalidAccount)); } @Test @@ -211,6 +216,17 @@ public class AccountsTest { assertThat(accounts.get(deletedAccount.getUuid())).isNotPresent(); verifyStoredState(retainedAccount.getNumber(), retainedAccount.getUuid(), accounts.get(retainedAccount.getUuid()).get(), retainedAccount); + + { + final Account recreatedAccount = generateAccount(deletedAccount.getNumber(), UUID.randomUUID(), + Collections.singleton(generateDevice(1))); + + accounts.create(recreatedAccount); + + assertThat(accounts.get(recreatedAccount.getUuid())).isPresent(); + verifyStoredState(recreatedAccount.getNumber(), recreatedAccount.getUuid(), + accounts.get(recreatedAccount.getUuid()).get(), recreatedAccount); + } } @Test