From 8161f55a82bf6b5d61b54793837b9991fcf18f4b Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 15 Sep 2021 18:20:04 -0700 Subject: [PATCH] Add dynamic configuration for setting Dynamo as primary --- ...ccountsDynamoDbMigrationConfiguration.java | 7 + .../storage/AccountsManager.java | 230 ++++++++++++------ ...namoDbMigrationCrawlerIntegrationTest.java | 48 ---- .../tests/storage/AccountsManagerTest.java | 2 +- 4 files changed, 161 insertions(+), 126 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountsDynamoDbMigrationConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountsDynamoDbMigrationConfiguration.java index 2729d0a1b..fe630ddb1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountsDynamoDbMigrationConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountsDynamoDbMigrationConfiguration.java @@ -5,6 +5,9 @@ import com.google.common.annotations.VisibleForTesting; public class DynamicAccountsDynamoDbMigrationConfiguration { + @JsonProperty + boolean dynamoPrimary; + @JsonProperty boolean backgroundMigrationEnabled; @@ -35,6 +38,10 @@ public class DynamicAccountsDynamoDbMigrationConfiguration { @JsonProperty int dynamoCrawlerScanPageSize = 10; + public boolean isDynamoPrimary() { + return dynamoPrimary; + } + public boolean isBackgroundMigrationEnabled() { return backgroundMigrationEnabled; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index f9c4498dd..3b030fc12 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -170,38 +170,29 @@ public class AccountsManager { final UUID originalUuid = account.getUuid(); - boolean freshUser = databaseCreate(account); + boolean freshUser = primaryCreate(account); - // databaseCreate() sometimes updates the UUID, if there was a number conflict. - // for metrics, we want dynamo to run with the same original UUID + // create() sometimes updates the UUID, if there was a number conflict. + // for metrics, we want secondary to run with the same original UUID final UUID actualUuid = account.getUuid(); try { - if (dynamoWriteEnabled()) { + if (secondaryWriteEnabled()) { account.setUuid(originalUuid); - runSafelyAndRecordMetrics(() -> dynamoCreate(account), Optional.of(account.getUuid()), freshUser, - (databaseResult, dynamoResult) -> { + runSafelyAndRecordMetrics(() -> secondaryCreate(account), Optional.of(account.getUuid()), freshUser, + (primaryResult, secondaryResult) -> { - if (!account.getUuid().equals(actualUuid)) { - // This is expected towards the beginning of the background migration, as Dynamo won’t - // have many accounts available for re-registration - logger.warn("dynamoCreate() did not return correct UUID"); - accountsDynamoDb.deleteInvalidMigration(account.getUuid()); - return Optional.of("dynamoIncorrectUUID"); - - } - - if (databaseResult.equals(dynamoResult)) { + if (primaryResult.equals(secondaryResult)) { return Optional.empty(); } - if (dynamoResult) { - return Optional.of("dynamoFreshUser"); + if (secondaryResult) { + return Optional.of("secondaryFreshUser"); } - return Optional.of("dbFreshUser"); + return Optional.of("primaryFreshUser"); }, "create"); } @@ -302,14 +293,17 @@ public class AccountsManager { final UUID uuid = account.getUuid(); - updatedAccount = updateWithRetries(account, updater, this::databaseUpdate, () -> databaseGet(uuid).get()); + updatedAccount = updateWithRetries(account, updater, this::primaryUpdate, () -> primaryGet(uuid).get()); - if (dynamoWriteEnabled()) { - runSafelyAndRecordMetrics(() -> dynamoGet(uuid).map(dynamoAccount -> { + if (secondaryWriteEnabled()) { + runSafelyAndRecordMetrics(() -> secondaryGet(uuid).map(secondaryAccount -> { try { - return updateWithRetries(dynamoAccount, updater, this::dynamoUpdate, () -> dynamoGet(uuid).get()); + return updateWithRetries(secondaryAccount, updater, this::secondaryUpdate, () -> secondaryGet(uuid).get()); } catch (final OptimisticLockRetryLimitExceededException e) { - accountsDynamoDb.putUuidForMigrationRetry(uuid); + if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration() + .isDynamoPrimary()) { + accountsDynamoDb.putUuidForMigrationRetry(uuid); + } throw e; } @@ -385,11 +379,11 @@ public class AccountsManager { Optional account = redisGet(number); if (!account.isPresent()) { - account = databaseGet(number); + account = primaryGet(number); account.ifPresent(value -> redisSet(value)); - if (dynamoReadEnabled()) { - runSafelyAndRecordMetrics(() -> dynamoGet(number), Optional.empty(), account, this::compareAccounts, + if (secondaryReadEnabled()) { + runSafelyAndRecordMetrics(() -> secondaryGet(number), Optional.empty(), account, this::compareAccounts, "getByNumber"); } } @@ -403,11 +397,11 @@ public class AccountsManager { Optional account = redisGet(uuid); if (!account.isPresent()) { - account = databaseGet(uuid); + account = primaryGet(uuid); account.ifPresent(value -> redisSet(value)); - if (dynamoReadEnabled()) { - runSafelyAndRecordMetrics(() -> dynamoGet(uuid), Optional.of(uuid), account, this::compareAccounts, + if (secondaryReadEnabled()) { + runSafelyAndRecordMetrics(() -> secondaryGet(uuid), Optional.of(uuid), account, this::compareAccounts, "getByUuid"); } } @@ -453,13 +447,13 @@ public class AccountsManager { deleteBackupServiceDataFuture.join(); redisDelete(account); - databaseDelete(account); + primaryDelete(account); - if (dynamoDeleteEnabled()) { + if (secondaryDeleteEnabled()) { try { - dynamoDelete(account); + secondaryDelete(account); } catch (final Exception e) { - logger.error("Could not delete account {} from dynamo", account.getUuid().toString()); + logger.error("Could not delete account {} from secondary", account.getUuid().toString()); Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", "delete").increment(); } } @@ -538,7 +532,82 @@ public class AccountsManager { private void redisDelete(final Account account) { try (final Timer.Context ignored = redisDeleteTimer.time()) { - cacheCluster.useCluster(connection -> connection.sync().del(getAccountMapKey(account.getNumber()), getAccountEntityKey(account.getUuid()))); + cacheCluster.useCluster(connection -> connection.sync() + .del(getAccountMapKey(account.getNumber()), getAccountEntityKey(account.getUuid()))); + } + } + + private Optional primaryGet(String number) { + return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary() + ? + dynamoGet(number) : + databaseGet(number); + } + + private Optional secondaryGet(String number) { + return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary() + ? + databaseGet(number) : + dynamoGet(number); + } + + private Optional primaryGet(UUID uuid) { + return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary() + ? + dynamoGet(uuid) : + databaseGet(uuid); + } + + private Optional secondaryGet(UUID uuid) { + return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary() + ? + databaseGet(uuid) : + dynamoGet(uuid); + } + + private boolean primaryCreate(Account account) { + return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary() + ? + dynamoCreate(account) : + databaseCreate(account); + } + + private boolean secondaryCreate(Account account) { + return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary() + ? + databaseCreate(account) : + dynamoCreate(account); + } + + private void primaryUpdate(Account account) { + if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) { + dynamoUpdate(account); + } else { + databaseUpdate(account); + } + } + + private void secondaryUpdate(Account account) { + if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) { + databaseUpdate(account); + } else { + dynamoUpdate(account); + } + } + + private void primaryDelete(Account account) { + if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) { + dynamoDelete(account); + } else { + databaseDelete(account); + } + } + + private void secondaryDelete(Account account) { + if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) { + databaseDelete(account); + } else { + dynamoDelete(account); } } @@ -582,69 +651,72 @@ public class AccountsManager { accountsDynamoDb.delete(account.getUuid()); } - private boolean dynamoDeleteEnabled() { + private boolean secondaryDeleteEnabled() { return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDeleteEnabled(); } - private boolean dynamoReadEnabled() { + private boolean secondaryReadEnabled() { return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isReadEnabled(); } - private boolean dynamoWriteEnabled() { - return dynamoDeleteEnabled() + private boolean secondaryWriteEnabled() { + return secondaryDeleteEnabled() && dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isWriteEnabled(); } @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - public Optional compareAccounts(final Optional maybeDatabaseAccount, final Optional maybeDynamoAccount) { + public Optional compareAccounts(final Optional maybePrimaryAccount, + final Optional maybeSecondaryAccount) { - if (maybeDatabaseAccount.isEmpty() && maybeDynamoAccount.isEmpty()) { + if (maybePrimaryAccount.isEmpty() && maybeSecondaryAccount.isEmpty()) { return Optional.empty(); } - if (maybeDatabaseAccount.isEmpty()) { - return Optional.of("dbMissing"); + if (maybePrimaryAccount.isEmpty()) { + return Optional.of("primaryMissing"); } - if (maybeDynamoAccount.isEmpty()) { - return Optional.of("dynamoMissing"); + if (maybeSecondaryAccount.isEmpty()) { + return Optional.of("secondaryMissing"); } - final Account databaseAccount = maybeDatabaseAccount.get(); - final Account dynamoAccount = maybeDynamoAccount.get(); + final Account primaryAccount = maybePrimaryAccount.get(); + final Account secondaryAccount = maybeSecondaryAccount.get(); - final int uuidCompare = databaseAccount.getUuid().compareTo(dynamoAccount.getUuid()); + final int uuidCompare = primaryAccount.getUuid().compareTo(secondaryAccount.getUuid()); if (uuidCompare != 0) { return Optional.of("uuid"); } - final int numberCompare = databaseAccount.getNumber().compareTo(dynamoAccount.getNumber()); + final int numberCompare = primaryAccount.getNumber().compareTo(secondaryAccount.getNumber()); if (numberCompare != 0) { return Optional.of("number"); } - if (!Objects.equals(databaseAccount.getIdentityKey(), dynamoAccount.getIdentityKey())) { + if (!Objects.equals(primaryAccount.getIdentityKey(), secondaryAccount.getIdentityKey())) { return Optional.of("identityKey"); } - if (!Objects.equals(databaseAccount.getCurrentProfileVersion(), dynamoAccount.getCurrentProfileVersion())) { + if (!Objects.equals(primaryAccount.getCurrentProfileVersion(), secondaryAccount.getCurrentProfileVersion())) { return Optional.of("currentProfileVersion"); } - if (!Objects.equals(databaseAccount.getProfileName(), dynamoAccount.getProfileName())) { + if (!Objects.equals(primaryAccount.getProfileName(), secondaryAccount.getProfileName())) { return Optional.of("profileName"); } - if (!Objects.equals(databaseAccount.getAvatar(), dynamoAccount.getAvatar())) { + if (!Objects.equals(primaryAccount.getAvatar(), secondaryAccount.getAvatar())) { return Optional.of("avatar"); } - if (!Objects.equals(databaseAccount.getUnidentifiedAccessKey(), dynamoAccount.getUnidentifiedAccessKey())) { - if (databaseAccount.getUnidentifiedAccessKey().isPresent() && dynamoAccount.getUnidentifiedAccessKey().isPresent()) { + if (!Objects.equals(primaryAccount.getUnidentifiedAccessKey(), secondaryAccount.getUnidentifiedAccessKey())) { + if (primaryAccount.getUnidentifiedAccessKey().isPresent() && secondaryAccount.getUnidentifiedAccessKey() + .isPresent()) { - if (Arrays.compare(databaseAccount.getUnidentifiedAccessKey().get(), dynamoAccount.getUnidentifiedAccessKey().get()) != 0) { + if (Arrays.compare(primaryAccount.getUnidentifiedAccessKey().get(), + secondaryAccount.getUnidentifiedAccessKey().get()) != 0) { return Optional.of("unidentifiedAccessKey"); } @@ -653,40 +725,41 @@ public class AccountsManager { } } - if (!Objects.equals(databaseAccount.isUnrestrictedUnidentifiedAccess(), dynamoAccount.isUnrestrictedUnidentifiedAccess())) { + if (!Objects.equals(primaryAccount.isUnrestrictedUnidentifiedAccess(), + secondaryAccount.isUnrestrictedUnidentifiedAccess())) { return Optional.of("unrestrictedUnidentifiedAccess"); } - if (!Objects.equals(databaseAccount.isDiscoverableByPhoneNumber(), dynamoAccount.isDiscoverableByPhoneNumber())) { + if (!Objects.equals(primaryAccount.isDiscoverableByPhoneNumber(), secondaryAccount.isDiscoverableByPhoneNumber())) { return Optional.of("discoverableByPhoneNumber"); } - if (databaseAccount.getMasterDevice().isPresent() && dynamoAccount.getMasterDevice().isPresent()) { - if (!Objects.equals(databaseAccount.getMasterDevice().get().getSignedPreKey(), - dynamoAccount.getMasterDevice().get().getSignedPreKey())) { + if (primaryAccount.getMasterDevice().isPresent() && secondaryAccount.getMasterDevice().isPresent()) { + if (!Objects.equals(primaryAccount.getMasterDevice().get().getSignedPreKey(), + secondaryAccount.getMasterDevice().get().getSignedPreKey())) { return Optional.of("masterDeviceSignedPreKey"); } } try { - if (!serializedEquals(databaseAccount.getDevices(), dynamoAccount.getDevices())) { + if (!serializedEquals(primaryAccount.getDevices(), secondaryAccount.getDevices())) { return Optional.of("devices"); } - if (databaseAccount.getVersion() != dynamoAccount.getVersion()) { + if (primaryAccount.getVersion() != secondaryAccount.getVersion()) { return Optional.of("version"); } - if (databaseAccount.getMasterDevice().isPresent() && dynamoAccount.getMasterDevice().isPresent()) { - if (Math.abs(databaseAccount.getMasterDevice().get().getPushTimestamp() - - dynamoAccount.getMasterDevice().get().getPushTimestamp()) > 60 * 1_000L) { + if (primaryAccount.getMasterDevice().isPresent() && secondaryAccount.getMasterDevice().isPresent()) { + if (Math.abs(primaryAccount.getMasterDevice().get().getPushTimestamp() - + secondaryAccount.getMasterDevice().get().getPushTimestamp()) > 60 * 1_000L) { // These are generally few milliseconds off, because the setter uses System.currentTimeMillis() internally, // but we can be more relaxed return Optional.of("masterDevicePushTimestamp"); } } - if (!serializedEquals(databaseAccount, dynamoAccount)) { + if (!serializedEquals(primaryAccount, secondaryAccount)) { return Optional.of("serialization"); } @@ -698,7 +771,8 @@ public class AccountsManager { } @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private void runSafelyAndRecordMetrics(Callable callable, Optional maybeUuid, final T databaseResult, final BiFunction> mismatchClassifier, final String action) { + private void runSafelyAndRecordMetrics(Callable callable, Optional maybeUuid, final T primaryResult, + final BiFunction> mismatchClassifier, final String action) { if (maybeUuid.isPresent()) { // the only time we don’t have a UUID is in getByNumber, which is sufficiently low volume to not be a concern, and @@ -712,8 +786,8 @@ public class AccountsManager { try { - final T dynamoResult = callable.call(); - compare(databaseResult, dynamoResult, mismatchClassifier, action, maybeUuid); + final T secondaryResult = callable.call(); + compare(primaryResult, secondaryResult, mismatchClassifier, action, maybeUuid); } catch (final Exception e) { logger.error("Error running " + action + " in Dynamo", e); @@ -723,15 +797,17 @@ public class AccountsManager { } @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private void compare(final T databaseResult, final T dynamoResult, final BiFunction> mismatchClassifier, final String action, final Optional maybeUUid) { + private void compare(final T primaryResult, final T secondaryResult, + final BiFunction> mismatchClassifier, final String action, + final Optional maybeUUid) { DYNAMO_MIGRATION_COMPARISON_COUNTER.increment(); - mismatchClassifier.apply(databaseResult, dynamoResult) + mismatchClassifier.apply(primaryResult, secondaryResult) .ifPresent(mismatchType -> { final String mismatchDescription = action + ":" + mismatchType; Metrics.counter(DYNAMO_MIGRATION_MISMATCH_COUNTER_NAME, - "mismatchType", mismatchDescription) + "mismatchType", mismatchDescription) .increment(); maybeUUid.ifPresent(uuid -> { @@ -762,10 +838,10 @@ public class AccountsManager { } - private boolean serializedEquals(final Object database, final Object dynamo) throws JsonProcessingException { - final byte[] databaseSerialized = migrationComparisonMapper.writeValueAsBytes(database); - final byte[] dynamoSerialized = migrationComparisonMapper.writeValueAsBytes(dynamo); - final int serializeCompare = Arrays.compare(databaseSerialized, dynamoSerialized); + private boolean serializedEquals(final Object primary, final Object secondary) throws JsonProcessingException { + final byte[] primarySerialized = migrationComparisonMapper.writeValueAsBytes(primary); + final byte[] secondarySerialized = migrationComparisonMapper.writeValueAsBytes(secondary); + final int serializeCompare = Arrays.compare(primarySerialized, secondarySerialized); return serializeCompare == 0; } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrationCrawlerIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrationCrawlerIntegrationTest.java index e3101dfb5..f72378d9f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrationCrawlerIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrationCrawlerIntegrationTest.java @@ -5,9 +5,6 @@ package org.whispersystems.textsecuregcm.storage; -import static org.junit.jupiter.api.Assertions.assertAll; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -17,7 +14,6 @@ import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension; import com.opentable.db.postgres.junit5.PreparedDbExtension; import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; @@ -25,12 +21,10 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.entities.AccountAttributes; import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; @@ -274,46 +268,4 @@ class AccountsDynamoDbMigrationCrawlerIntegrationTest { ACCOUNTS_DYNAMODB_EXTENSION.getDynamoDbClient().createTable(createMigrationRetryAccountsTableRequest); } - - @Test - void testReregistration() throws Exception { - - final String e164 = "+18001111234"; - - final UUID uuid = accountsManager.create(e164, "qefiv132oin4", "OWT", new AccountAttributes()).getUuid(); - - assertEquals(1, getAllPostgresAccounts().size()); - assertTrue(getAllDynamoAccounts().isEmpty()); - - accountMigrationConfiguration.setReadEnabled(true); - accountMigrationConfiguration.setDeleteEnabled(true); - accountMigrationConfiguration.setWriteEnabled(true); - - accountsManager.create(e164, "qefiv132oin4", "OWT", new AccountAttributes()); - - assertEquals(1, getAllPostgresAccounts().size()); - assertTrue(getAllDynamoAccounts().isEmpty()); - assertEquals(uuid, accountsManager.get(e164).orElseThrow().getUuid()); - - accountMigrationConfiguration.setBackgroundMigrationExecutorThreads(5); - - accountDatabaseCrawler.doPeriodicWork(); - - assertEquals(1, getAllDynamoAccounts().size()); - - final Optional dbAccount = accounts.get(e164); - final Optional dynamoAccount = accountsDynamoDb.get(e164); - - assertAll(() -> assertTrue(dbAccount.isPresent()), - () -> assertTrue(dynamoAccount.isPresent()), - () -> assertEquals(Optional.empty(), accountsManager.compareAccounts(dbAccount, dynamoAccount))); - } - - private List getAllPostgresAccounts() { - return accounts.getAllFrom(100).getAccounts(); - } - - private List getAllDynamoAccounts() { - return accountsDynamoDb.getAllFromStart(100, 1000).getAccounts(); - } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java index d45a3b8fe..d6845a114 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java @@ -470,7 +470,7 @@ class AccountsManagerTest { final UUID uuidA = UUID.randomUUID(); final Account a1 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]); - assertEquals(Optional.of("dbMissing"), accountsManager.compareAccounts(Optional.empty(), Optional.of(a1))); + assertEquals(Optional.of("primaryMissing"), accountsManager.compareAccounts(Optional.empty(), Optional.of(a1))); final Account a2 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]);