Retry key storage attempts when migrating signed pre-keys
This commit is contained in:
parent
872ef5d0a0
commit
c3fd2e2284
|
@ -6,6 +6,7 @@
|
||||||
package org.whispersystems.textsecuregcm.workers;
|
package org.whispersystems.textsecuregcm.workers;
|
||||||
|
|
||||||
import io.micrometer.core.instrument.Metrics;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -18,6 +19,7 @@ import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.ParallelFlux;
|
import reactor.core.publisher.ParallelFlux;
|
||||||
import reactor.util.function.Tuple3;
|
import reactor.util.function.Tuple3;
|
||||||
import reactor.util.function.Tuples;
|
import reactor.util.function.Tuples;
|
||||||
|
import reactor.util.retry.Retry;
|
||||||
|
|
||||||
public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
|
public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
|
||||||
|
|
||||||
|
@ -50,8 +52,9 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun
|
||||||
|
|
||||||
return Flux.fromIterable(keys);
|
return Flux.fromIterable(keys);
|
||||||
}))
|
}))
|
||||||
.flatMap(keyTuple -> Mono.fromFuture(
|
.flatMap(keyTuple -> Mono.fromFuture(keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3()))
|
||||||
keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())), false, MAX_CONCURRENCY)
|
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onRetryExhaustedThrow((spec, rs) -> rs.failure())),
|
||||||
|
false, MAX_CONCURRENCY)
|
||||||
.doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment())
|
.doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment())
|
||||||
.then()
|
.then()
|
||||||
.block();
|
.block();
|
||||||
|
|
Loading…
Reference in New Issue