Classify DynamoDB mismatches in AccountsManager
This commit is contained in:
parent
6906336dfb
commit
2a3ea13c9e
|
@ -11,6 +11,7 @@ import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.lettuce.core.RedisException;
|
||||
|
@ -19,12 +20,12 @@ import io.micrometer.core.instrument.Counter;
|
|||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.BiFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
|
||||
|
@ -56,9 +57,9 @@ public class AccountsManager {
|
|||
private static final String COUNTRY_CODE_TAG_NAME = "country";
|
||||
private static final String DELETION_REASON_TAG_NAME = "reason";
|
||||
|
||||
private static final String DYNAMO_MIGRATION_ERROR_COUNTER = name(AccountsManager.class, "migration", "error");
|
||||
private static final String DYNAMO_MIGRATION_ERROR_COUNTER_NAME = name(AccountsManager.class, "migration", "error");
|
||||
private static final Counter DYNAMO_MIGRATION_COMPARISON_COUNTER = Metrics.counter(name(AccountsManager.class, "migration", "comparisons"));
|
||||
private static final Counter DYNAMO_MIGRATION_MISMATCH_COUNTER = Metrics.counter(name(AccountsManager.class, "migration", "mismatches"));
|
||||
private static final String DYNAMO_MIGRATION_MISMATCH_COUNTER_NAME = name(AccountsManager.class, "migration", "mismatches");
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(AccountsManager.class);
|
||||
|
||||
|
@ -74,6 +75,8 @@ public class AccountsManager {
|
|||
private final SecureBackupClient secureBackupClient;
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
private final ObjectMapper migrationComparisonMapper;
|
||||
|
||||
private final DynamicConfigurationManager dynamicConfigurationManager;
|
||||
private final ExperimentEnrollmentManager experimentEnrollmentManager;
|
||||
|
||||
|
@ -106,6 +109,9 @@ public class AccountsManager {
|
|||
this.secureBackupClient = secureBackupClient;
|
||||
this.mapper = SystemMapper.getMapper();
|
||||
|
||||
this.migrationComparisonMapper = mapper.copy();
|
||||
migrationComparisonMapper.addMixIn(Account.class, AccountComparisonMixin.class);
|
||||
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
this.experimentEnrollmentManager = experimentEnrollmentManager;
|
||||
}
|
||||
|
@ -117,7 +123,18 @@ public class AccountsManager {
|
|||
|
||||
if (dynamoWriteEnabled()) {
|
||||
runSafelyAndRecordMetrics(() -> dynamoCreate(account), Optional.of(account.getUuid()), freshUser,
|
||||
Boolean::compareTo, "create");
|
||||
(databaseResult, dynamoResult) -> {
|
||||
if (databaseResult.equals(dynamoResult)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
if (dynamoResult) {
|
||||
return Optional.of("dynamoFreshUser");
|
||||
}
|
||||
|
||||
return Optional.of("dbFreshUser");
|
||||
},
|
||||
"create");
|
||||
}
|
||||
return freshUser;
|
||||
}
|
||||
|
@ -131,13 +148,15 @@ public class AccountsManager {
|
|||
|
||||
if (dynamoWriteEnabled()) {
|
||||
runSafelyAndRecordMetrics(() -> {
|
||||
try {
|
||||
dynamoUpdate(account);
|
||||
} catch (final ConditionalCheckFailedException e) {
|
||||
dynamoCreate(account);
|
||||
}
|
||||
return true;
|
||||
}, Optional.of(account.getUuid()), true, Boolean::compareTo, "update");
|
||||
try {
|
||||
dynamoUpdate(account);
|
||||
} catch (final ConditionalCheckFailedException e) {
|
||||
dynamoCreate(account);
|
||||
}
|
||||
return true;
|
||||
}, Optional.of(account.getUuid()), true,
|
||||
(databaseSuccess, dynamoSuccess) -> Optional.empty(), // both values are always true
|
||||
"update");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -215,7 +234,7 @@ public class AccountsManager {
|
|||
dynamoDelete(account);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Could not delete account {} from dynamo", account.getUuid().toString());
|
||||
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER, "action", "delete").increment();
|
||||
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", "delete").increment();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -354,14 +373,18 @@ public class AccountsManager {
|
|||
}
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
public int compareAccounts(final Optional<Account> maybeDatabaseAccount, final Optional<Account> maybeDynamoAccount) {
|
||||
public Optional<String> compareAccounts(final Optional<Account> maybeDatabaseAccount, final Optional<Account> maybeDynamoAccount) {
|
||||
|
||||
if (maybeDatabaseAccount.isEmpty() && maybeDynamoAccount.isEmpty()) {
|
||||
return 0;
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
if (maybeDatabaseAccount.isEmpty() || maybeDynamoAccount.isEmpty()) {
|
||||
return 1;
|
||||
if (maybeDatabaseAccount.isEmpty()) {
|
||||
return Optional.of("dbMissing");
|
||||
}
|
||||
|
||||
if (maybeDynamoAccount.isEmpty()) {
|
||||
return Optional.of("dynamoMissing");
|
||||
}
|
||||
|
||||
final Account databaseAccount = maybeDatabaseAccount.get();
|
||||
|
@ -370,28 +393,38 @@ public class AccountsManager {
|
|||
final int uuidCompare = databaseAccount.getUuid().compareTo(dynamoAccount.getUuid());
|
||||
|
||||
if (uuidCompare != 0) {
|
||||
return uuidCompare;
|
||||
return Optional.of("uuid");
|
||||
}
|
||||
|
||||
final int numberCompare = databaseAccount.getNumber().compareTo(dynamoAccount.getNumber());
|
||||
|
||||
if (numberCompare != 0) {
|
||||
return numberCompare;
|
||||
return Optional.of("number");
|
||||
}
|
||||
|
||||
try {
|
||||
final byte[] databaseSerialized = mapper.writeValueAsBytes(databaseAccount);
|
||||
final byte[] dynamoSerialized = mapper.writeValueAsBytes(dynamoAccount);
|
||||
final byte[] databaseSerialized = migrationComparisonMapper.writeValueAsBytes(databaseAccount);
|
||||
final byte[] dynamoSerialized = migrationComparisonMapper.writeValueAsBytes(dynamoAccount);
|
||||
|
||||
return Arrays.compare(databaseSerialized, dynamoSerialized);
|
||||
final int serializeCompare = Arrays.compare(databaseSerialized, dynamoSerialized);
|
||||
|
||||
if (serializeCompare != 0) {
|
||||
return Optional.of("serialization");
|
||||
}
|
||||
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
if (databaseAccount.getDynamoDbMigrationVersion() != dynamoAccount.getDynamoDbMigrationVersion()) {
|
||||
return Optional.of("migrationVersion");
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
private <T> void runSafelyAndRecordMetrics(Callable<T> callable, Optional<UUID> maybeUuid, final T databaseResult, final Comparator<T> comparator, final String action) {
|
||||
private <T> void runSafelyAndRecordMetrics(Callable<T> callable, Optional<UUID> maybeUuid, final T databaseResult, final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action) {
|
||||
|
||||
if (maybeUuid.isPresent()) {
|
||||
// the only time we don’t have a UUID is in getByNumber, which is sufficiently low volume to not be a concern, and
|
||||
|
@ -406,20 +439,31 @@ public class AccountsManager {
|
|||
try {
|
||||
|
||||
final T dynamoResult = callable.call();
|
||||
compare(databaseResult, dynamoResult, comparator);
|
||||
compare(databaseResult, dynamoResult, mismatchClassifier, action);
|
||||
|
||||
} catch (final Exception e) {
|
||||
logger.error("Error running " + action + " in Dynamo", e);
|
||||
|
||||
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER, "action", action).increment();
|
||||
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", action).increment();
|
||||
}
|
||||
}
|
||||
|
||||
private <T> void compare(final T databaseResult, final T dynamoResult, final Comparator<T> comparator) {
|
||||
private <T> void compare(final T databaseResult, final T dynamoResult, final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action) {
|
||||
|
||||
DYNAMO_MIGRATION_COMPARISON_COUNTER.increment();
|
||||
|
||||
if (comparator.compare(databaseResult, dynamoResult) != 0) {
|
||||
DYNAMO_MIGRATION_MISMATCH_COUNTER.increment();
|
||||
}
|
||||
mismatchClassifier.apply(databaseResult, dynamoResult)
|
||||
.ifPresent(mismatchType ->
|
||||
Metrics.counter(DYNAMO_MIGRATION_MISMATCH_COUNTER_NAME,
|
||||
"mismatchType", action + ":" + mismatchType)
|
||||
.increment()
|
||||
);
|
||||
}
|
||||
|
||||
private static abstract class AccountComparisonMixin extends Account {
|
||||
|
||||
@JsonIgnore
|
||||
private int dynamoDbMigrationVersion;
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -398,19 +398,23 @@ class AccountsManagerTest {
|
|||
|
||||
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
|
||||
|
||||
assertEquals(0, accountsManager.compareAccounts(Optional.empty(), Optional.empty()));
|
||||
assertEquals(Optional.empty(), accountsManager.compareAccounts(Optional.empty(), Optional.empty()));
|
||||
|
||||
final UUID uuidA = UUID.randomUUID();
|
||||
final Account a1 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]);
|
||||
|
||||
assertEquals(1, accountsManager.compareAccounts(Optional.empty(), Optional.of(a1)));
|
||||
assertEquals(Optional.of("dbMissing"), accountsManager.compareAccounts(Optional.empty(), Optional.of(a1)));
|
||||
|
||||
final Account a2 = new Account("+14152222222", uuidA, new HashSet<>(), new byte[16]);
|
||||
|
||||
assertEquals(0, accountsManager.compareAccounts(Optional.of(a1), Optional.of(a2)));
|
||||
assertEquals(Optional.empty(), accountsManager.compareAccounts(Optional.of(a1), Optional.of(a2)));
|
||||
|
||||
a1.setDynamoDbMigrationVersion(1);
|
||||
|
||||
assertEquals(Optional.of("migrationVersion"), accountsManager.compareAccounts(Optional.of(a1), Optional.of(a2)));
|
||||
|
||||
a2.setProfileName("name");
|
||||
|
||||
assertTrue(0 < accountsManager.compareAccounts(Optional.of(a1), Optional.of(a2)));
|
||||
assertEquals(Optional.of("serialization"), accountsManager.compareAccounts(Optional.of(a1), Optional.of(a2)));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue