From cdd6f78c7365e5b57db5d628f141ade584dafea0 Mon Sep 17 00:00:00 2001 From: Jonathan Klabunde Tomer Date: Mon, 25 Nov 2024 17:53:17 -0800 Subject: [PATCH] Handle errors in deleted-accounts crawler --- .../workers/MigrateDeletedAccountsCommand.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateDeletedAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateDeletedAccountsCommand.java index 0f12e4657..94f369a18 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateDeletedAccountsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateDeletedAccountsCommand.java @@ -18,6 +18,8 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.storage.AccountsManager; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; +import java.time.Duration; public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependencies { @@ -79,7 +81,6 @@ public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependenci final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT); final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); - final String deletedAccountsTableName = configuration.getDynamoDbTables().getDeletedAccounts().getTableName(); logger.info("Crawling deleted accounts with {} segments and {} processors", segments, Runtime.getRuntime().availableProcessors()); @@ -98,8 +99,13 @@ public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependenci tuple -> dryRun ? Mono.just(false) : Mono.fromFuture( - accounts.migrateDeletedAccount( - tuple.getT1(), tuple.getT2(), tuple.getT3())), + accounts.migrateDeletedAccount( + tuple.getT1(), tuple.getT2(), tuple.getT3())) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) + .onErrorResume(throwable -> { + logger.warn("Failed to migrate record for {}", tuple.getT1(), throwable); + return Mono.empty(); + }), concurrency) .filter(migrated -> migrated) .doOnNext(ignored -> recordsMigratedCounter.increment())