Handle errors in deleted-accounts crawler
This commit is contained in:
parent
ab94d3045d
commit
cdd6f78c73
|
@ -18,6 +18,8 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
import reactor.util.retry.Retry;
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependencies {
|
public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependencies {
|
||||||
|
|
||||||
|
@ -79,7 +81,6 @@ public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependenci
|
||||||
final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
|
final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
|
||||||
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
|
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
|
||||||
|
|
||||||
final String deletedAccountsTableName = configuration.getDynamoDbTables().getDeletedAccounts().getTableName();
|
|
||||||
logger.info("Crawling deleted accounts with {} segments and {} processors",
|
logger.info("Crawling deleted accounts with {} segments and {} processors",
|
||||||
segments,
|
segments,
|
||||||
Runtime.getRuntime().availableProcessors());
|
Runtime.getRuntime().availableProcessors());
|
||||||
|
@ -98,8 +99,13 @@ public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependenci
|
||||||
tuple -> dryRun
|
tuple -> dryRun
|
||||||
? Mono.just(false)
|
? Mono.just(false)
|
||||||
: Mono.fromFuture(
|
: Mono.fromFuture(
|
||||||
accounts.migrateDeletedAccount(
|
accounts.migrateDeletedAccount(
|
||||||
tuple.getT1(), tuple.getT2(), tuple.getT3())),
|
tuple.getT1(), tuple.getT2(), tuple.getT3()))
|
||||||
|
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
|
||||||
|
.onErrorResume(throwable -> {
|
||||||
|
logger.warn("Failed to migrate record for {}", tuple.getT1(), throwable);
|
||||||
|
return Mono.empty();
|
||||||
|
}),
|
||||||
concurrency)
|
concurrency)
|
||||||
.filter(migrated -> migrated)
|
.filter(migrated -> migrated)
|
||||||
.doOnNext(ignored -> recordsMigratedCounter.increment())
|
.doOnNext(ignored -> recordsMigratedCounter.increment())
|
||||||
|
|
Loading…
Reference in New Issue