Make `MessagesController#getPendingMessages` fully async

This commit is contained in:
Chris Eager 2022-11-11 10:43:04 -06:00 committed by Chris Eager
parent 77d691df59
commit f41bdf1acb
3 changed files with 45 additions and 48 deletions

View File

@ -16,6 +16,7 @@ import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Tags;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Base64; import java.util.Base64;
@ -98,6 +99,7 @@ import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection; import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
import org.whispersystems.websocket.Stories; import org.whispersystems.websocket.Stories;
import reactor.core.scheduler.Schedulers;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Path("/v1/messages") @Path("/v1/messages")
@ -482,7 +484,7 @@ public class MessageController {
@Timed @Timed
@GET @GET
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public OutgoingMessageEntityList getPendingMessages(@Auth AuthenticatedAccount auth, public CompletableFuture<OutgoingMessageEntityList> getPendingMessages(@Auth AuthenticatedAccount auth,
@HeaderParam(Stories.X_SIGNAL_RECEIVE_STORIES) String receiveStoriesHeader, @HeaderParam(Stories.X_SIGNAL_RECEIVE_STORIES) String receiveStoriesHeader,
@HeaderParam("User-Agent") String userAgent) { @HeaderParam("User-Agent") String userAgent) {
@ -490,39 +492,40 @@ public class MessageController {
pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), auth.getAuthenticatedDevice(), userAgent); pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), auth.getAuthenticatedDevice(), userAgent);
final OutgoingMessageEntityList outgoingMessages; return messagesManager.getMessagesForDevice(
{ auth.getAccount().getUuid(),
final Pair<List<Envelope>, Boolean> messagesAndHasMore = messagesManager.getMessagesForDevice( auth.getAuthenticatedDevice().getId(),
auth.getAccount().getUuid(), false)
auth.getAuthenticatedDevice().getId(), .map(messagesAndHasMore -> {
false); Stream<Envelope> envelopes = messagesAndHasMore.first().stream();
if (!shouldReceiveStories) {
envelopes = envelopes.filter(e -> !e.getStory());
}
Stream<Envelope> envelopes = messagesAndHasMore.first().stream(); final OutgoingMessageEntityList messages = new OutgoingMessageEntityList(envelopes
if (!shouldReceiveStories) { .map(OutgoingMessageEntity::fromEnvelope)
envelopes = envelopes.filter(e -> !e.getStory()); .peek(
} outgoingMessageEntity -> MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(),
outgoingMessageEntity))
.collect(Collectors.toList()),
messagesAndHasMore.second());
outgoingMessages = new OutgoingMessageEntityList(envelopes String platform;
.map(OutgoingMessageEntity::fromEnvelope)
.peek(outgoingMessageEntity -> MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(),
outgoingMessageEntity))
.collect(Collectors.toList()),
messagesAndHasMore.second());
}
{ try {
String platform; platform = UserAgentUtil.parseUserAgentString(userAgent).getPlatform().name().toLowerCase();
} catch (final UnrecognizedUserAgentException ignored) {
platform = "unrecognized";
}
try { Metrics.summary(OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME, "platform", platform)
platform = UserAgentUtil.parseUserAgentString(userAgent).getPlatform().name().toLowerCase(); .record(estimateMessageListSizeBytes(messages));
} catch (final UnrecognizedUserAgentException ignored) {
platform = "unrecognized";
}
Metrics.summary(OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME, "platform", platform).record(estimateMessageListSizeBytes(outgoingMessages)); return messages;
} })
.timeout(Duration.ofSeconds(5))
return outgoingMessages; .subscribeOn(Schedulers.boundedElastic())
.toFuture();
} }
private static long estimateMessageListSizeBytes(final OutgoingMessageEntityList messageList) { private static long estimateMessageListSizeBytes(final OutgoingMessageEntityList messageList) {

View File

@ -27,7 +27,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers; import reactor.core.publisher.Mono;
public class MessagesManager { public class MessagesManager {
@ -72,23 +72,14 @@ public class MessagesManager {
return messagesCache.hasMessages(destinationUuid, destinationDevice); return messagesCache.hasMessages(destinationUuid, destinationDevice);
} }
public Pair<List<Envelope>, Boolean> getMessagesForDevice(UUID destinationUuid, long destinationDevice, public Mono<Pair<List<Envelope>, Boolean>> getMessagesForDevice(UUID destinationUuid, long destinationDevice,
boolean cachedMessagesOnly) { boolean cachedMessagesOnly) {
try { return Flux.from(
final List<Envelope> envelopes = Flux.from( getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly))
getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) .take(RESULT_SET_CHUNK_SIZE, true)
.take(RESULT_SET_CHUNK_SIZE, true) .collectList()
.collectList() .map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE));
.subscribeOn(Schedulers.boundedElastic())
.toFuture()
.get(5, TimeUnit.SECONDS);
return new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE);
} catch (Exception e) {
throw new RuntimeException(e);
}
} }
public Publisher<Envelope> getMessagesForDeviceReactive(UUID destinationUuid, long destinationDevice, public Publisher<Envelope> getMessagesForDeviceReactive(UUID destinationUuid, long destinationDevice,

View File

@ -54,6 +54,7 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation; import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -96,6 +97,7 @@ import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.websocket.Stories; import org.whispersystems.websocket.Stories;
import reactor.core.publisher.Mono;
@ExtendWith(DropwizardExtensionsSupport.class) @ExtendWith(DropwizardExtensionsSupport.class)
class MessageControllerTest { class MessageControllerTest {
@ -138,6 +140,7 @@ class MessageControllerTest {
private static final ExecutorService multiRecipientMessageExecutor = mock(ExecutorService.class); private static final ExecutorService multiRecipientMessageExecutor = mock(ExecutorService.class);
private static final ResourceExtension resources = ResourceExtension.builder() private static final ResourceExtension resources = ResourceExtension.builder()
.addProperty(ServerProperties.UNWRAP_COMPLETION_STAGE_IN_WRITER_ENABLE, Boolean.TRUE)
.addProvider(AuthHelper.getAuthFilter()) .addProvider(AuthHelper.getAuthFilter())
.addProvider(new PolymorphicAuthValueFactoryProvider.Binder<>( .addProvider(new PolymorphicAuthValueFactoryProvider.Binder<>(
ImmutableSet.of(AuthenticatedAccount.class, DisabledPermittedAuthenticatedAccount.class))) ImmutableSet.of(AuthenticatedAccount.class, DisabledPermittedAuthenticatedAccount.class)))
@ -461,7 +464,7 @@ class MessageControllerTest {
); );
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_UUID), eq(1L), anyBoolean())) when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_UUID), eq(1L), anyBoolean()))
.thenReturn(new Pair<>(envelopes, false)); .thenReturn(Mono.just(new Pair<>(envelopes, false)));
final String userAgent = "Test-UA"; final String userAgent = "Test-UA";
@ -515,7 +518,7 @@ class MessageControllerTest {
); );
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_UUID), eq(1L), anyBoolean())) when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_UUID), eq(1L), anyBoolean()))
.thenReturn(new Pair<>(messages, false)); .thenReturn(Mono.just(new Pair<>(messages, false)));
Response response = Response response =
resources.getJerseyTest().target("/v1/messages/") resources.getJerseyTest().target("/v1/messages/")
@ -528,7 +531,7 @@ class MessageControllerTest {
} }
@Test @Test
void testDeleteMessages() throws Exception { void testDeleteMessages() {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
UUID sourceUuid = UUID.randomUUID(); UUID sourceUuid = UUID.randomUUID();