diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 0b0d56791..4de9003ab 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ package org.whispersystems.textsecuregcm; @@ -154,6 +154,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private DynamoDbConfiguration migrationDeletedAccountsDynamoDb; + @Valid + @NotNull + @JsonProperty + private DynamoDbConfiguration migrationMismatchedAccountsDynamoDb; + @Valid @NotNull @JsonProperty @@ -407,6 +412,10 @@ public class WhisperServerConfiguration extends Configuration { return migrationDeletedAccountsDynamoDb; } + public DynamoDbConfiguration getMigrationMismatchedAccountsDynamoDbConfiguration() { + return migrationMismatchedAccountsDynamoDb; + } + public DynamoDbConfiguration getMigrationRetryAccountsDynamoDbConfiguration() { return migrationRetryAccountsDynamoDb; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 52f8fec54..501b4edbc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -175,6 +175,8 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts; +import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccounts; +import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccountsTableCrawler; import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts; import org.whispersystems.textsecuregcm.storage.MigrationRetryAccountsTableCrawler; import org.whispersystems.textsecuregcm.storage.Profiles; @@ -326,41 +328,64 @@ public class WhisperServerService extends Application keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000); Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue); - ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(3).build(); - ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build(); + ScheduledExecutorService recurringJobExecutor = environment.lifecycle() + .scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build(); + ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle() + .scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build(); ScheduledExecutorService retrySchedulingExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "retry-%d")).threads(2).build(); ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build(); ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build(); @@ -521,7 +548,11 @@ public class WhisperServerService extends Application item = primaryKey(uuid); + item.put("T", AttributeValues.fromLong(clock.millis())); + db().putItem(PutItemRequest.builder() + .tableName(tableName) + .item(item) + .build()); + } + + public MigrationMismatchedAccounts(DynamoDbClient dynamoDb, String tableName) { + this(dynamoDb, tableName, Clock.systemUTC()); + } + + @VisibleForTesting + MigrationMismatchedAccounts(DynamoDbClient dynamoDb, String tableName, final Clock clock) { + super(dynamoDb); + + this.tableName = tableName; + this.clock = clock; + } + + /** + * returns a list of UUIDs stored in the table that have passed {@link #MISMATCH_CHECK_DELAY_MILLIS} + */ + public List getUuids(int max) { + + final List uuids = new ArrayList<>(); + + final ScanIterable scanPaginator = db().scanPaginator(ScanRequest.builder() + .tableName(tableName) + .filterExpression("#timestamp <= :timestamp") + .expressionAttributeNames(Map.of("#timestamp", ATTR_TIMESTAMP)) + .expressionAttributeValues(Map.of(":timestamp", + AttributeValues.fromLong(clock.millis() - MISMATCH_CHECK_DELAY_MILLIS))) + .build()); + + for (ScanResponse response : scanPaginator) { + + for (Map item : response.items()) { + uuids.add(AttributeValues.getUUID(item, KEY_UUID, null)); + + if (uuids.size() >= max) { + break; + } + } + + if (uuids.size() >= max) { + break; + } + } + + return uuids; + } + + @VisibleForTesting + public static Map primaryKey(UUID uuid) { + final HashMap item = new HashMap<>(); + item.put(KEY_UUID, AttributeValues.fromUUID(uuid)); + return item; + } + + public void delete(final List uuidsToDelete) { + + writeInBatches(uuidsToDelete, (uuids -> { + + final List deletes = uuids.stream() + .map(uuid -> WriteRequest.builder().deleteRequest( + DeleteRequest.builder().key(Map.of(KEY_UUID, AttributeValues.fromUUID(uuid))).build()).build()) + .collect(Collectors.toList()); + + executeTableWriteItemsUntilComplete(Map.of(tableName, deletes)); + })); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationMismatchedAccountsTableCrawler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationMismatchedAccountsTableCrawler.java new file mode 100644 index 000000000..d81932245 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationMismatchedAccountsTableCrawler.java @@ -0,0 +1,103 @@ +/* + * Copyright 2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.storage; + + +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; + +public class MigrationMismatchedAccountsTableCrawler extends ManagedPeriodicWork { + + private static final Logger logger = LoggerFactory.getLogger(MigrationMismatchedAccountsTableCrawler.class); + + private static final Duration WORKER_TTL = Duration.ofMinutes(2); + private static final Duration RUN_INTERVAL = Duration.ofMinutes(1); + private static final String ACTIVE_WORKER_KEY = "migration_mismatched_accounts_crawler_cache_active_worker"; + + private static final int MAX_BATCH_SIZE = 5_000; + + private static final Counter COMPARISONS_COUNTER = Metrics.counter( + name(MigrationMismatchedAccountsTableCrawler.class, "comparisons")); + private static final String MISMATCH_COUNTER_NAME = name(MigrationMismatchedAccountsTableCrawler.class, "mismatches"); + private static final Counter ERRORS_COUNTER = Metrics.counter( + name(MigrationMismatchedAccountsTableCrawler.class, "errors")); + + private final MigrationMismatchedAccounts mismatchedAccounts; + private final AccountsManager accountsManager; + private final Accounts accountsDb; + private final AccountsDynamoDb accountsDynamoDb; + + private final DynamicConfigurationManager dynamicConfigurationManager; + + public MigrationMismatchedAccountsTableCrawler( + final MigrationMismatchedAccounts mismatchedAccounts, + final AccountsManager accountsManager, + final Accounts accountsDb, + final AccountsDynamoDb accountsDynamoDb, + final DynamicConfigurationManager dynamicConfigurationManager, + final FaultTolerantRedisCluster cluster, + final ScheduledExecutorService executorService) throws IOException { + + super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService); + + this.mismatchedAccounts = mismatchedAccounts; + this.accountsManager = accountsManager; + this.accountsDb = accountsDb; + this.accountsDynamoDb = accountsDynamoDb; + this.dynamicConfigurationManager = dynamicConfigurationManager; + } + + @Override + public void doPeriodicWork() { + + final List uuids = this.mismatchedAccounts.getUuids(MAX_BATCH_SIZE); + + final List processedUuids = new ArrayList<>(uuids.size()); + + try { + for (UUID uuid : uuids) { + + try { + + final Optional result = accountsManager.compareAccounts(accountsDb.get(uuid), + accountsDynamoDb.get(uuid)); + + COMPARISONS_COUNTER.increment(); + + result.ifPresent(mismatchType -> { + Metrics.counter(MISMATCH_COUNTER_NAME, "type", mismatchType) + .increment(); + + if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration() + .isLogMismatches()) { + logger.info("Mismatch: {} - {}", uuid, mismatchType); + } + }); + + processedUuids.add(uuid); + + } catch (final Exception e) { + ERRORS_COUNTER.increment(); + logger.warn("Failed to check account mismatch", e); + } + + } + } finally { + this.mismatchedAccounts.delete(processedUuids); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationMismatchAccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationMismatchAccountsTest.java new file mode 100644 index 000000000..987eae4ff --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MigrationMismatchAccountsTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Clock; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; + +class MigrationMismatchAccountsTest { + + @RegisterExtension + static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder() + .tableName("account_migration_mismatches_test") + .hashKey(MigrationRetryAccounts.KEY_UUID) + .attributeDefinition(AttributeDefinition.builder() + .attributeName(MigrationRetryAccounts.KEY_UUID) + .attributeType(ScalarAttributeType.B) + .build()) + .build(); + + @Test + void test() { + + final Clock clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L); + + final MigrationMismatchedAccounts migrationMismatchedAccounts = new MigrationMismatchedAccounts( + dynamoDbExtension.getDynamoDbClient(), + dynamoDbExtension.getTableName(), clock); + + UUID firstUuid = UUID.randomUUID(); + UUID secondUuid = UUID.randomUUID(); + + assertTrue(migrationMismatchedAccounts.getUuids(10).isEmpty()); + + migrationMismatchedAccounts.put(firstUuid); + migrationMismatchedAccounts.put(secondUuid); + + assertTrue(migrationMismatchedAccounts.getUuids(10).isEmpty()); + + when(clock.millis()).thenReturn(MigrationMismatchedAccounts.MISMATCH_CHECK_DELAY_MILLIS); + + assertTrue(migrationMismatchedAccounts.getUuids(10).containsAll(List.of(firstUuid, secondUuid))); + } +}