From f57a4171baed7da96cf9849e8cb90fa7581ef7c9 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 4 Mar 2021 11:47:13 -0500 Subject: [PATCH] Gather IP-based metrics for international, unsealed-sender messages. --- .../controllers/MessageController.java | 57 +++++++++++++++++- ..._international_unsealed_sender_metrics.lua | 19 ++++++ .../MessageControllerMetricsTest.java | 59 +++++++++++++++++++ .../controllers/MessageControllerTest.java | 10 +++- 4 files changed, 143 insertions(+), 2 deletions(-) create mode 100644 service/src/main/resources/lua/record_international_unsealed_sender_metrics.lua create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerMetricsTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index c35ea8b4f..d3ceb3345 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -12,9 +12,11 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import com.codahale.metrics.annotation.Timed; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import io.dropwizard.auth.Auth; import io.dropwizard.util.DataSize; +import io.lettuce.core.ScriptOutputType; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; @@ -62,6 +64,7 @@ import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.MessageSender; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; import org.whispersystems.textsecuregcm.push.ReceiptSender; +import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.storage.Account; @@ -100,6 +103,8 @@ public class MessageController { private final Random random = new Random(); + private final ClusterLuaScript recordInternationalUnsealedSenderMetricsScript; + private static final String SENT_MESSAGE_COUNTER_NAME = name(MessageController.class, "sentMessages"); private static final String REJECT_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "rejectUnsealedSenderLimit"); private static final String INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "internationalUnsealedSender"); @@ -109,6 +114,9 @@ public class MessageController { private static final String CONTENT_SIZE_DISTRIBUTION_NAME = name(MessageController.class, "messageContentSize"); private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes"); + private static final String INTERNATIONAL_UNSEALED_SENDER_IP_MESSAGE_DISTRIBUTION_NAME = name(MessageController.class, "internationalUnsealedSenderMessagesByIp"); + private static final String INTERNATIONAL_UNSEALED_SENDER_IP_DESTINATION_DISTRIBUTION_NAME = name(MessageController.class, "internationalUnsealedSenderDestinationsByIp"); + private static final String EPHEMERAL_TAG_NAME = "ephemeral"; private static final String SENDER_TYPE_TAG_NAME = "senderType"; private static final String SENDER_COUNTRY_TAG_NAME = "senderCountry"; @@ -136,6 +144,13 @@ public class MessageController { this.dynamicConfigurationManager = dynamicConfigurationManager; this.metricsCluster = metricsCluster; this.receiptExecutorService = receiptExecutorService; + + try { + recordInternationalUnsealedSenderMetricsScript = ClusterLuaScript.fromResource(metricsCluster, "lua/record_international_unsealed_sender_metrics.lua", ScriptOutputType.MULTI); + } catch (IOException e) { + // This should never happen for a script included in our own resource bundle + throw new AssertionError("Failed to load script", e); + } } @Timed @@ -249,7 +264,7 @@ public class MessageController { final Device masterDevice = source.get().getMasterDevice().get(); if (!senderCountryCode.equals(destinationCountryCode)) { - Metrics.counter(INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME, SENDER_COUNTRY_TAG_NAME, senderCountryCode).increment(); + recordInternationalUnsealedSenderMetrics(forwardedFor, senderCountryCode, destination.get().getNumber()); if (StringUtils.isAllBlank(masterDevice.getApnId(), masterDevice.getVoipApnId(), masterDevice.getGcmId()) || masterDevice.getUninstalledFeedbackTimestamp() > 0) { if (dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration().getRateLimitedCountryCodes().contains(senderCountryCode)) { @@ -552,4 +567,44 @@ public class MessageController { return Optional.empty(); } } + + @SuppressWarnings("unchecked") + @VisibleForTesting + void recordInternationalUnsealedSenderMetrics(final String senderIp, final String senderCountryCode, final String destinationNumber) { + final long distinctDestinations; + final long messageCount; + { + final String destinationSetKey = getDestinationSetKey(senderIp); + final String messageCountKey = getMessageCountKey(senderIp); + + final List counts = (List) recordInternationalUnsealedSenderMetricsScript.execute( + List.of(destinationSetKey, messageCountKey), + List.of(destinationNumber)); + + distinctDestinations = counts.get(0); + messageCount = counts.get(1); + } + + Metrics.counter(INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME, SENDER_COUNTRY_TAG_NAME, senderCountryCode).increment(); + + DistributionSummary.builder(INTERNATIONAL_UNSEALED_SENDER_IP_DESTINATION_DISTRIBUTION_NAME) + .publishPercentileHistogram() + .register(Metrics.globalRegistry) + .record(distinctDestinations); + + DistributionSummary.builder(INTERNATIONAL_UNSEALED_SENDER_IP_MESSAGE_DISTRIBUTION_NAME) + .publishPercentileHistogram() + .register(Metrics.globalRegistry) + .record(messageCount); + } + + @VisibleForTesting + static String getDestinationSetKey(final String senderIp) { + return "international_unsealed_sender_destinations::{" + senderIp + "}"; + } + + @VisibleForTesting + static String getMessageCountKey(final String senderIp) { + return "international_unsealed_sender_message_count::{" + senderIp + "}"; + } } diff --git a/service/src/main/resources/lua/record_international_unsealed_sender_metrics.lua b/service/src/main/resources/lua/record_international_unsealed_sender_metrics.lua new file mode 100644 index 000000000..d29ade8ac --- /dev/null +++ b/service/src/main/resources/lua/record_international_unsealed_sender_metrics.lua @@ -0,0 +1,19 @@ +local destinationSetKey = KEYS[1] +local messageCountKey = KEYS[2] + +local destination = ARGV[1] + +redis.call("PFADD", destinationSetKey, destination) +local distinctDestinationCount = redis.call("PFCOUNT", destinationSetKey) + +if redis.call("TTL", destinationSetKey) < 0 then + redis.call("EXPIRE", destinationSetKey, 86400) +end + +local messageCount = redis.call("INCR", messageCountKey) + +if messageCount == 1 then + redis.call("EXPIRE", messageCountKey, 86400) +end + +return { distinctDestinationCount, messageCount } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerMetricsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerMetricsTest.java new file mode 100644 index 000000000..1de8fe775 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerMetricsTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.controllers; + +import org.junit.Before; +import org.junit.Test; +import org.whispersystems.textsecuregcm.limits.RateLimiters; +import org.whispersystems.textsecuregcm.push.ApnFallbackManager; +import org.whispersystems.textsecuregcm.push.MessageSender; +import org.whispersystems.textsecuregcm.push.ReceiptSender; +import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; +import org.whispersystems.textsecuregcm.storage.MessagesManager; + +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class MessageControllerMetricsTest extends AbstractRedisClusterTest { + + private MessageController messageController; + + @Before + public void setUp() throws Exception { + super.setUp(); + + messageController = new MessageController(mock(RateLimiters.class), + mock(MessageSender.class), + mock(ReceiptSender.class), + mock(AccountsManager.class), + mock(MessagesManager.class), + mock(ApnFallbackManager.class), + mock(DynamicConfigurationManager.class), + getRedisCluster(), + mock(ScheduledExecutorService.class)); + } + + @Test + public void testRecordInternationalUnsealedSenderMetrics() { + final String senderIp = "127.0.0.1"; + + messageController.recordInternationalUnsealedSenderMetrics(senderIp, "84", "+18005551234"); + messageController.recordInternationalUnsealedSenderMetrics(senderIp, "84", "+18005551234"); + + getRedisCluster().useCluster(connection -> { + assertEquals(1, (long)connection.sync().pfcount(MessageController.getDestinationSetKey(senderIp))); + assertEquals(2, Long.parseLong(connection.sync().get(MessageController.getMessageCountKey(senderIp)), 10)); + + assertTrue(connection.sync().ttl(MessageController.getDestinationSetKey(senderIp)) >= 0); + assertTrue(connection.sync().ttl(MessageController.getMessageCountKey(senderIp)) >= 0); + }); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java index aabc992ee..c998ec6a8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java @@ -44,6 +44,8 @@ import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import io.lettuce.core.ScriptOutputType; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import junitparams.JUnitParamsRunner; import junitparams.Parameters; import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; @@ -81,6 +83,7 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.tests.util.AuthHelper; +import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import org.whispersystems.textsecuregcm.util.Base64; @RunWith(JUnitParamsRunner.class) @@ -95,6 +98,9 @@ public class MessageControllerTest { private static final String INTERNATIONAL_RECIPIENT = "+61123456789"; private static final UUID INTERNATIONAL_UUID = UUID.randomUUID(); + @SuppressWarnings("unchecked") + private final RedisAdvancedClusterCommands redisCommands = mock(RedisAdvancedClusterCommands.class); + private final MessageSender messageSender = mock(MessageSender.class); private final ReceiptSender receiptSender = mock(ReceiptSender.class); private final AccountsManager accountsManager = mock(AccountsManager.class); @@ -104,7 +110,7 @@ public class MessageControllerTest { private final CardinalityRateLimiter unsealedSenderLimiter = mock(CardinalityRateLimiter.class); private final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class); private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); - private final FaultTolerantRedisCluster metricsCluster = mock(FaultTolerantRedisCluster.class); + private final FaultTolerantRedisCluster metricsCluster = RedisClusterHelper.buildMockRedisCluster(redisCommands); private final ScheduledExecutorService receiptExecutor = mock(ScheduledExecutorService.class); private final ObjectMapper mapper = new ObjectMapper(); @@ -207,6 +213,8 @@ public class MessageControllerTest { when(messageRateConfiguration.getReceiptDelayJitter()).thenReturn(Duration.ofMillis(1)); when(messageRateConfiguration.getReceiptProbability()).thenReturn(1.0); + when(redisCommands.evalsha(any(), any(), any(), any())).thenReturn(List.of(1L, 1L)); + Response response = resources.getJerseyTest() .target(String.format("/v1/messages/%s", INTERNATIONAL_RECIPIENT))