From c3fd2e22846c617d4c34b9d3edec4d20c70e87e3 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 6 Jul 2023 17:18:56 -0400 Subject: [PATCH] Retry key storage attempts when migrating signed pre-keys --- .../workers/MigrateSignedECPreKeysCommand.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java index fb2f65452..d525933fe 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java @@ -6,6 +6,7 @@ package org.whispersystems.textsecuregcm.workers; import io.micrometer.core.instrument.Metrics; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -18,6 +19,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.ParallelFlux; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; +import reactor.util.retry.Retry; public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccountsCommand { @@ -50,8 +52,9 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun return Flux.fromIterable(keys); })) - .flatMap(keyTuple -> Mono.fromFuture( - keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())), false, MAX_CONCURRENCY) + .flatMap(keyTuple -> Mono.fromFuture(keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onRetryExhaustedThrow((spec, rs) -> rs.failure())), + false, MAX_CONCURRENCY) .doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment()) .then() .block();