Add onErrorResume and retries to eligibility check in NotifyIdleDevicesCommand
This commit is contained in:
parent
c22b8fafa6
commit
744b05244d
|
@ -16,7 +16,9 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
|||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuples;
|
||||
import reactor.util.retry.Retry;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalTime;
|
||||
|
||||
public class NotifyIdleDevicesCommand extends AbstractSinglePassCrawlAccountsCommand {
|
||||
|
@ -79,7 +81,16 @@ public class NotifyIdleDevicesCommand extends AbstractSinglePassCrawlAccountsCom
|
|||
.doOnNext(ignored -> DEVICE_INSPECTED_COUNTER.increment())
|
||||
.flatMap(accountAndDevice -> Mono.fromFuture(() ->
|
||||
idleWakeupEligibilityChecker.isDeviceEligible(accountAndDevice.getT1(), accountAndDevice.getT2()))
|
||||
.mapNotNull(eligible -> eligible ? accountAndDevice : null),
|
||||
.mapNotNull(eligible -> eligible ? accountAndDevice : null)
|
||||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
|
||||
.onErrorResume(throwable -> {
|
||||
log.warn("Failed to check eligibility for {}:{}",
|
||||
accountAndDevice.getT1().getIdentifier(IdentityType.ACI),
|
||||
accountAndDevice.getT2().getId(),
|
||||
throwable);
|
||||
|
||||
return Mono.empty();
|
||||
}),
|
||||
maxConcurrency)
|
||||
.flatMap(accountAndDevice -> {
|
||||
final Account account = accountAndDevice.getT1();
|
||||
|
|
Loading…
Reference in New Issue