diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index 06da37ea3..56120af90 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -16,6 +16,7 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; import java.security.MessageDigest; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -98,6 +99,7 @@ import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; import org.whispersystems.textsecuregcm.websocket.WebSocketConnection; import org.whispersystems.websocket.Stories; +import reactor.core.scheduler.Schedulers; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") @Path("/v1/messages") @@ -482,7 +484,7 @@ public class MessageController { @Timed @GET @Produces(MediaType.APPLICATION_JSON) - public OutgoingMessageEntityList getPendingMessages(@Auth AuthenticatedAccount auth, + public CompletableFuture getPendingMessages(@Auth AuthenticatedAccount auth, @HeaderParam(Stories.X_SIGNAL_RECEIVE_STORIES) String receiveStoriesHeader, @HeaderParam("User-Agent") String userAgent) { @@ -490,39 +492,40 @@ public class MessageController { pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), auth.getAuthenticatedDevice(), userAgent); - final OutgoingMessageEntityList outgoingMessages; - { - final Pair, Boolean> messagesAndHasMore = messagesManager.getMessagesForDevice( - auth.getAccount().getUuid(), - auth.getAuthenticatedDevice().getId(), - false); + return messagesManager.getMessagesForDevice( + auth.getAccount().getUuid(), + auth.getAuthenticatedDevice().getId(), + false) + .map(messagesAndHasMore -> { + Stream envelopes = messagesAndHasMore.first().stream(); + if (!shouldReceiveStories) { + envelopes = envelopes.filter(e -> !e.getStory()); + } - Stream envelopes = messagesAndHasMore.first().stream(); - if (!shouldReceiveStories) { - envelopes = envelopes.filter(e -> !e.getStory()); - } + final OutgoingMessageEntityList messages = new OutgoingMessageEntityList(envelopes + .map(OutgoingMessageEntity::fromEnvelope) + .peek( + outgoingMessageEntity -> MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(), + outgoingMessageEntity)) + .collect(Collectors.toList()), + messagesAndHasMore.second()); - outgoingMessages = new OutgoingMessageEntityList(envelopes - .map(OutgoingMessageEntity::fromEnvelope) - .peek(outgoingMessageEntity -> MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(), - outgoingMessageEntity)) - .collect(Collectors.toList()), - messagesAndHasMore.second()); - } + String platform; - { - String platform; + try { + platform = UserAgentUtil.parseUserAgentString(userAgent).getPlatform().name().toLowerCase(); + } catch (final UnrecognizedUserAgentException ignored) { + platform = "unrecognized"; + } - try { - platform = UserAgentUtil.parseUserAgentString(userAgent).getPlatform().name().toLowerCase(); - } catch (final UnrecognizedUserAgentException ignored) { - platform = "unrecognized"; - } + Metrics.summary(OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME, "platform", platform) + .record(estimateMessageListSizeBytes(messages)); - Metrics.summary(OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME, "platform", platform).record(estimateMessageListSizeBytes(outgoingMessages)); - } - - return outgoingMessages; + return messages; + }) + .timeout(Duration.ofSeconds(5)) + .subscribeOn(Schedulers.boundedElastic()) + .toFuture(); } private static long estimateMessageListSizeBytes(final OutgoingMessageEntityList messageList) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index d11589e0e..de50f8680 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -27,7 +27,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; +import reactor.core.publisher.Mono; public class MessagesManager { @@ -72,23 +72,14 @@ public class MessagesManager { return messagesCache.hasMessages(destinationUuid, destinationDevice); } - public Pair, Boolean> getMessagesForDevice(UUID destinationUuid, long destinationDevice, + public Mono, Boolean>> getMessagesForDevice(UUID destinationUuid, long destinationDevice, boolean cachedMessagesOnly) { - try { - final List envelopes = Flux.from( - getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) - .take(RESULT_SET_CHUNK_SIZE, true) - .collectList() - .subscribeOn(Schedulers.boundedElastic()) - .toFuture() - .get(5, TimeUnit.SECONDS); - - return new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE); - } catch (Exception e) { - throw new RuntimeException(e); - } - + return Flux.from( + getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) + .take(RESULT_SET_CHUNK_SIZE, true) + .collectList() + .map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE)); } public Publisher getMessagesForDeviceReactive(UUID destinationUuid, long destinationDevice, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java index 3a2caa3e1..42ac33d79 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java @@ -54,6 +54,7 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.glassfish.jersey.server.ServerProperties; import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -96,6 +97,7 @@ import org.whispersystems.textsecuregcm.tests.util.AuthHelper; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.websocket.Stories; +import reactor.core.publisher.Mono; @ExtendWith(DropwizardExtensionsSupport.class) class MessageControllerTest { @@ -138,6 +140,7 @@ class MessageControllerTest { private static final ExecutorService multiRecipientMessageExecutor = mock(ExecutorService.class); private static final ResourceExtension resources = ResourceExtension.builder() + .addProperty(ServerProperties.UNWRAP_COMPLETION_STAGE_IN_WRITER_ENABLE, Boolean.TRUE) .addProvider(AuthHelper.getAuthFilter()) .addProvider(new PolymorphicAuthValueFactoryProvider.Binder<>( ImmutableSet.of(AuthenticatedAccount.class, DisabledPermittedAuthenticatedAccount.class))) @@ -461,7 +464,7 @@ class MessageControllerTest { ); when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_UUID), eq(1L), anyBoolean())) - .thenReturn(new Pair<>(envelopes, false)); + .thenReturn(Mono.just(new Pair<>(envelopes, false))); final String userAgent = "Test-UA"; @@ -515,7 +518,7 @@ class MessageControllerTest { ); when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_UUID), eq(1L), anyBoolean())) - .thenReturn(new Pair<>(messages, false)); + .thenReturn(Mono.just(new Pair<>(messages, false))); Response response = resources.getJerseyTest().target("/v1/messages/") @@ -528,7 +531,7 @@ class MessageControllerTest { } @Test - void testDeleteMessages() throws Exception { + void testDeleteMessages() { long timestamp = System.currentTimeMillis(); UUID sourceUuid = UUID.randomUUID();