Update `MessagesManager#getMessagesForDevice`

- add `subscribeOn()`
- use `CompletableFuture` for consistency
This commit is contained in:
Chris Eager 2022-11-08 09:38:52 -06:00 committed by GitHub
parent 5bec89ecc8
commit 681a5bafb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 7 deletions

View File

@ -9,7 +9,6 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -28,6 +27,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class MessagesManager {
@ -75,13 +75,20 @@ public class MessagesManager {
public Pair<List<Envelope>, Boolean> getMessagesForDevice(UUID destinationUuid, long destinationDevice,
boolean cachedMessagesOnly) {
final List<Envelope> envelopes = Flux.from(
getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly))
.take(RESULT_SET_CHUNK_SIZE, true)
.collectList()
.blockOptional().orElse(Collections.emptyList());
try {
final List<Envelope> envelopes = Flux.from(
getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly))
.take(RESULT_SET_CHUNK_SIZE, true)
.collectList()
.subscribeOn(Schedulers.boundedElastic())
.toFuture()
.get(5, TimeUnit.SECONDS);
return new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE);
} catch (Exception e) {
throw new RuntimeException(e);
}
return new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE);
}
public Publisher<Envelope> getMessagesForDeviceReactive(UUID destinationUuid, long destinationDevice,