From 681a5bafb4b1f4eb24a2307451493ffb61085dd3 Mon Sep 17 00:00:00 2001 From: Chris Eager <79161849+eager-signal@users.noreply.github.com> Date: Tue, 8 Nov 2022 09:38:52 -0600 Subject: [PATCH] Update `MessagesManager#getMessagesForDevice` - add `subscribeOn()` - use `CompletableFuture` for consistency --- .../storage/MessagesManager.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 18422b557..d11589e0e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -9,7 +9,6 @@ import static com.codahale.metrics.MetricRegistry.name; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -28,6 +27,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; public class MessagesManager { @@ -75,13 +75,20 @@ public class MessagesManager { public Pair, Boolean> getMessagesForDevice(UUID destinationUuid, long destinationDevice, boolean cachedMessagesOnly) { - final List envelopes = Flux.from( - getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) - .take(RESULT_SET_CHUNK_SIZE, true) - .collectList() - .blockOptional().orElse(Collections.emptyList()); + try { + final List envelopes = Flux.from( + getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) + .take(RESULT_SET_CHUNK_SIZE, true) + .collectList() + .subscribeOn(Schedulers.boundedElastic()) + .toFuture() + .get(5, TimeUnit.SECONDS); + + return new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE); + } catch (Exception e) { + throw new RuntimeException(e); + } - return new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE); } public Publisher getMessagesForDeviceReactive(UUID destinationUuid, long destinationDevice,