From fef57dce0d4af0e5e7d208644a9a7c591e4d942e Mon Sep 17 00:00:00 2001 From: Jonathan Klabunde Tomer <125505367+jkt-signal@users.noreply.github.com> Date: Thu, 15 Feb 2024 14:58:43 -0800 Subject: [PATCH] use explicit `Timer` rather than micrometer annotation for send-message latency distribution --- .../controllers/MessageController.java | 287 +++++++++--------- 1 file changed, 149 insertions(+), 138 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index 78f043ee9..7befad701 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -6,6 +6,7 @@ package org.whispersystems.textsecuregcm.controllers; import static com.codahale.metrics.MetricRegistry.name; +import com.codahale.metrics.annotation.Timed; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -13,11 +14,12 @@ import com.google.common.net.HttpHeaders; import com.google.protobuf.ByteString; import io.dropwizard.auth.Auth; import io.dropwizard.util.DataSize; -import io.micrometer.core.annotation.Timed; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.Timer.Sample; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; @@ -104,6 +106,7 @@ import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.limits.CardinalityEstimator; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider; import org.whispersystems.textsecuregcm.push.MessageSender; @@ -174,6 +177,10 @@ public class MessageController { private static final String REJECT_INVALID_ENVELOPE_TYPE = name(MessageController.class, "rejectInvalidEnvelopeType"); private static final String UNEXPECTED_MISSING_USER_COUNTER_NAME = name(MessageController.class, "unexpectedMissingDestinationForMultiRecipientMessage"); + private static final Timer SEND_MESSAGE_LATENCY_TIMER = + Timer.builder(MetricsUtil.name(MessageController.class, "sendMessageLatency")) + .publishPercentileHistogram(true) + .register(Metrics.globalRegistry); private static final String EPHEMERAL_TAG_NAME = "ephemeral"; private static final String SENDER_TYPE_TAG_NAME = "senderType"; @@ -222,7 +229,7 @@ public class MessageController { this.spamChecker = spamChecker; } - @Timed(value = "chat.MessageController.sendMessageLatency", histogram = true) + @Timed @Path("/{destination}") @PUT @Consumes(MediaType.APPLICATION_JSON) @@ -236,155 +243,159 @@ public class MessageController { @NotNull @Valid IncomingMessageList messages, @Context ContainerRequestContext context) throws RateLimitExceededException { - - if (source.isEmpty() && accessKey.isEmpty() && !isStory) { - throw new WebApplicationException(Response.Status.UNAUTHORIZED); - } - - final String senderType; - if (source.isPresent()) { - if (source.get().getAccount().isIdentifiedBy(destinationIdentifier)) { - senderType = SENDER_TYPE_SELF; - } else { - senderType = SENDER_TYPE_IDENTIFIED; - } - } else { - senderType = SENDER_TYPE_UNIDENTIFIED; - } - - boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationIdentifier); - - if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) { - throw new WebApplicationException(Status.FORBIDDEN); - } - - Optional destination; - if (!isSyncMessage) { - destination = accountsManager.getByServiceIdentifier(destinationIdentifier); - } else { - destination = source.map(AuthenticatedAccount::getAccount); - } - - final Optional spamCheck = spamChecker.checkForSpam( - context, source.map(AuthenticatedAccount::getAccount), destination); - if (spamCheck.isPresent()) { - return spamCheck.get(); - } - - final Optional spamReportToken = switch (senderType) { - case SENDER_TYPE_IDENTIFIED -> - reportSpamTokenProvider.makeReportSpamToken(context, source.get().getAccount(), destination); - default -> Optional.empty(); - }; - - int totalContentLength = 0; - - for (final IncomingMessage message : messages.messages()) { - int contentLength = 0; - - if (StringUtils.isNotEmpty(message.content())) { - contentLength += message.content().length(); - } - - validateContentLength(contentLength, userAgent); - validateEnvelopeType(message.type(), userAgent); - - totalContentLength += contentLength; - } - + final Sample sample = Timer.start(); try { - rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength); - } catch (final RateLimitExceededException e) { - if (dynamicConfigurationManager.getConfiguration().getInboundMessageByteLimitConfiguration().enforceInboundLimit()) { - messageByteLimitEstimator.add(destinationIdentifier.uuid().toString()); - throw e; - } - } - - try { - // Stories will be checked by the client; we bypass access checks here for stories. - if (!isStory) { - OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination); + if (source.isEmpty() && accessKey.isEmpty() && !isStory) { + throw new WebApplicationException(Response.Status.UNAUTHORIZED); } - boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().hasEnabledLinkedDevice(); - - // We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify - // we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from - // these requests. - if (isStory && destination.isEmpty()) { - return Response.ok(new SendMessageResponse(needsSync)).build(); - } - - // if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false - // or else return a 200 response when isStory is true. - assert destination.isPresent(); - - if (source.isPresent() && !isSyncMessage) { - checkMessageRateLimit(source.get(), destination.get(), userAgent); - } - - if (isStory) { - rateLimiters.getStoriesLimiter().validate(destination.get().getUuid()); - } - - final Set excludedDeviceIds; - - if (isSyncMessage) { - excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId()); + final String senderType; + if (source.isPresent()) { + if (source.get().getAccount().isIdentifiedBy(destinationIdentifier)) { + senderType = SENDER_TYPE_SELF; + } else { + senderType = SENDER_TYPE_IDENTIFIED; + } } else { - excludedDeviceIds = Collections.emptySet(); + senderType = SENDER_TYPE_UNIDENTIFIED; } - DestinationDeviceValidator.validateCompleteDeviceList(destination.get(), - messages.messages().stream().map(IncomingMessage::destinationDeviceId).collect(Collectors.toSet()), - excludedDeviceIds); + boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationIdentifier); - DestinationDeviceValidator.validateRegistrationIds(destination.get(), - messages.messages(), - IncomingMessage::destinationDeviceId, - IncomingMessage::destinationRegistrationId, - destination.get().getPhoneNumberIdentifier().equals(destinationIdentifier.uuid())); + if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) { + throw new WebApplicationException(Status.FORBIDDEN); + } - final List tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent), - Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())), - Tag.of(SENDER_TYPE_TAG_NAME, senderType), - Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name())); + Optional destination; + if (!isSyncMessage) { + destination = accountsManager.getByServiceIdentifier(destinationIdentifier); + } else { + destination = source.map(AuthenticatedAccount::getAccount); + } - for (IncomingMessage incomingMessage : messages.messages()) { - Optional destinationDevice = destination.get().getDevice(incomingMessage.destinationDeviceId()); + final Optional spamCheck = spamChecker.checkForSpam( + context, source.map(AuthenticatedAccount::getAccount), destination); + if (spamCheck.isPresent()) { + return spamCheck.get(); + } - if (destinationDevice.isPresent()) { - Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment(); - sendIndividualMessage( - source, - destination.get(), - destinationDevice.get(), - destinationIdentifier, - messages.timestamp(), - messages.online(), - isStory, - messages.urgent(), - incomingMessage, - userAgent, - spamReportToken); + final Optional spamReportToken = switch (senderType) { + case SENDER_TYPE_IDENTIFIED -> + reportSpamTokenProvider.makeReportSpamToken(context, source.get().getAccount(), destination); + default -> Optional.empty(); + }; + + int totalContentLength = 0; + + for (final IncomingMessage message : messages.messages()) { + int contentLength = 0; + + if (StringUtils.isNotEmpty(message.content())) { + contentLength += message.content().length(); + } + + validateContentLength(contentLength, userAgent); + validateEnvelopeType(message.type(), userAgent); + + totalContentLength += contentLength; + } + + try { + rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength); + } catch (final RateLimitExceededException e) { + if (dynamicConfigurationManager.getConfiguration().getInboundMessageByteLimitConfiguration().enforceInboundLimit()) { + messageByteLimitEstimator.add(destinationIdentifier.uuid().toString()); + throw e; } } - return Response.ok(new SendMessageResponse(needsSync)).build(); - } catch (NoSuchUserException e) { - throw new WebApplicationException(Response.status(404).build()); - } catch (MismatchedDevicesException e) { - throw new WebApplicationException(Response.status(409) - .type(MediaType.APPLICATION_JSON_TYPE) - .entity(new MismatchedDevices(e.getMissingDevices(), - e.getExtraDevices())) - .build()); - } catch (StaleDevicesException e) { - throw new WebApplicationException(Response.status(410) - .type(MediaType.APPLICATION_JSON) - .entity(new StaleDevices(e.getStaleDevices())) - .build()); + try { + // Stories will be checked by the client; we bypass access checks here for stories. + if (!isStory) { + OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination); + } + + boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().hasEnabledLinkedDevice(); + + // We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify + // we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from + // these requests. + if (isStory && destination.isEmpty()) { + return Response.ok(new SendMessageResponse(needsSync)).build(); + } + + // if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false + // or else return a 200 response when isStory is true. + assert destination.isPresent(); + + if (source.isPresent() && !isSyncMessage) { + checkMessageRateLimit(source.get(), destination.get(), userAgent); + } + + if (isStory) { + rateLimiters.getStoriesLimiter().validate(destination.get().getUuid()); + } + + final Set excludedDeviceIds; + + if (isSyncMessage) { + excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId()); + } else { + excludedDeviceIds = Collections.emptySet(); + } + + DestinationDeviceValidator.validateCompleteDeviceList(destination.get(), + messages.messages().stream().map(IncomingMessage::destinationDeviceId).collect(Collectors.toSet()), + excludedDeviceIds); + + DestinationDeviceValidator.validateRegistrationIds(destination.get(), + messages.messages(), + IncomingMessage::destinationDeviceId, + IncomingMessage::destinationRegistrationId, + destination.get().getPhoneNumberIdentifier().equals(destinationIdentifier.uuid())); + + final List tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent), + Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())), + Tag.of(SENDER_TYPE_TAG_NAME, senderType), + Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name())); + + for (IncomingMessage incomingMessage : messages.messages()) { + Optional destinationDevice = destination.get().getDevice(incomingMessage.destinationDeviceId()); + + if (destinationDevice.isPresent()) { + Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment(); + sendIndividualMessage( + source, + destination.get(), + destinationDevice.get(), + destinationIdentifier, + messages.timestamp(), + messages.online(), + isStory, + messages.urgent(), + incomingMessage, + userAgent, + spamReportToken); + } + } + + return Response.ok(new SendMessageResponse(needsSync)).build(); + } catch (NoSuchUserException e) { + throw new WebApplicationException(Response.status(404).build()); + } catch (MismatchedDevicesException e) { + throw new WebApplicationException(Response.status(409) + .type(MediaType.APPLICATION_JSON_TYPE) + .entity(new MismatchedDevices(e.getMissingDevices(), + e.getExtraDevices())) + .build()); + } catch (StaleDevicesException e) { + throw new WebApplicationException(Response.status(410) + .type(MediaType.APPLICATION_JSON) + .entity(new StaleDevices(e.getStaleDevices())) + .build()); + } + } finally { + sample.stop(SEND_MESSAGE_LATENCY_TIMER); } }