Reduce and centralize message-sending metrics
This commit is contained in:
parent
6013d00654
commit
ffa98e5b34
|
@ -10,7 +10,6 @@ import com.codahale.metrics.annotation.Timed;
|
|||
import com.google.common.net.HttpHeaders;
|
||||
import io.dropwizard.auth.Auth;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import io.micrometer.core.instrument.Tags;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.micrometer.core.instrument.Timer.Sample;
|
||||
|
@ -151,31 +150,25 @@ public class MessageController {
|
|||
|
||||
private static final CompletableFuture<?>[] EMPTY_FUTURE_ARRAY = new CompletableFuture<?>[0];
|
||||
|
||||
private static final String SENT_MESSAGE_COUNTER_NAME = name(MessageController.class, "sentMessages");
|
||||
private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes");
|
||||
private static final String RATE_LIMITED_MESSAGE_COUNTER_NAME = name(MessageController.class, "rateLimitedMessage");
|
||||
|
||||
private static final String SEND_MESSAGE_LATENCY_TIMER_NAME = MetricsUtil.name(MessageController.class, "sendMessageLatency");
|
||||
private static final Timer INDIVIDUAL_MESSAGE_LATENCY_TIMER;
|
||||
private static final Timer MULTI_RECIPIENT_MESSAGE_LATENCY_TIMER;
|
||||
|
||||
private static final String EPHEMERAL_TAG_NAME = "ephemeral";
|
||||
private static final String SENDER_TYPE_TAG_NAME = "senderType";
|
||||
private static final String AUTH_TYPE_TAG_NAME = "authType";
|
||||
private static final String SENDER_COUNTRY_TAG_NAME = "senderCountry";
|
||||
private static final String RATE_LIMIT_REASON_TAG_NAME = "rateLimitReason";
|
||||
private static final String IDENTITY_TYPE_TAG_NAME = "identityType";
|
||||
private static final String ENDPOINT_TYPE_TAG_NAME = "endpoint";
|
||||
static {
|
||||
final String timerName = MetricsUtil.name(MessageController.class, "sendMessageLatency");
|
||||
final String multiRecipientTagName = "multiRecipient";
|
||||
|
||||
private static final String SENDER_TYPE_IDENTIFIED = "identified";
|
||||
private static final String SENDER_TYPE_UNIDENTIFIED = "unidentified";
|
||||
private static final String SENDER_TYPE_SELF = "self";
|
||||
INDIVIDUAL_MESSAGE_LATENCY_TIMER = Timer.builder(timerName)
|
||||
.tags(multiRecipientTagName, "false")
|
||||
.publishPercentileHistogram(true)
|
||||
.register(Metrics.globalRegistry);
|
||||
|
||||
private static final String AUTH_TYPE_IDENTIFIED = "identified";
|
||||
private static final String AUTH_TYPE_ACCESS_KEY = "accessKey";
|
||||
private static final String AUTH_TYPE_GROUP_SEND_TOKEN = "groupSendToken";
|
||||
private static final String AUTH_TYPE_STORY = "story";
|
||||
|
||||
private static final String ENDPOINT_TYPE_SINGLE = "single";
|
||||
private static final String ENDPOINT_TYPE_MULTI = "multi";
|
||||
MULTI_RECIPIENT_MESSAGE_LATENCY_TIMER = Timer.builder(timerName)
|
||||
.tags(multiRecipientTagName, "true")
|
||||
.publishPercentileHistogram(true)
|
||||
.register(Metrics.globalRegistry);
|
||||
}
|
||||
|
||||
// The Signal desktop client (really, JavaScript in general) can handle message timestamps at most 100,000,000 days
|
||||
// past the epoch; please see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date#the_epoch_timestamps_and_invalid_date
|
||||
|
@ -282,13 +275,11 @@ public class MessageController {
|
|||
}
|
||||
}
|
||||
|
||||
final String senderType = source.map(
|
||||
s -> s.getAccount().isIdentifiedBy(destinationIdentifier) ? SENDER_TYPE_SELF : SENDER_TYPE_IDENTIFIED)
|
||||
.orElse(SENDER_TYPE_UNIDENTIFIED);
|
||||
|
||||
final Sample sample = Timer.start();
|
||||
|
||||
try {
|
||||
final boolean isSyncMessage = senderType.equals(SENDER_TYPE_SELF);
|
||||
final boolean isSyncMessage =
|
||||
source.map(s -> s.getAccount().isIdentifiedBy(destinationIdentifier)).orElse(false);
|
||||
|
||||
if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) {
|
||||
throw new WebApplicationException(Status.FORBIDDEN);
|
||||
|
@ -359,7 +350,7 @@ public class MessageController {
|
|||
final Account destination = maybeDestination.orElseThrow();
|
||||
|
||||
if (source.isPresent() && !isSyncMessage) {
|
||||
checkMessageRateLimit(source.get(), destination, userAgent);
|
||||
rateLimiters.getMessagesLimiter().validate(source.get().getAccount().getUuid(), destination.getUuid());
|
||||
}
|
||||
|
||||
if (isStory) {
|
||||
|
@ -387,27 +378,8 @@ public class MessageController {
|
|||
final Map<Byte, Integer> registrationIdsByDeviceId = messages.messages().stream()
|
||||
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, IncomingMessage::destinationRegistrationId));
|
||||
|
||||
final String authType;
|
||||
if (SENDER_TYPE_IDENTIFIED.equals(senderType)) {
|
||||
authType = AUTH_TYPE_IDENTIFIED;
|
||||
} else if (isStory) {
|
||||
authType = AUTH_TYPE_STORY;
|
||||
} else if (groupSendToken != null) {
|
||||
authType = AUTH_TYPE_GROUP_SEND_TOKEN;
|
||||
} else {
|
||||
authType = AUTH_TYPE_ACCESS_KEY;
|
||||
}
|
||||
|
||||
messageSender.sendMessages(destination, destinationIdentifier, messagesByDeviceId, registrationIdsByDeviceId, userAgent);
|
||||
|
||||
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, List.of(UserAgentTagUtil.getPlatformTag(userAgent),
|
||||
Tag.of(ENDPOINT_TYPE_TAG_NAME, ENDPOINT_TYPE_SINGLE),
|
||||
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())),
|
||||
Tag.of(SENDER_TYPE_TAG_NAME, senderType),
|
||||
Tag.of(AUTH_TYPE_TAG_NAME, authType),
|
||||
Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name())))
|
||||
.increment(messagesByDeviceId.size());
|
||||
|
||||
return Response.ok(new SendMessageResponse(needsSync)).build();
|
||||
} catch (final MismatchedDevicesException e) {
|
||||
if (!e.getMismatchedDevices().staleDeviceIds().isEmpty()) {
|
||||
|
@ -426,10 +398,7 @@ public class MessageController {
|
|||
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
}
|
||||
} finally {
|
||||
sample.stop(Timer.builder(SEND_MESSAGE_LATENCY_TIMER_NAME)
|
||||
.tags(SENDER_TYPE_TAG_NAME, senderType)
|
||||
.publishPercentileHistogram(true)
|
||||
.register(Metrics.globalRegistry));
|
||||
sample.stop(INDIVIDUAL_MESSAGE_LATENCY_TIMER);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -514,6 +483,9 @@ public class MessageController {
|
|||
}
|
||||
}
|
||||
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
try {
|
||||
final SpamCheckResult<Response> spamCheckResult = spamChecker.checkForMultiRecipientSpamHttp(
|
||||
isStory ? MessageType.MULTI_RECIPIENT_STORY : MessageType.MULTI_RECIPIENT_SEALED_SENDER,
|
||||
context);
|
||||
|
@ -587,22 +559,13 @@ public class MessageController {
|
|||
}
|
||||
}
|
||||
|
||||
final String authType;
|
||||
if (isStory) {
|
||||
authType = AUTH_TYPE_STORY;
|
||||
} else if (groupSendToken != null) {
|
||||
authType = AUTH_TYPE_GROUP_SEND_TOKEN;
|
||||
} else {
|
||||
authType = AUTH_TYPE_ACCESS_KEY;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!resolvedRecipients.isEmpty()) {
|
||||
messageSender.sendMultiRecipientMessage(multiRecipientMessage, resolvedRecipients, timestamp, isStory, online, isUrgent, userAgent).get();
|
||||
}
|
||||
|
||||
final List<ServiceIdentifier> unresolvedRecipientServiceIds;
|
||||
if (AUTH_TYPE_GROUP_SEND_TOKEN.equals(authType)) {
|
||||
if (groupSendToken != null) {
|
||||
unresolvedRecipientServiceIds = multiRecipientMessage.getRecipients().entrySet().stream()
|
||||
.filter(entry -> !resolvedRecipients.containsKey(entry.getValue()))
|
||||
.map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey()))
|
||||
|
@ -611,30 +574,6 @@ public class MessageController {
|
|||
unresolvedRecipientServiceIds = List.of();
|
||||
}
|
||||
|
||||
multiRecipientMessage.getRecipients().forEach((serviceId, recipient) -> {
|
||||
if (!resolvedRecipients.containsKey(recipient)) {
|
||||
// We skipped sending to this recipient because we couldn't resolve the recipient to an
|
||||
// existing account; don't increment the counter for this recipient. If the client was
|
||||
// using a GSE, track the missing recipients to include in the response.
|
||||
return;
|
||||
}
|
||||
|
||||
final String identityType = switch (serviceId) {
|
||||
case ServiceId.Aci ignored -> "ACI";
|
||||
case ServiceId.Pni ignored -> "PNI";
|
||||
default -> "unknown";
|
||||
};
|
||||
|
||||
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, Tags.of(
|
||||
UserAgentTagUtil.getPlatformTag(userAgent),
|
||||
Tag.of(ENDPOINT_TYPE_TAG_NAME, ENDPOINT_TYPE_MULTI),
|
||||
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(online)),
|
||||
Tag.of(SENDER_TYPE_TAG_NAME, SENDER_TYPE_UNIDENTIFIED),
|
||||
Tag.of(AUTH_TYPE_TAG_NAME, authType),
|
||||
Tag.of(IDENTITY_TYPE_TAG_NAME, identityType)))
|
||||
.increment(recipient.getDevices().length);
|
||||
});
|
||||
|
||||
return Response.ok(new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIds)).build();
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("interrupted while delivering multi-recipient messages", e);
|
||||
|
@ -680,6 +619,9 @@ public class MessageController {
|
|||
} catch (final MessageTooLargeException e) {
|
||||
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
}
|
||||
} finally {
|
||||
sample.stop(MULTI_RECIPIENT_MESSAGE_LATENCY_TIMER);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkGroupSendToken(
|
||||
|
@ -869,21 +811,4 @@ public class MessageController {
|
|||
return Response.status(Status.ACCEPTED)
|
||||
.build();
|
||||
}
|
||||
|
||||
private void checkMessageRateLimit(AuthenticatedDevice source, Account destination, String userAgent)
|
||||
throws RateLimitExceededException {
|
||||
final String senderCountryCode = Util.getCountryCode(source.getAccount().getNumber());
|
||||
|
||||
try {
|
||||
rateLimiters.getMessagesLimiter().validate(source.getAccount().getUuid(), destination.getUuid());
|
||||
} catch (final RateLimitExceededException e) {
|
||||
Metrics.counter(RATE_LIMITED_MESSAGE_COUNTER_NAME,
|
||||
Tags.of(
|
||||
UserAgentTagUtil.getPlatformTag(userAgent),
|
||||
Tag.of(SENDER_COUNTRY_TAG_NAME, senderCountryCode),
|
||||
Tag.of(RATE_LIMIT_REASON_TAG_NAME, "singleDestinationRate"))).increment();
|
||||
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,20 +55,18 @@ public class MessageSender {
|
|||
|
||||
// Note that these names deliberately reference `MessageController` for metric continuity
|
||||
private static final String REJECT_OVERSIZE_MESSAGE_COUNTER_NAME = name(MessageController.class, "rejectOversizeMessage");
|
||||
private static final String LARGE_BUT_NOT_OVERSIZE_MESSAGE_COUNTER_NAME = name(MessageController.class, "largeMessage");
|
||||
private static final String CONTENT_SIZE_DISTRIBUTION_NAME = MetricsUtil.name(MessageController.class, "messageContentSize");
|
||||
|
||||
private static final String SEND_COUNTER_NAME = name(MessageSender.class, "sendMessage");
|
||||
private static final String CHANNEL_TAG_NAME = "channel";
|
||||
private static final String EPHEMERAL_TAG_NAME = "ephemeral";
|
||||
private static final String CLIENT_ONLINE_TAG_NAME = "clientOnline";
|
||||
private static final String URGENT_TAG_NAME = "urgent";
|
||||
private static final String STORY_TAG_NAME = "story";
|
||||
private static final String SEALED_SENDER_TAG_NAME = "sealedSender";
|
||||
private static final String MULTI_RECIPIENT_TAG_NAME = "multiRecipient";
|
||||
|
||||
@VisibleForTesting
|
||||
public static final int MAX_MESSAGE_SIZE = (int) DataSize.kibibytes(256).toBytes();
|
||||
private static final long LARGE_MESSAGE_SIZE = DataSize.kibibytes(8).toBytes();
|
||||
|
||||
@VisibleForTesting
|
||||
static final byte NO_EXCLUDED_DEVICE_ID = -1;
|
||||
|
@ -137,14 +135,16 @@ public class MessageSender {
|
|||
}
|
||||
}
|
||||
|
||||
Metrics.counter(SEND_COUNTER_NAME,
|
||||
CHANNEL_TAG_NAME, destination.getDevice(deviceId).map(MessageSender::getDeliveryChannelName).orElse("unknown"),
|
||||
final Tags tags = Tags.of(
|
||||
EPHEMERAL_TAG_NAME, String.valueOf(message.getEphemeral()),
|
||||
CLIENT_ONLINE_TAG_NAME, String.valueOf(destinationPresent),
|
||||
URGENT_TAG_NAME, String.valueOf(message.getUrgent()),
|
||||
STORY_TAG_NAME, String.valueOf(message.getStory()),
|
||||
SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceServiceId()))
|
||||
.increment();
|
||||
SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceServiceId()),
|
||||
MULTI_RECIPIENT_TAG_NAME, "false")
|
||||
.and(UserAgentTagUtil.getPlatformTag(userAgent));
|
||||
|
||||
Metrics.counter(SEND_COUNTER_NAME, tags).increment();
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -219,32 +219,20 @@ public class MessageSender {
|
|||
}
|
||||
}
|
||||
|
||||
Metrics.counter(SEND_COUNTER_NAME,
|
||||
CHANNEL_TAG_NAME,
|
||||
account.getDevice(deviceId).map(MessageSender::getDeliveryChannelName).orElse("unknown"),
|
||||
final Tags tags = Tags.of(
|
||||
EPHEMERAL_TAG_NAME, String.valueOf(isEphemeral),
|
||||
CLIENT_ONLINE_TAG_NAME, String.valueOf(clientPresent),
|
||||
URGENT_TAG_NAME, String.valueOf(isUrgent),
|
||||
STORY_TAG_NAME, String.valueOf(isStory),
|
||||
SEALED_SENDER_TAG_NAME, String.valueOf(true))
|
||||
.increment();
|
||||
SEALED_SENDER_TAG_NAME, "true",
|
||||
MULTI_RECIPIENT_TAG_NAME, "true")
|
||||
.and(UserAgentTagUtil.getPlatformTag(userAgent));
|
||||
|
||||
Metrics.counter(SEND_COUNTER_NAME, tags).increment();
|
||||
})))
|
||||
.thenRun(Util.NOOP);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static String getDeliveryChannelName(final Device device) {
|
||||
if (device.getGcmId() != null) {
|
||||
return "gcm";
|
||||
} else if (device.getApnId() != null) {
|
||||
return "apn";
|
||||
} else if (device.getFetchesMessages()) {
|
||||
return "websocket";
|
||||
} else {
|
||||
return "none";
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static void validateContentLength(final int contentLength,
|
||||
final boolean isMultiRecipientMessage,
|
||||
|
@ -273,13 +261,6 @@ public class MessageSender {
|
|||
|
||||
throw new MessageTooLargeException();
|
||||
}
|
||||
|
||||
if (contentLength > LARGE_MESSAGE_SIZE) {
|
||||
Metrics.counter(
|
||||
LARGE_BUT_NOT_OVERSIZE_MESSAGE_COUNTER_NAME,
|
||||
Tags.of(UserAgentTagUtil.getPlatformTag(userAgent), Tag.of("multiRecipientMessage", String.valueOf(isMultiRecipientMessage))))
|
||||
.increment();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -18,7 +18,6 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -254,41 +253,6 @@ class MessageSenderTest {
|
|||
mismatchedDevicesException.getMismatchedDevicesByServiceIdentifier());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource
|
||||
void getDeliveryChannelName(final Device device, final String expectedChannelName) {
|
||||
assertEquals(expectedChannelName, MessageSender.getDeliveryChannelName(device));
|
||||
}
|
||||
|
||||
private static List<Arguments> getDeliveryChannelName() {
|
||||
final List<Arguments> arguments = new ArrayList<>();
|
||||
|
||||
{
|
||||
final Device apnDevice = mock(Device.class);
|
||||
when(apnDevice.getApnId()).thenReturn("apns-token");
|
||||
|
||||
arguments.add(Arguments.of(apnDevice, "apn"));
|
||||
}
|
||||
|
||||
{
|
||||
final Device fcmDevice = mock(Device.class);
|
||||
when(fcmDevice.getGcmId()).thenReturn("fcm-token");
|
||||
|
||||
arguments.add(Arguments.of(fcmDevice, "gcm"));
|
||||
}
|
||||
|
||||
{
|
||||
final Device fetchesMessagesDevice = mock(Device.class);
|
||||
when(fetchesMessagesDevice.getFetchesMessages()).thenReturn(true);
|
||||
|
||||
arguments.add(Arguments.of(fetchesMessagesDevice, "websocket"));
|
||||
}
|
||||
|
||||
arguments.add(Arguments.of(mock(Device.class), "none"));
|
||||
|
||||
return arguments;
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateContentLength() {
|
||||
assertThrows(MessageTooLargeException.class, () ->
|
||||
|
|
Loading…
Reference in New Issue