Add crawler to process migration retry accounts

This commit is contained in:
Chris Eager 2021-06-04 14:24:37 -05:00 committed by Chris Eager
parent a85afe827d
commit c22ea78672
4 changed files with 107 additions and 1 deletions

View File

@ -169,6 +169,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccountsTableCrawler;
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
import org.whispersystems.textsecuregcm.storage.PendingDevices;
@ -495,6 +496,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccounts, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(migrationRetryAccounts, accountsManager, accountsDynamoDb, cacheCluster, recurringJobExecutor);
apnSender.setApnFallbackManager(apnFallbackManager);
environment.lifecycle().manage(apnFallbackManager);
@ -502,6 +504,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(messageSender);
environment.lifecycle().manage(accountDatabaseCrawler);
environment.lifecycle().manage(deletedAccountsTableCrawler);
environment.lifecycle().manage(migrationRetryAccountsTableCrawler);
environment.lifecycle().manage(remoteConfigsManager);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(messagePersister);

View File

@ -5,12 +5,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class MigrationRetryAccounts extends AbstractDynamoDbStore {
@ -58,4 +61,16 @@ public class MigrationRetryAccounts extends AbstractDynamoDbStore {
return Map.of(KEY_UUID, AttributeValues.fromUUID(uuid));
}
public void delete(final List<UUID> uuidsToDelete) {
writeInBatches(uuidsToDelete, (uuids -> {
final List<WriteRequest> deletes = uuids.stream()
.map(uuid -> WriteRequest.builder().deleteRequest(
DeleteRequest.builder().key(Map.of(KEY_UUID, AttributeValues.fromUUID(uuid))).build()).build())
.collect(Collectors.toList());
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
}));
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
public class MigrationRetryAccountsTableCrawler extends ManagedPeriodicWork {
private static final Logger logger = LoggerFactory.getLogger(MigrationRetryAccountsTableCrawler.class);
private static final Duration WORKER_TTL = Duration.ofMinutes(2);
private static final Duration RUN_INTERVAL = Duration.ofMinutes(15);
private static final String ACTIVE_WORKER_KEY = "migration_retry_accounts_crawler_cache_active_worker";
private static final int MAX_BATCH_SIZE = 5_000;
private static final Counter MIGRATED_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "migrated"));
private static final Counter ERROR_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "error"));
private static final Counter TOTAL_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "total"));
private final MigrationRetryAccounts retryAccounts;
private final AccountsManager accountsManager;
private final AccountsDynamoDb accountsDynamoDb;
public MigrationRetryAccountsTableCrawler(
final MigrationRetryAccounts retryAccounts,
final AccountsManager accountsManager,
final AccountsDynamoDb accountsDynamoDb,
final FaultTolerantRedisCluster cluster,
final ScheduledExecutorService executorService) throws IOException {
super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService);
this.retryAccounts = retryAccounts;
this.accountsManager = accountsManager;
this.accountsDynamoDb = accountsDynamoDb;
}
@Override
public void doPeriodicWork() {
final List<UUID> uuids = this.retryAccounts.getUuids(MAX_BATCH_SIZE);
final List<UUID> processedUuids = new ArrayList<>(uuids.size());
try {
for (UUID uuid : uuids) {
try {
final Optional<Account> maybeDynamoAccount = accountsDynamoDb.get(uuid);
if (maybeDynamoAccount.isEmpty()) {
accountsManager.get(uuid).ifPresent(account -> {
accountsDynamoDb.migrate(account);
MIGRATED_COUNTER.increment();
});
}
processedUuids.add(uuid);
TOTAL_COUNTER.increment();
} catch (final Exception e) {
ERROR_COUNTER.increment();
logger.warn("Failed to migrate account");
}
}
} finally {
this.retryAccounts.delete(processedUuids);
}
}
}

View File

@ -54,7 +54,7 @@ class AccountsDynamoDbTest {
private static final String ACCOUNTS_TABLE_NAME = "accounts_test";
private static final String NUMBERS_TABLE_NAME = "numbers_test";
private static final String MIGRATION_DELETED_ACCOUNTS_TABLE_NAME = "migration_deleted_accounts_test";
private static final String MIGRATION_RETRY_ACCOUNTS_TABLE_NAME = "miration_retry_accounts_test";
private static final String MIGRATION_RETRY_ACCOUNTS_TABLE_NAME = "migration_retry_accounts_test";
@RegisterExtension
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()