Re-check mismatched accounts after a delay, to avoid false positives from concurrent requests

This commit is contained in:
Chris Eager 2021-09-08 14:06:11 -07:00 committed by Chris Eager
parent 8cd93d68e4
commit 49489a6021
5 changed files with 333 additions and 22 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm;
@ -154,6 +154,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private DynamoDbConfiguration migrationDeletedAccountsDynamoDb;
@Valid
@NotNull
@JsonProperty
private DynamoDbConfiguration migrationMismatchedAccountsDynamoDb;
@Valid
@NotNull
@JsonProperty
@ -407,6 +412,10 @@ public class WhisperServerConfiguration extends Configuration {
return migrationDeletedAccountsDynamoDb;
}
public DynamoDbConfiguration getMigrationMismatchedAccountsDynamoDbConfiguration() {
return migrationMismatchedAccountsDynamoDb;
}
public DynamoDbConfiguration getMigrationRetryAccountsDynamoDbConfiguration() {
return migrationRetryAccountsDynamoDb;
}

View File

@ -175,6 +175,8 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccountsTableCrawler;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccountsTableCrawler;
import org.whispersystems.textsecuregcm.storage.Profiles;
@ -326,41 +328,64 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DynamoDbClient recentlyDeletedAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pushChallengeDynamoDbClient = DynamoDbFromConfig.client(config.getPushChallengeDynamoDbConfiguration(),
DynamoDbClient pushChallengeDynamoDbClient = DynamoDbFromConfig.client(
config.getPushChallengeDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient reportMessageDynamoDbClient = DynamoDbFromConfig.client(config.getReportMessageDynamoDbConfiguration(),
DynamoDbClient reportMessageDynamoDbClient = DynamoDbFromConfig.client(
config.getReportMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationRetryAccountsDynamoDbConfiguration(),
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(
config.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pendingAccountsDynamoDbClient = DynamoDbFromConfig.client(config.getPendingAccountsDynamoDbConfiguration(),
DynamoDbClient migrationMismatchedAccountsDynamoDb = DynamoDbFromConfig.client(
config.getMigrationMismatchedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pendingDevicesDynamoDbClient = DynamoDbFromConfig.client(config.getPendingDevicesDynamoDbConfiguration(),
DynamoDbClient pendingAccountsDynamoDbClient = DynamoDbFromConfig.client(
config.getPendingAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pendingDevicesDynamoDbClient = DynamoDbFromConfig.client(
config.getPendingDevicesDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDB deletedAccountsLockDynamoDbClient = AmazonDynamoDBClientBuilder.standard()
.withRegion(config.getDeletedAccountsLockDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getDeletedAccountsLockDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getDeletedAccountsLockDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(
((int) config.getDeletedAccountsLockDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout(
(int) config.getDeletedAccountsLockDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
.build();
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient, config.getDeletedAccountsDynamoDbConfiguration().getTableName(), config.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb, config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient,
config.getDeletedAccountsDynamoDbConfiguration().getTableName(),
config.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb,
config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb,
config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
MigrationMismatchedAccounts mismatchedAccounts = new MigrationMismatchedAccounts(
migrationMismatchedAccountsDynamoDb,
config.getMigrationMismatchedAccountsDynamoDbConfiguration().getTableName());
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
Usernames usernames = new Usernames(accountDatabase);
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient,
accountsDynamoDbMigrationThreadPool, config.getAccountsDynamoDbConfiguration().getTableName(),
config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts,
migrationRetryAccounts);
Usernames usernames = new Usernames(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(preKeyDynamoDb, config.getKeysDynamoDbConfiguration().getTableName());
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, config.getMessageDynamoDbConfiguration().getTableName(), config.getMessageDynamoDbConfiguration().getTimeToLive());
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(preKeyDynamoDb, config.getKeysDynamoDbConfiguration().getTableName());
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb,
config.getMessageDynamoDbConfiguration().getTableName(),
config.getMessageDynamoDbConfiguration().getTimeToLive());
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
PushChallengeDynamoDb pushChallengeDynamoDb = new PushChallengeDynamoDb(pushChallengeDynamoDbClient, config.getPushChallengeDynamoDbConfiguration().getTableName());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessageDynamoDbClient, config.getReportMessageDynamoDbConfiguration().getTableName());
VerificationCodeStore pendingAccounts = new VerificationCodeStore(pendingAccountsDynamoDbClient, config.getPendingAccountsDynamoDbConfiguration().getTableName());
@ -391,8 +416,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000);
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue);
ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(3).build();
ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build();
ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build();
ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build();
ScheduledExecutorService retrySchedulingExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "retry-%d")).threads(2).build();
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build();
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build();
@ -521,7 +548,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
dynamicConfigurationManager);
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(migrationRetryAccounts, accountsManager, accountsDynamoDb, cacheCluster, recurringJobExecutor);
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(
migrationRetryAccounts, accountsManager, accountsDynamoDb, cacheCluster, recurringJobExecutor);
MigrationMismatchedAccountsTableCrawler migrationMismatchedAccountsTableCrawler = new MigrationMismatchedAccountsTableCrawler(
mismatchedAccounts, accountsManager, accounts, accountsDynamoDb, dynamicConfigurationManager, cacheCluster,
recurringJobExecutor);
apnSender.setApnFallbackManager(apnFallbackManager);
environment.lifecycle().manage(new ApplicationShutdownMonitor());
@ -532,6 +563,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(accountDynamoDbMigrationCrawler);
environment.lifecycle().manage(deletedAccountsTableCrawler);
environment.lifecycle().manage(migrationRetryAccountsTableCrawler);
environment.lifecycle().manage(migrationMismatchedAccountsTableCrawler);
environment.lifecycle().manage(remoteConfigsManager);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(messagePersister);

View File

@ -0,0 +1,111 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
public class MigrationMismatchedAccounts extends AbstractDynamoDbStore {
static final String KEY_UUID = "U";
static final String ATTR_TIMESTAMP = "T";
@VisibleForTesting
static final long MISMATCH_CHECK_DELAY_MILLIS = Duration.ofMinutes(1).toMillis();
private final String tableName;
private final Clock clock;
public void put(UUID uuid) {
final Map<String, AttributeValue> item = primaryKey(uuid);
item.put("T", AttributeValues.fromLong(clock.millis()));
db().putItem(PutItemRequest.builder()
.tableName(tableName)
.item(item)
.build());
}
public MigrationMismatchedAccounts(DynamoDbClient dynamoDb, String tableName) {
this(dynamoDb, tableName, Clock.systemUTC());
}
@VisibleForTesting
MigrationMismatchedAccounts(DynamoDbClient dynamoDb, String tableName, final Clock clock) {
super(dynamoDb);
this.tableName = tableName;
this.clock = clock;
}
/**
* returns a list of UUIDs stored in the table that have passed {@link #MISMATCH_CHECK_DELAY_MILLIS}
*/
public List<UUID> getUuids(int max) {
final List<UUID> uuids = new ArrayList<>();
final ScanIterable scanPaginator = db().scanPaginator(ScanRequest.builder()
.tableName(tableName)
.filterExpression("#timestamp <= :timestamp")
.expressionAttributeNames(Map.of("#timestamp", ATTR_TIMESTAMP))
.expressionAttributeValues(Map.of(":timestamp",
AttributeValues.fromLong(clock.millis() - MISMATCH_CHECK_DELAY_MILLIS)))
.build());
for (ScanResponse response : scanPaginator) {
for (Map<String, AttributeValue> item : response.items()) {
uuids.add(AttributeValues.getUUID(item, KEY_UUID, null));
if (uuids.size() >= max) {
break;
}
}
if (uuids.size() >= max) {
break;
}
}
return uuids;
}
@VisibleForTesting
public static Map<String, AttributeValue> primaryKey(UUID uuid) {
final HashMap<String, AttributeValue> item = new HashMap<>();
item.put(KEY_UUID, AttributeValues.fromUUID(uuid));
return item;
}
public void delete(final List<UUID> uuidsToDelete) {
writeInBatches(uuidsToDelete, (uuids -> {
final List<WriteRequest> deletes = uuids.stream()
.map(uuid -> WriteRequest.builder().deleteRequest(
DeleteRequest.builder().key(Map.of(KEY_UUID, AttributeValues.fromUUID(uuid))).build()).build())
.collect(Collectors.toList());
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
}));
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
public class MigrationMismatchedAccountsTableCrawler extends ManagedPeriodicWork {
private static final Logger logger = LoggerFactory.getLogger(MigrationMismatchedAccountsTableCrawler.class);
private static final Duration WORKER_TTL = Duration.ofMinutes(2);
private static final Duration RUN_INTERVAL = Duration.ofMinutes(1);
private static final String ACTIVE_WORKER_KEY = "migration_mismatched_accounts_crawler_cache_active_worker";
private static final int MAX_BATCH_SIZE = 5_000;
private static final Counter COMPARISONS_COUNTER = Metrics.counter(
name(MigrationMismatchedAccountsTableCrawler.class, "comparisons"));
private static final String MISMATCH_COUNTER_NAME = name(MigrationMismatchedAccountsTableCrawler.class, "mismatches");
private static final Counter ERRORS_COUNTER = Metrics.counter(
name(MigrationMismatchedAccountsTableCrawler.class, "errors"));
private final MigrationMismatchedAccounts mismatchedAccounts;
private final AccountsManager accountsManager;
private final Accounts accountsDb;
private final AccountsDynamoDb accountsDynamoDb;
private final DynamicConfigurationManager dynamicConfigurationManager;
public MigrationMismatchedAccountsTableCrawler(
final MigrationMismatchedAccounts mismatchedAccounts,
final AccountsManager accountsManager,
final Accounts accountsDb,
final AccountsDynamoDb accountsDynamoDb,
final DynamicConfigurationManager dynamicConfigurationManager,
final FaultTolerantRedisCluster cluster,
final ScheduledExecutorService executorService) throws IOException {
super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService);
this.mismatchedAccounts = mismatchedAccounts;
this.accountsManager = accountsManager;
this.accountsDb = accountsDb;
this.accountsDynamoDb = accountsDynamoDb;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Override
public void doPeriodicWork() {
final List<UUID> uuids = this.mismatchedAccounts.getUuids(MAX_BATCH_SIZE);
final List<UUID> processedUuids = new ArrayList<>(uuids.size());
try {
for (UUID uuid : uuids) {
try {
final Optional<String> result = accountsManager.compareAccounts(accountsDb.get(uuid),
accountsDynamoDb.get(uuid));
COMPARISONS_COUNTER.increment();
result.ifPresent(mismatchType -> {
Metrics.counter(MISMATCH_COUNTER_NAME, "type", mismatchType)
.increment();
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isLogMismatches()) {
logger.info("Mismatch: {} - {}", uuid, mismatchType);
}
});
processedUuids.add(uuid);
} catch (final Exception e) {
ERRORS_COUNTER.increment();
logger.warn("Failed to check account mismatch", e);
}
}
} finally {
this.mismatchedAccounts.delete(processedUuids);
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.time.Clock;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
class MigrationMismatchAccountsTest {
@RegisterExtension
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
.tableName("account_migration_mismatches_test")
.hashKey(MigrationRetryAccounts.KEY_UUID)
.attributeDefinition(AttributeDefinition.builder()
.attributeName(MigrationRetryAccounts.KEY_UUID)
.attributeType(ScalarAttributeType.B)
.build())
.build();
@Test
void test() {
final Clock clock = mock(Clock.class);
when(clock.millis()).thenReturn(0L);
final MigrationMismatchedAccounts migrationMismatchedAccounts = new MigrationMismatchedAccounts(
dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getTableName(), clock);
UUID firstUuid = UUID.randomUUID();
UUID secondUuid = UUID.randomUUID();
assertTrue(migrationMismatchedAccounts.getUuids(10).isEmpty());
migrationMismatchedAccounts.put(firstUuid);
migrationMismatchedAccounts.put(secondUuid);
assertTrue(migrationMismatchedAccounts.getUuids(10).isEmpty());
when(clock.millis()).thenReturn(MigrationMismatchedAccounts.MISMATCH_CHECK_DELAY_MILLIS);
assertTrue(migrationMismatchedAccounts.getUuids(10).containsAll(List.of(firstUuid, secondUuid)));
}
}