diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommand.java index 294c28c21..97a1c2e5a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommand.java @@ -84,12 +84,11 @@ public class StartPushNotificationExperimentCommand extends AbstractSinglePas getCommandDependencies().pushNotificationExperimentSamples(); accounts - .flatMap(account -> Flux.fromIterable(account.getDevices()) - .map(device -> Tuples.of(account, device))) - .doOnNext(ignored -> DEVICE_INSPECTED_COUNTER.increment()) - .filterWhen(accountAndDevice -> Mono.fromFuture(() -> - experiment.isDeviceEligible(accountAndDevice.getT1(), accountAndDevice.getT2())), - maxConcurrency) + .flatMap(account -> Flux.fromIterable(account.getDevices()).map(device -> Tuples.of(account, device))) + .doOnNext(ignored -> DEVICE_INSPECTED_COUNTER.increment()) + .flatMap(accountAndDevice -> Mono.fromFuture(() -> + experiment.isDeviceEligible(accountAndDevice.getT1(), accountAndDevice.getT2())) + .mapNotNull(eligible -> eligible ? accountAndDevice : null), maxConcurrency) .flatMap(accountAndDevice -> { final UUID accountIdentifier = accountAndDevice.getT1().getIdentifier(IdentityType.ACI); final byte deviceId = accountAndDevice.getT2().getId();