Add dynamic configuration for setting Dynamo as primary

This commit is contained in:
Chris Eager 2021-09-15 18:20:04 -07:00 committed by Chris Eager
parent ecee189ad8
commit 8161f55a82
4 changed files with 161 additions and 126 deletions

View File

@ -5,6 +5,9 @@ import com.google.common.annotations.VisibleForTesting;
public class DynamicAccountsDynamoDbMigrationConfiguration {
@JsonProperty
boolean dynamoPrimary;
@JsonProperty
boolean backgroundMigrationEnabled;
@ -35,6 +38,10 @@ public class DynamicAccountsDynamoDbMigrationConfiguration {
@JsonProperty
int dynamoCrawlerScanPageSize = 10;
public boolean isDynamoPrimary() {
return dynamoPrimary;
}
public boolean isBackgroundMigrationEnabled() {
return backgroundMigrationEnabled;
}

View File

@ -170,38 +170,29 @@ public class AccountsManager {
final UUID originalUuid = account.getUuid();
boolean freshUser = databaseCreate(account);
boolean freshUser = primaryCreate(account);
// databaseCreate() sometimes updates the UUID, if there was a number conflict.
// for metrics, we want dynamo to run with the same original UUID
// create() sometimes updates the UUID, if there was a number conflict.
// for metrics, we want secondary to run with the same original UUID
final UUID actualUuid = account.getUuid();
try {
if (dynamoWriteEnabled()) {
if (secondaryWriteEnabled()) {
account.setUuid(originalUuid);
runSafelyAndRecordMetrics(() -> dynamoCreate(account), Optional.of(account.getUuid()), freshUser,
(databaseResult, dynamoResult) -> {
runSafelyAndRecordMetrics(() -> secondaryCreate(account), Optional.of(account.getUuid()), freshUser,
(primaryResult, secondaryResult) -> {
if (!account.getUuid().equals(actualUuid)) {
// This is expected towards the beginning of the background migration, as Dynamo wont
// have many accounts available for re-registration
logger.warn("dynamoCreate() did not return correct UUID");
accountsDynamoDb.deleteInvalidMigration(account.getUuid());
return Optional.of("dynamoIncorrectUUID");
}
if (databaseResult.equals(dynamoResult)) {
if (primaryResult.equals(secondaryResult)) {
return Optional.empty();
}
if (dynamoResult) {
return Optional.of("dynamoFreshUser");
if (secondaryResult) {
return Optional.of("secondaryFreshUser");
}
return Optional.of("dbFreshUser");
return Optional.of("primaryFreshUser");
},
"create");
}
@ -302,14 +293,17 @@ public class AccountsManager {
final UUID uuid = account.getUuid();
updatedAccount = updateWithRetries(account, updater, this::databaseUpdate, () -> databaseGet(uuid).get());
updatedAccount = updateWithRetries(account, updater, this::primaryUpdate, () -> primaryGet(uuid).get());
if (dynamoWriteEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoGet(uuid).map(dynamoAccount -> {
if (secondaryWriteEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(uuid).map(secondaryAccount -> {
try {
return updateWithRetries(dynamoAccount, updater, this::dynamoUpdate, () -> dynamoGet(uuid).get());
return updateWithRetries(secondaryAccount, updater, this::secondaryUpdate, () -> secondaryGet(uuid).get());
} catch (final OptimisticLockRetryLimitExceededException e) {
accountsDynamoDb.putUuidForMigrationRetry(uuid);
if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isDynamoPrimary()) {
accountsDynamoDb.putUuidForMigrationRetry(uuid);
}
throw e;
}
@ -385,11 +379,11 @@ public class AccountsManager {
Optional<Account> account = redisGet(number);
if (!account.isPresent()) {
account = databaseGet(number);
account = primaryGet(number);
account.ifPresent(value -> redisSet(value));
if (dynamoReadEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoGet(number), Optional.empty(), account, this::compareAccounts,
if (secondaryReadEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(number), Optional.empty(), account, this::compareAccounts,
"getByNumber");
}
}
@ -403,11 +397,11 @@ public class AccountsManager {
Optional<Account> account = redisGet(uuid);
if (!account.isPresent()) {
account = databaseGet(uuid);
account = primaryGet(uuid);
account.ifPresent(value -> redisSet(value));
if (dynamoReadEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoGet(uuid), Optional.of(uuid), account, this::compareAccounts,
if (secondaryReadEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(uuid), Optional.of(uuid), account, this::compareAccounts,
"getByUuid");
}
}
@ -453,13 +447,13 @@ public class AccountsManager {
deleteBackupServiceDataFuture.join();
redisDelete(account);
databaseDelete(account);
primaryDelete(account);
if (dynamoDeleteEnabled()) {
if (secondaryDeleteEnabled()) {
try {
dynamoDelete(account);
secondaryDelete(account);
} catch (final Exception e) {
logger.error("Could not delete account {} from dynamo", account.getUuid().toString());
logger.error("Could not delete account {} from secondary", account.getUuid().toString());
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", "delete").increment();
}
}
@ -538,7 +532,82 @@ public class AccountsManager {
private void redisDelete(final Account account) {
try (final Timer.Context ignored = redisDeleteTimer.time()) {
cacheCluster.useCluster(connection -> connection.sync().del(getAccountMapKey(account.getNumber()), getAccountEntityKey(account.getUuid())));
cacheCluster.useCluster(connection -> connection.sync()
.del(getAccountMapKey(account.getNumber()), getAccountEntityKey(account.getUuid())));
}
}
private Optional<Account> primaryGet(String number) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoGet(number) :
databaseGet(number);
}
private Optional<Account> secondaryGet(String number) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseGet(number) :
dynamoGet(number);
}
private Optional<Account> primaryGet(UUID uuid) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoGet(uuid) :
databaseGet(uuid);
}
private Optional<Account> secondaryGet(UUID uuid) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseGet(uuid) :
dynamoGet(uuid);
}
private boolean primaryCreate(Account account) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoCreate(account) :
databaseCreate(account);
}
private boolean secondaryCreate(Account account) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseCreate(account) :
dynamoCreate(account);
}
private void primaryUpdate(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
dynamoUpdate(account);
} else {
databaseUpdate(account);
}
}
private void secondaryUpdate(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
databaseUpdate(account);
} else {
dynamoUpdate(account);
}
}
private void primaryDelete(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
dynamoDelete(account);
} else {
databaseDelete(account);
}
}
private void secondaryDelete(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
databaseDelete(account);
} else {
dynamoDelete(account);
}
}
@ -582,69 +651,72 @@ public class AccountsManager {
accountsDynamoDb.delete(account.getUuid());
}
private boolean dynamoDeleteEnabled() {
private boolean secondaryDeleteEnabled() {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDeleteEnabled();
}
private boolean dynamoReadEnabled() {
private boolean secondaryReadEnabled() {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isReadEnabled();
}
private boolean dynamoWriteEnabled() {
return dynamoDeleteEnabled()
private boolean secondaryWriteEnabled() {
return secondaryDeleteEnabled()
&& dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isWriteEnabled();
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public Optional<String> compareAccounts(final Optional<Account> maybeDatabaseAccount, final Optional<Account> maybeDynamoAccount) {
public Optional<String> compareAccounts(final Optional<Account> maybePrimaryAccount,
final Optional<Account> maybeSecondaryAccount) {
if (maybeDatabaseAccount.isEmpty() && maybeDynamoAccount.isEmpty()) {
if (maybePrimaryAccount.isEmpty() && maybeSecondaryAccount.isEmpty()) {
return Optional.empty();
}
if (maybeDatabaseAccount.isEmpty()) {
return Optional.of("dbMissing");
if (maybePrimaryAccount.isEmpty()) {
return Optional.of("primaryMissing");
}
if (maybeDynamoAccount.isEmpty()) {
return Optional.of("dynamoMissing");
if (maybeSecondaryAccount.isEmpty()) {
return Optional.of("secondaryMissing");
}
final Account databaseAccount = maybeDatabaseAccount.get();
final Account dynamoAccount = maybeDynamoAccount.get();
final Account primaryAccount = maybePrimaryAccount.get();
final Account secondaryAccount = maybeSecondaryAccount.get();
final int uuidCompare = databaseAccount.getUuid().compareTo(dynamoAccount.getUuid());
final int uuidCompare = primaryAccount.getUuid().compareTo(secondaryAccount.getUuid());
if (uuidCompare != 0) {
return Optional.of("uuid");
}
final int numberCompare = databaseAccount.getNumber().compareTo(dynamoAccount.getNumber());
final int numberCompare = primaryAccount.getNumber().compareTo(secondaryAccount.getNumber());
if (numberCompare != 0) {
return Optional.of("number");
}
if (!Objects.equals(databaseAccount.getIdentityKey(), dynamoAccount.getIdentityKey())) {
if (!Objects.equals(primaryAccount.getIdentityKey(), secondaryAccount.getIdentityKey())) {
return Optional.of("identityKey");
}
if (!Objects.equals(databaseAccount.getCurrentProfileVersion(), dynamoAccount.getCurrentProfileVersion())) {
if (!Objects.equals(primaryAccount.getCurrentProfileVersion(), secondaryAccount.getCurrentProfileVersion())) {
return Optional.of("currentProfileVersion");
}
if (!Objects.equals(databaseAccount.getProfileName(), dynamoAccount.getProfileName())) {
if (!Objects.equals(primaryAccount.getProfileName(), secondaryAccount.getProfileName())) {
return Optional.of("profileName");
}
if (!Objects.equals(databaseAccount.getAvatar(), dynamoAccount.getAvatar())) {
if (!Objects.equals(primaryAccount.getAvatar(), secondaryAccount.getAvatar())) {
return Optional.of("avatar");
}
if (!Objects.equals(databaseAccount.getUnidentifiedAccessKey(), dynamoAccount.getUnidentifiedAccessKey())) {
if (databaseAccount.getUnidentifiedAccessKey().isPresent() && dynamoAccount.getUnidentifiedAccessKey().isPresent()) {
if (!Objects.equals(primaryAccount.getUnidentifiedAccessKey(), secondaryAccount.getUnidentifiedAccessKey())) {
if (primaryAccount.getUnidentifiedAccessKey().isPresent() && secondaryAccount.getUnidentifiedAccessKey()
.isPresent()) {
if (Arrays.compare(databaseAccount.getUnidentifiedAccessKey().get(), dynamoAccount.getUnidentifiedAccessKey().get()) != 0) {
if (Arrays.compare(primaryAccount.getUnidentifiedAccessKey().get(),
secondaryAccount.getUnidentifiedAccessKey().get()) != 0) {
return Optional.of("unidentifiedAccessKey");
}
@ -653,40 +725,41 @@ public class AccountsManager {
}
}
if (!Objects.equals(databaseAccount.isUnrestrictedUnidentifiedAccess(), dynamoAccount.isUnrestrictedUnidentifiedAccess())) {
if (!Objects.equals(primaryAccount.isUnrestrictedUnidentifiedAccess(),
secondaryAccount.isUnrestrictedUnidentifiedAccess())) {
return Optional.of("unrestrictedUnidentifiedAccess");
}
if (!Objects.equals(databaseAccount.isDiscoverableByPhoneNumber(), dynamoAccount.isDiscoverableByPhoneNumber())) {
if (!Objects.equals(primaryAccount.isDiscoverableByPhoneNumber(), secondaryAccount.isDiscoverableByPhoneNumber())) {
return Optional.of("discoverableByPhoneNumber");
}
if (databaseAccount.getMasterDevice().isPresent() && dynamoAccount.getMasterDevice().isPresent()) {
if (!Objects.equals(databaseAccount.getMasterDevice().get().getSignedPreKey(),
dynamoAccount.getMasterDevice().get().getSignedPreKey())) {
if (primaryAccount.getMasterDevice().isPresent() && secondaryAccount.getMasterDevice().isPresent()) {
if (!Objects.equals(primaryAccount.getMasterDevice().get().getSignedPreKey(),
secondaryAccount.getMasterDevice().get().getSignedPreKey())) {
return Optional.of("masterDeviceSignedPreKey");
}
}
try {
if (!serializedEquals(databaseAccount.getDevices(), dynamoAccount.getDevices())) {
if (!serializedEquals(primaryAccount.getDevices(), secondaryAccount.getDevices())) {
return Optional.of("devices");
}
if (databaseAccount.getVersion() != dynamoAccount.getVersion()) {
if (primaryAccount.getVersion() != secondaryAccount.getVersion()) {
return Optional.of("version");
}
if (databaseAccount.getMasterDevice().isPresent() && dynamoAccount.getMasterDevice().isPresent()) {
if (Math.abs(databaseAccount.getMasterDevice().get().getPushTimestamp() -
dynamoAccount.getMasterDevice().get().getPushTimestamp()) > 60 * 1_000L) {
if (primaryAccount.getMasterDevice().isPresent() && secondaryAccount.getMasterDevice().isPresent()) {
if (Math.abs(primaryAccount.getMasterDevice().get().getPushTimestamp() -
secondaryAccount.getMasterDevice().get().getPushTimestamp()) > 60 * 1_000L) {
// These are generally few milliseconds off, because the setter uses System.currentTimeMillis() internally,
// but we can be more relaxed
return Optional.of("masterDevicePushTimestamp");
}
}
if (!serializedEquals(databaseAccount, dynamoAccount)) {
if (!serializedEquals(primaryAccount, secondaryAccount)) {
return Optional.of("serialization");
}
@ -698,7 +771,8 @@ public class AccountsManager {
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private <T> void runSafelyAndRecordMetrics(Callable<T> callable, Optional<UUID> maybeUuid, final T databaseResult, final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action) {
private <T> void runSafelyAndRecordMetrics(Callable<T> callable, Optional<UUID> maybeUuid, final T primaryResult,
final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action) {
if (maybeUuid.isPresent()) {
// the only time we dont have a UUID is in getByNumber, which is sufficiently low volume to not be a concern, and
@ -712,8 +786,8 @@ public class AccountsManager {
try {
final T dynamoResult = callable.call();
compare(databaseResult, dynamoResult, mismatchClassifier, action, maybeUuid);
final T secondaryResult = callable.call();
compare(primaryResult, secondaryResult, mismatchClassifier, action, maybeUuid);
} catch (final Exception e) {
logger.error("Error running " + action + " in Dynamo", e);
@ -723,15 +797,17 @@ public class AccountsManager {
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private <T> void compare(final T databaseResult, final T dynamoResult, final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action, final Optional<UUID> maybeUUid) {
private <T> void compare(final T primaryResult, final T secondaryResult,
final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action,
final Optional<UUID> maybeUUid) {
DYNAMO_MIGRATION_COMPARISON_COUNTER.increment();
mismatchClassifier.apply(databaseResult, dynamoResult)
mismatchClassifier.apply(primaryResult, secondaryResult)
.ifPresent(mismatchType -> {
final String mismatchDescription = action + ":" + mismatchType;
Metrics.counter(DYNAMO_MIGRATION_MISMATCH_COUNTER_NAME,
"mismatchType", mismatchDescription)
"mismatchType", mismatchDescription)
.increment();
maybeUUid.ifPresent(uuid -> {
@ -762,10 +838,10 @@ public class AccountsManager {
}
private boolean serializedEquals(final Object database, final Object dynamo) throws JsonProcessingException {
final byte[] databaseSerialized = migrationComparisonMapper.writeValueAsBytes(database);
final byte[] dynamoSerialized = migrationComparisonMapper.writeValueAsBytes(dynamo);
final int serializeCompare = Arrays.compare(databaseSerialized, dynamoSerialized);
private boolean serializedEquals(final Object primary, final Object secondary) throws JsonProcessingException {
final byte[] primarySerialized = migrationComparisonMapper.writeValueAsBytes(primary);
final byte[] secondarySerialized = migrationComparisonMapper.writeValueAsBytes(secondary);
final int serializeCompare = Arrays.compare(primarySerialized, secondarySerialized);
return serializeCompare == 0;
}

View File

@ -5,9 +5,6 @@
package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@ -17,7 +14,6 @@ import com.opentable.db.postgres.embedded.LiquibasePreparer;
import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension;
import com.opentable.db.postgres.junit5.PreparedDbExtension;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
@ -25,12 +21,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.AccountAttributes;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
@ -274,46 +268,4 @@ class AccountsDynamoDbMigrationCrawlerIntegrationTest {
ACCOUNTS_DYNAMODB_EXTENSION.getDynamoDbClient().createTable(createMigrationRetryAccountsTableRequest);
}
@Test
void testReregistration() throws Exception {
final String e164 = "+18001111234";
final UUID uuid = accountsManager.create(e164, "qefiv132oin4", "OWT", new AccountAttributes()).getUuid();
assertEquals(1, getAllPostgresAccounts().size());
assertTrue(getAllDynamoAccounts().isEmpty());
accountMigrationConfiguration.setReadEnabled(true);
accountMigrationConfiguration.setDeleteEnabled(true);
accountMigrationConfiguration.setWriteEnabled(true);
accountsManager.create(e164, "qefiv132oin4", "OWT", new AccountAttributes());
assertEquals(1, getAllPostgresAccounts().size());
assertTrue(getAllDynamoAccounts().isEmpty());
assertEquals(uuid, accountsManager.get(e164).orElseThrow().getUuid());
accountMigrationConfiguration.setBackgroundMigrationExecutorThreads(5);
accountDatabaseCrawler.doPeriodicWork();
assertEquals(1, getAllDynamoAccounts().size());
final Optional<Account> dbAccount = accounts.get(e164);
final Optional<Account> dynamoAccount = accountsDynamoDb.get(e164);
assertAll(() -> assertTrue(dbAccount.isPresent()),
() -> assertTrue(dynamoAccount.isPresent()),
() -> assertEquals(Optional.empty(), accountsManager.compareAccounts(dbAccount, dynamoAccount)));
}
private List<Account> getAllPostgresAccounts() {
return accounts.getAllFrom(100).getAccounts();
}
private List<Account> getAllDynamoAccounts() {
return accountsDynamoDb.getAllFromStart(100, 1000).getAccounts();
}
}

View File

@ -470,7 +470,7 @@ class AccountsManagerTest {
final UUID uuidA = UUID.randomUUID();
final Account a1 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]);
assertEquals(Optional.of("dbMissing"), accountsManager.compareAccounts(Optional.empty(), Optional.of(a1)));
assertEquals(Optional.of("primaryMissing"), accountsManager.compareAccounts(Optional.empty(), Optional.of(a1)));
final Account a2 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]);