From c22ea786726e17ad007b60ba4f46d1a9da29c59b Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Fri, 4 Jun 2021 14:24:37 -0500 Subject: [PATCH] Add crawler to process migration retry accounts --- .../textsecuregcm/WhisperServerService.java | 3 + .../storage/MigrationRetryAccounts.java | 15 ++++ .../MigrationRetryAccountsTableCrawler.java | 88 +++++++++++++++++++ .../storage/AccountsDynamoDbTest.java | 2 +- 4 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccountsTableCrawler.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 3168612ab..41c791c66 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -169,6 +169,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts; import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts; +import org.whispersystems.textsecuregcm.storage.MigrationRetryAccountsTableCrawler; import org.whispersystems.textsecuregcm.storage.PendingAccounts; import org.whispersystems.textsecuregcm.storage.PendingAccountsManager; import org.whispersystems.textsecuregcm.storage.PendingDevices; @@ -495,6 +496,7 @@ public class WhisperServerService extends Application uuidsToDelete) { + + writeInBatches(uuidsToDelete, (uuids -> { + + final List deletes = uuids.stream() + .map(uuid -> WriteRequest.builder().deleteRequest( + DeleteRequest.builder().key(Map.of(KEY_UUID, AttributeValues.fromUUID(uuid))).build()).build()) + .collect(Collectors.toList()); + + executeTableWriteItemsUntilComplete(Map.of(tableName, deletes)); + })); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccountsTableCrawler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccountsTableCrawler.java new file mode 100644 index 000000000..4cf9857ee --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MigrationRetryAccountsTableCrawler.java @@ -0,0 +1,88 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.storage; + +import static com.codahale.metrics.MetricRegistry.name; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; + +public class MigrationRetryAccountsTableCrawler extends ManagedPeriodicWork { + + private static final Logger logger = LoggerFactory.getLogger(MigrationRetryAccountsTableCrawler.class); + + private static final Duration WORKER_TTL = Duration.ofMinutes(2); + private static final Duration RUN_INTERVAL = Duration.ofMinutes(15); + private static final String ACTIVE_WORKER_KEY = "migration_retry_accounts_crawler_cache_active_worker"; + + private static final int MAX_BATCH_SIZE = 5_000; + + private static final Counter MIGRATED_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "migrated")); + private static final Counter ERROR_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "error")); + private static final Counter TOTAL_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "total")); + + private final MigrationRetryAccounts retryAccounts; + private final AccountsManager accountsManager; + private final AccountsDynamoDb accountsDynamoDb; + + public MigrationRetryAccountsTableCrawler( + final MigrationRetryAccounts retryAccounts, + final AccountsManager accountsManager, + final AccountsDynamoDb accountsDynamoDb, + final FaultTolerantRedisCluster cluster, + final ScheduledExecutorService executorService) throws IOException { + + super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService); + + this.retryAccounts = retryAccounts; + this.accountsManager = accountsManager; + this.accountsDynamoDb = accountsDynamoDb; + } + + @Override + public void doPeriodicWork() { + + final List uuids = this.retryAccounts.getUuids(MAX_BATCH_SIZE); + + final List processedUuids = new ArrayList<>(uuids.size()); + + try { + for (UUID uuid : uuids) { + + try { + final Optional maybeDynamoAccount = accountsDynamoDb.get(uuid); + + if (maybeDynamoAccount.isEmpty()) { + accountsManager.get(uuid).ifPresent(account -> { + accountsDynamoDb.migrate(account); + MIGRATED_COUNTER.increment(); + }); + } + + processedUuids.add(uuid); + + TOTAL_COUNTER.increment(); + + } catch (final Exception e) { + ERROR_COUNTER.increment(); + logger.warn("Failed to migrate account"); + } + + } + } finally { + this.retryAccounts.delete(processedUuids); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java index 5f6393dfb..c34f54904 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java @@ -54,7 +54,7 @@ class AccountsDynamoDbTest { private static final String ACCOUNTS_TABLE_NAME = "accounts_test"; private static final String NUMBERS_TABLE_NAME = "numbers_test"; private static final String MIGRATION_DELETED_ACCOUNTS_TABLE_NAME = "migration_deleted_accounts_test"; - private static final String MIGRATION_RETRY_ACCOUNTS_TABLE_NAME = "miration_retry_accounts_test"; + private static final String MIGRATION_RETRY_ACCOUNTS_TABLE_NAME = "migration_retry_accounts_test"; @RegisterExtension static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()