diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 18422b557..d11589e0e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -9,7 +9,6 @@ import static com.codahale.metrics.MetricRegistry.name; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -28,6 +27,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; public class MessagesManager { @@ -75,13 +75,20 @@ public class MessagesManager { public Pair, Boolean> getMessagesForDevice(UUID destinationUuid, long destinationDevice, boolean cachedMessagesOnly) { - final List envelopes = Flux.from( - getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) - .take(RESULT_SET_CHUNK_SIZE, true) - .collectList() - .blockOptional().orElse(Collections.emptyList()); + try { + final List envelopes = Flux.from( + getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) + .take(RESULT_SET_CHUNK_SIZE, true) + .collectList() + .subscribeOn(Schedulers.boundedElastic()) + .toFuture() + .get(5, TimeUnit.SECONDS); + + return new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE); + } catch (Exception e) { + throw new RuntimeException(e); + } - return new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE); } public Publisher getMessagesForDeviceReactive(UUID destinationUuid, long destinationDevice,