use explicit `Timer` rather than micrometer annotation for send-message latency distribution
This commit is contained in:
parent
d884700b61
commit
fef57dce0d
|
@ -6,6 +6,7 @@ package org.whispersystems.textsecuregcm.controllers;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
|
import com.codahale.metrics.annotation.Timed;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
@ -13,11 +14,12 @@ import com.google.common.net.HttpHeaders;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import io.dropwizard.auth.Auth;
|
import io.dropwizard.auth.Auth;
|
||||||
import io.dropwizard.util.DataSize;
|
import io.dropwizard.util.DataSize;
|
||||||
import io.micrometer.core.annotation.Timed;
|
|
||||||
import io.micrometer.core.instrument.Counter;
|
import io.micrometer.core.instrument.Counter;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
import io.micrometer.core.instrument.Tag;
|
import io.micrometer.core.instrument.Tag;
|
||||||
import io.micrometer.core.instrument.Tags;
|
import io.micrometer.core.instrument.Tags;
|
||||||
|
import io.micrometer.core.instrument.Timer;
|
||||||
|
import io.micrometer.core.instrument.Timer.Sample;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
import io.swagger.v3.oas.annotations.Parameter;
|
import io.swagger.v3.oas.annotations.Parameter;
|
||||||
import io.swagger.v3.oas.annotations.media.Content;
|
import io.swagger.v3.oas.annotations.media.Content;
|
||||||
|
@ -104,6 +106,7 @@ import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||||
import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
|
import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
|
||||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
|
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||||
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||||
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
|
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
|
||||||
import org.whispersystems.textsecuregcm.push.MessageSender;
|
import org.whispersystems.textsecuregcm.push.MessageSender;
|
||||||
|
@ -174,6 +177,10 @@ public class MessageController {
|
||||||
|
|
||||||
private static final String REJECT_INVALID_ENVELOPE_TYPE = name(MessageController.class, "rejectInvalidEnvelopeType");
|
private static final String REJECT_INVALID_ENVELOPE_TYPE = name(MessageController.class, "rejectInvalidEnvelopeType");
|
||||||
private static final String UNEXPECTED_MISSING_USER_COUNTER_NAME = name(MessageController.class, "unexpectedMissingDestinationForMultiRecipientMessage");
|
private static final String UNEXPECTED_MISSING_USER_COUNTER_NAME = name(MessageController.class, "unexpectedMissingDestinationForMultiRecipientMessage");
|
||||||
|
private static final Timer SEND_MESSAGE_LATENCY_TIMER =
|
||||||
|
Timer.builder(MetricsUtil.name(MessageController.class, "sendMessageLatency"))
|
||||||
|
.publishPercentileHistogram(true)
|
||||||
|
.register(Metrics.globalRegistry);
|
||||||
|
|
||||||
private static final String EPHEMERAL_TAG_NAME = "ephemeral";
|
private static final String EPHEMERAL_TAG_NAME = "ephemeral";
|
||||||
private static final String SENDER_TYPE_TAG_NAME = "senderType";
|
private static final String SENDER_TYPE_TAG_NAME = "senderType";
|
||||||
|
@ -222,7 +229,7 @@ public class MessageController {
|
||||||
this.spamChecker = spamChecker;
|
this.spamChecker = spamChecker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Timed(value = "chat.MessageController.sendMessageLatency", histogram = true)
|
@Timed
|
||||||
@Path("/{destination}")
|
@Path("/{destination}")
|
||||||
@PUT
|
@PUT
|
||||||
@Consumes(MediaType.APPLICATION_JSON)
|
@Consumes(MediaType.APPLICATION_JSON)
|
||||||
|
@ -236,155 +243,159 @@ public class MessageController {
|
||||||
@NotNull @Valid IncomingMessageList messages,
|
@NotNull @Valid IncomingMessageList messages,
|
||||||
@Context ContainerRequestContext context) throws RateLimitExceededException {
|
@Context ContainerRequestContext context) throws RateLimitExceededException {
|
||||||
|
|
||||||
|
final Sample sample = Timer.start();
|
||||||
if (source.isEmpty() && accessKey.isEmpty() && !isStory) {
|
|
||||||
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
|
|
||||||
}
|
|
||||||
|
|
||||||
final String senderType;
|
|
||||||
if (source.isPresent()) {
|
|
||||||
if (source.get().getAccount().isIdentifiedBy(destinationIdentifier)) {
|
|
||||||
senderType = SENDER_TYPE_SELF;
|
|
||||||
} else {
|
|
||||||
senderType = SENDER_TYPE_IDENTIFIED;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
senderType = SENDER_TYPE_UNIDENTIFIED;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationIdentifier);
|
|
||||||
|
|
||||||
if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) {
|
|
||||||
throw new WebApplicationException(Status.FORBIDDEN);
|
|
||||||
}
|
|
||||||
|
|
||||||
Optional<Account> destination;
|
|
||||||
if (!isSyncMessage) {
|
|
||||||
destination = accountsManager.getByServiceIdentifier(destinationIdentifier);
|
|
||||||
} else {
|
|
||||||
destination = source.map(AuthenticatedAccount::getAccount);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Optional<Response> spamCheck = spamChecker.checkForSpam(
|
|
||||||
context, source.map(AuthenticatedAccount::getAccount), destination);
|
|
||||||
if (spamCheck.isPresent()) {
|
|
||||||
return spamCheck.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
final Optional<byte[]> spamReportToken = switch (senderType) {
|
|
||||||
case SENDER_TYPE_IDENTIFIED ->
|
|
||||||
reportSpamTokenProvider.makeReportSpamToken(context, source.get().getAccount(), destination);
|
|
||||||
default -> Optional.empty();
|
|
||||||
};
|
|
||||||
|
|
||||||
int totalContentLength = 0;
|
|
||||||
|
|
||||||
for (final IncomingMessage message : messages.messages()) {
|
|
||||||
int contentLength = 0;
|
|
||||||
|
|
||||||
if (StringUtils.isNotEmpty(message.content())) {
|
|
||||||
contentLength += message.content().length();
|
|
||||||
}
|
|
||||||
|
|
||||||
validateContentLength(contentLength, userAgent);
|
|
||||||
validateEnvelopeType(message.type(), userAgent);
|
|
||||||
|
|
||||||
totalContentLength += contentLength;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength);
|
if (source.isEmpty() && accessKey.isEmpty() && !isStory) {
|
||||||
} catch (final RateLimitExceededException e) {
|
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
|
||||||
if (dynamicConfigurationManager.getConfiguration().getInboundMessageByteLimitConfiguration().enforceInboundLimit()) {
|
|
||||||
messageByteLimitEstimator.add(destinationIdentifier.uuid().toString());
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Stories will be checked by the client; we bypass access checks here for stories.
|
|
||||||
if (!isStory) {
|
|
||||||
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().hasEnabledLinkedDevice();
|
final String senderType;
|
||||||
|
if (source.isPresent()) {
|
||||||
// We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify
|
if (source.get().getAccount().isIdentifiedBy(destinationIdentifier)) {
|
||||||
// we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from
|
senderType = SENDER_TYPE_SELF;
|
||||||
// these requests.
|
} else {
|
||||||
if (isStory && destination.isEmpty()) {
|
senderType = SENDER_TYPE_IDENTIFIED;
|
||||||
return Response.ok(new SendMessageResponse(needsSync)).build();
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false
|
|
||||||
// or else return a 200 response when isStory is true.
|
|
||||||
assert destination.isPresent();
|
|
||||||
|
|
||||||
if (source.isPresent() && !isSyncMessage) {
|
|
||||||
checkMessageRateLimit(source.get(), destination.get(), userAgent);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isStory) {
|
|
||||||
rateLimiters.getStoriesLimiter().validate(destination.get().getUuid());
|
|
||||||
}
|
|
||||||
|
|
||||||
final Set<Byte> excludedDeviceIds;
|
|
||||||
|
|
||||||
if (isSyncMessage) {
|
|
||||||
excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId());
|
|
||||||
} else {
|
} else {
|
||||||
excludedDeviceIds = Collections.emptySet();
|
senderType = SENDER_TYPE_UNIDENTIFIED;
|
||||||
}
|
}
|
||||||
|
|
||||||
DestinationDeviceValidator.validateCompleteDeviceList(destination.get(),
|
boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationIdentifier);
|
||||||
messages.messages().stream().map(IncomingMessage::destinationDeviceId).collect(Collectors.toSet()),
|
|
||||||
excludedDeviceIds);
|
|
||||||
|
|
||||||
DestinationDeviceValidator.validateRegistrationIds(destination.get(),
|
if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) {
|
||||||
messages.messages(),
|
throw new WebApplicationException(Status.FORBIDDEN);
|
||||||
IncomingMessage::destinationDeviceId,
|
}
|
||||||
IncomingMessage::destinationRegistrationId,
|
|
||||||
destination.get().getPhoneNumberIdentifier().equals(destinationIdentifier.uuid()));
|
|
||||||
|
|
||||||
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent),
|
Optional<Account> destination;
|
||||||
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())),
|
if (!isSyncMessage) {
|
||||||
Tag.of(SENDER_TYPE_TAG_NAME, senderType),
|
destination = accountsManager.getByServiceIdentifier(destinationIdentifier);
|
||||||
Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name()));
|
} else {
|
||||||
|
destination = source.map(AuthenticatedAccount::getAccount);
|
||||||
|
}
|
||||||
|
|
||||||
for (IncomingMessage incomingMessage : messages.messages()) {
|
final Optional<Response> spamCheck = spamChecker.checkForSpam(
|
||||||
Optional<Device> destinationDevice = destination.get().getDevice(incomingMessage.destinationDeviceId());
|
context, source.map(AuthenticatedAccount::getAccount), destination);
|
||||||
|
if (spamCheck.isPresent()) {
|
||||||
|
return spamCheck.get();
|
||||||
|
}
|
||||||
|
|
||||||
if (destinationDevice.isPresent()) {
|
final Optional<byte[]> spamReportToken = switch (senderType) {
|
||||||
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment();
|
case SENDER_TYPE_IDENTIFIED ->
|
||||||
sendIndividualMessage(
|
reportSpamTokenProvider.makeReportSpamToken(context, source.get().getAccount(), destination);
|
||||||
source,
|
default -> Optional.empty();
|
||||||
destination.get(),
|
};
|
||||||
destinationDevice.get(),
|
|
||||||
destinationIdentifier,
|
int totalContentLength = 0;
|
||||||
messages.timestamp(),
|
|
||||||
messages.online(),
|
for (final IncomingMessage message : messages.messages()) {
|
||||||
isStory,
|
int contentLength = 0;
|
||||||
messages.urgent(),
|
|
||||||
incomingMessage,
|
if (StringUtils.isNotEmpty(message.content())) {
|
||||||
userAgent,
|
contentLength += message.content().length();
|
||||||
spamReportToken);
|
}
|
||||||
|
|
||||||
|
validateContentLength(contentLength, userAgent);
|
||||||
|
validateEnvelopeType(message.type(), userAgent);
|
||||||
|
|
||||||
|
totalContentLength += contentLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength);
|
||||||
|
} catch (final RateLimitExceededException e) {
|
||||||
|
if (dynamicConfigurationManager.getConfiguration().getInboundMessageByteLimitConfiguration().enforceInboundLimit()) {
|
||||||
|
messageByteLimitEstimator.add(destinationIdentifier.uuid().toString());
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Response.ok(new SendMessageResponse(needsSync)).build();
|
try {
|
||||||
} catch (NoSuchUserException e) {
|
// Stories will be checked by the client; we bypass access checks here for stories.
|
||||||
throw new WebApplicationException(Response.status(404).build());
|
if (!isStory) {
|
||||||
} catch (MismatchedDevicesException e) {
|
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
|
||||||
throw new WebApplicationException(Response.status(409)
|
}
|
||||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
|
||||||
.entity(new MismatchedDevices(e.getMissingDevices(),
|
boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().hasEnabledLinkedDevice();
|
||||||
e.getExtraDevices()))
|
|
||||||
.build());
|
// We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify
|
||||||
} catch (StaleDevicesException e) {
|
// we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from
|
||||||
throw new WebApplicationException(Response.status(410)
|
// these requests.
|
||||||
.type(MediaType.APPLICATION_JSON)
|
if (isStory && destination.isEmpty()) {
|
||||||
.entity(new StaleDevices(e.getStaleDevices()))
|
return Response.ok(new SendMessageResponse(needsSync)).build();
|
||||||
.build());
|
}
|
||||||
|
|
||||||
|
// if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false
|
||||||
|
// or else return a 200 response when isStory is true.
|
||||||
|
assert destination.isPresent();
|
||||||
|
|
||||||
|
if (source.isPresent() && !isSyncMessage) {
|
||||||
|
checkMessageRateLimit(source.get(), destination.get(), userAgent);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isStory) {
|
||||||
|
rateLimiters.getStoriesLimiter().validate(destination.get().getUuid());
|
||||||
|
}
|
||||||
|
|
||||||
|
final Set<Byte> excludedDeviceIds;
|
||||||
|
|
||||||
|
if (isSyncMessage) {
|
||||||
|
excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId());
|
||||||
|
} else {
|
||||||
|
excludedDeviceIds = Collections.emptySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
DestinationDeviceValidator.validateCompleteDeviceList(destination.get(),
|
||||||
|
messages.messages().stream().map(IncomingMessage::destinationDeviceId).collect(Collectors.toSet()),
|
||||||
|
excludedDeviceIds);
|
||||||
|
|
||||||
|
DestinationDeviceValidator.validateRegistrationIds(destination.get(),
|
||||||
|
messages.messages(),
|
||||||
|
IncomingMessage::destinationDeviceId,
|
||||||
|
IncomingMessage::destinationRegistrationId,
|
||||||
|
destination.get().getPhoneNumberIdentifier().equals(destinationIdentifier.uuid()));
|
||||||
|
|
||||||
|
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent),
|
||||||
|
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())),
|
||||||
|
Tag.of(SENDER_TYPE_TAG_NAME, senderType),
|
||||||
|
Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name()));
|
||||||
|
|
||||||
|
for (IncomingMessage incomingMessage : messages.messages()) {
|
||||||
|
Optional<Device> destinationDevice = destination.get().getDevice(incomingMessage.destinationDeviceId());
|
||||||
|
|
||||||
|
if (destinationDevice.isPresent()) {
|
||||||
|
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment();
|
||||||
|
sendIndividualMessage(
|
||||||
|
source,
|
||||||
|
destination.get(),
|
||||||
|
destinationDevice.get(),
|
||||||
|
destinationIdentifier,
|
||||||
|
messages.timestamp(),
|
||||||
|
messages.online(),
|
||||||
|
isStory,
|
||||||
|
messages.urgent(),
|
||||||
|
incomingMessage,
|
||||||
|
userAgent,
|
||||||
|
spamReportToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Response.ok(new SendMessageResponse(needsSync)).build();
|
||||||
|
} catch (NoSuchUserException e) {
|
||||||
|
throw new WebApplicationException(Response.status(404).build());
|
||||||
|
} catch (MismatchedDevicesException e) {
|
||||||
|
throw new WebApplicationException(Response.status(409)
|
||||||
|
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||||
|
.entity(new MismatchedDevices(e.getMissingDevices(),
|
||||||
|
e.getExtraDevices()))
|
||||||
|
.build());
|
||||||
|
} catch (StaleDevicesException e) {
|
||||||
|
throw new WebApplicationException(Response.status(410)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(new StaleDevices(e.getStaleDevices()))
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
sample.stop(SEND_MESSAGE_LATENCY_TIMER);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue