Gather IP-based metrics for international, unsealed-sender messages.
This commit is contained in:
parent
df9dc82de5
commit
f57a4171ba
|
@ -12,9 +12,11 @@ import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import com.codahale.metrics.annotation.Timed;
|
import com.codahale.metrics.annotation.Timed;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import io.dropwizard.auth.Auth;
|
import io.dropwizard.auth.Auth;
|
||||||
import io.dropwizard.util.DataSize;
|
import io.dropwizard.util.DataSize;
|
||||||
|
import io.lettuce.core.ScriptOutputType;
|
||||||
import io.micrometer.core.instrument.DistributionSummary;
|
import io.micrometer.core.instrument.DistributionSummary;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
import io.micrometer.core.instrument.Tag;
|
import io.micrometer.core.instrument.Tag;
|
||||||
|
@ -62,6 +64,7 @@ import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||||
import org.whispersystems.textsecuregcm.push.MessageSender;
|
import org.whispersystems.textsecuregcm.push.MessageSender;
|
||||||
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
||||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
|
@ -100,6 +103,8 @@ public class MessageController {
|
||||||
|
|
||||||
private final Random random = new Random();
|
private final Random random = new Random();
|
||||||
|
|
||||||
|
private final ClusterLuaScript recordInternationalUnsealedSenderMetricsScript;
|
||||||
|
|
||||||
private static final String SENT_MESSAGE_COUNTER_NAME = name(MessageController.class, "sentMessages");
|
private static final String SENT_MESSAGE_COUNTER_NAME = name(MessageController.class, "sentMessages");
|
||||||
private static final String REJECT_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "rejectUnsealedSenderLimit");
|
private static final String REJECT_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "rejectUnsealedSenderLimit");
|
||||||
private static final String INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "internationalUnsealedSender");
|
private static final String INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "internationalUnsealedSender");
|
||||||
|
@ -109,6 +114,9 @@ public class MessageController {
|
||||||
private static final String CONTENT_SIZE_DISTRIBUTION_NAME = name(MessageController.class, "messageContentSize");
|
private static final String CONTENT_SIZE_DISTRIBUTION_NAME = name(MessageController.class, "messageContentSize");
|
||||||
private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes");
|
private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes");
|
||||||
|
|
||||||
|
private static final String INTERNATIONAL_UNSEALED_SENDER_IP_MESSAGE_DISTRIBUTION_NAME = name(MessageController.class, "internationalUnsealedSenderMessagesByIp");
|
||||||
|
private static final String INTERNATIONAL_UNSEALED_SENDER_IP_DESTINATION_DISTRIBUTION_NAME = name(MessageController.class, "internationalUnsealedSenderDestinationsByIp");
|
||||||
|
|
||||||
private static final String EPHEMERAL_TAG_NAME = "ephemeral";
|
private static final String EPHEMERAL_TAG_NAME = "ephemeral";
|
||||||
private static final String SENDER_TYPE_TAG_NAME = "senderType";
|
private static final String SENDER_TYPE_TAG_NAME = "senderType";
|
||||||
private static final String SENDER_COUNTRY_TAG_NAME = "senderCountry";
|
private static final String SENDER_COUNTRY_TAG_NAME = "senderCountry";
|
||||||
|
@ -136,6 +144,13 @@ public class MessageController {
|
||||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||||
this.metricsCluster = metricsCluster;
|
this.metricsCluster = metricsCluster;
|
||||||
this.receiptExecutorService = receiptExecutorService;
|
this.receiptExecutorService = receiptExecutorService;
|
||||||
|
|
||||||
|
try {
|
||||||
|
recordInternationalUnsealedSenderMetricsScript = ClusterLuaScript.fromResource(metricsCluster, "lua/record_international_unsealed_sender_metrics.lua", ScriptOutputType.MULTI);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// This should never happen for a script included in our own resource bundle
|
||||||
|
throw new AssertionError("Failed to load script", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Timed
|
@Timed
|
||||||
|
@ -249,7 +264,7 @@ public class MessageController {
|
||||||
final Device masterDevice = source.get().getMasterDevice().get();
|
final Device masterDevice = source.get().getMasterDevice().get();
|
||||||
|
|
||||||
if (!senderCountryCode.equals(destinationCountryCode)) {
|
if (!senderCountryCode.equals(destinationCountryCode)) {
|
||||||
Metrics.counter(INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME, SENDER_COUNTRY_TAG_NAME, senderCountryCode).increment();
|
recordInternationalUnsealedSenderMetrics(forwardedFor, senderCountryCode, destination.get().getNumber());
|
||||||
|
|
||||||
if (StringUtils.isAllBlank(masterDevice.getApnId(), masterDevice.getVoipApnId(), masterDevice.getGcmId()) || masterDevice.getUninstalledFeedbackTimestamp() > 0) {
|
if (StringUtils.isAllBlank(masterDevice.getApnId(), masterDevice.getVoipApnId(), masterDevice.getGcmId()) || masterDevice.getUninstalledFeedbackTimestamp() > 0) {
|
||||||
if (dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration().getRateLimitedCountryCodes().contains(senderCountryCode)) {
|
if (dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration().getRateLimitedCountryCodes().contains(senderCountryCode)) {
|
||||||
|
@ -552,4 +567,44 @@ public class MessageController {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@VisibleForTesting
|
||||||
|
void recordInternationalUnsealedSenderMetrics(final String senderIp, final String senderCountryCode, final String destinationNumber) {
|
||||||
|
final long distinctDestinations;
|
||||||
|
final long messageCount;
|
||||||
|
{
|
||||||
|
final String destinationSetKey = getDestinationSetKey(senderIp);
|
||||||
|
final String messageCountKey = getMessageCountKey(senderIp);
|
||||||
|
|
||||||
|
final List<Long> counts = (List<Long>) recordInternationalUnsealedSenderMetricsScript.execute(
|
||||||
|
List.of(destinationSetKey, messageCountKey),
|
||||||
|
List.of(destinationNumber));
|
||||||
|
|
||||||
|
distinctDestinations = counts.get(0);
|
||||||
|
messageCount = counts.get(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
Metrics.counter(INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME, SENDER_COUNTRY_TAG_NAME, senderCountryCode).increment();
|
||||||
|
|
||||||
|
DistributionSummary.builder(INTERNATIONAL_UNSEALED_SENDER_IP_DESTINATION_DISTRIBUTION_NAME)
|
||||||
|
.publishPercentileHistogram()
|
||||||
|
.register(Metrics.globalRegistry)
|
||||||
|
.record(distinctDestinations);
|
||||||
|
|
||||||
|
DistributionSummary.builder(INTERNATIONAL_UNSEALED_SENDER_IP_MESSAGE_DISTRIBUTION_NAME)
|
||||||
|
.publishPercentileHistogram()
|
||||||
|
.register(Metrics.globalRegistry)
|
||||||
|
.record(messageCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static String getDestinationSetKey(final String senderIp) {
|
||||||
|
return "international_unsealed_sender_destinations::{" + senderIp + "}";
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static String getMessageCountKey(final String senderIp) {
|
||||||
|
return "international_unsealed_sender_message_count::{" + senderIp + "}";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
local destinationSetKey = KEYS[1]
|
||||||
|
local messageCountKey = KEYS[2]
|
||||||
|
|
||||||
|
local destination = ARGV[1]
|
||||||
|
|
||||||
|
redis.call("PFADD", destinationSetKey, destination)
|
||||||
|
local distinctDestinationCount = redis.call("PFCOUNT", destinationSetKey)
|
||||||
|
|
||||||
|
if redis.call("TTL", destinationSetKey) < 0 then
|
||||||
|
redis.call("EXPIRE", destinationSetKey, 86400)
|
||||||
|
end
|
||||||
|
|
||||||
|
local messageCount = redis.call("INCR", messageCountKey)
|
||||||
|
|
||||||
|
if messageCount == 1 then
|
||||||
|
redis.call("EXPIRE", messageCountKey, 86400)
|
||||||
|
end
|
||||||
|
|
||||||
|
return { distinctDestinationCount, messageCount }
|
|
@ -0,0 +1,59 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2021 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.controllers;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||||
|
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||||
|
import org.whispersystems.textsecuregcm.push.MessageSender;
|
||||||
|
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
|
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
public class MessageControllerMetricsTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
|
private MessageController messageController;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
messageController = new MessageController(mock(RateLimiters.class),
|
||||||
|
mock(MessageSender.class),
|
||||||
|
mock(ReceiptSender.class),
|
||||||
|
mock(AccountsManager.class),
|
||||||
|
mock(MessagesManager.class),
|
||||||
|
mock(ApnFallbackManager.class),
|
||||||
|
mock(DynamicConfigurationManager.class),
|
||||||
|
getRedisCluster(),
|
||||||
|
mock(ScheduledExecutorService.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRecordInternationalUnsealedSenderMetrics() {
|
||||||
|
final String senderIp = "127.0.0.1";
|
||||||
|
|
||||||
|
messageController.recordInternationalUnsealedSenderMetrics(senderIp, "84", "+18005551234");
|
||||||
|
messageController.recordInternationalUnsealedSenderMetrics(senderIp, "84", "+18005551234");
|
||||||
|
|
||||||
|
getRedisCluster().useCluster(connection -> {
|
||||||
|
assertEquals(1, (long)connection.sync().pfcount(MessageController.getDestinationSetKey(senderIp)));
|
||||||
|
assertEquals(2, Long.parseLong(connection.sync().get(MessageController.getMessageCountKey(senderIp)), 10));
|
||||||
|
|
||||||
|
assertTrue(connection.sync().ttl(MessageController.getDestinationSetKey(senderIp)) >= 0);
|
||||||
|
assertTrue(connection.sync().ttl(MessageController.getMessageCountKey(senderIp)) >= 0);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
|
@ -44,6 +44,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
import javax.ws.rs.client.Entity;
|
import javax.ws.rs.client.Entity;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
import io.lettuce.core.ScriptOutputType;
|
||||||
|
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||||
import junitparams.JUnitParamsRunner;
|
import junitparams.JUnitParamsRunner;
|
||||||
import junitparams.Parameters;
|
import junitparams.Parameters;
|
||||||
import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory;
|
import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory;
|
||||||
|
@ -81,6 +83,7 @@ import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
|
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
|
||||||
|
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
|
||||||
import org.whispersystems.textsecuregcm.util.Base64;
|
import org.whispersystems.textsecuregcm.util.Base64;
|
||||||
|
|
||||||
@RunWith(JUnitParamsRunner.class)
|
@RunWith(JUnitParamsRunner.class)
|
||||||
|
@ -95,6 +98,9 @@ public class MessageControllerTest {
|
||||||
private static final String INTERNATIONAL_RECIPIENT = "+61123456789";
|
private static final String INTERNATIONAL_RECIPIENT = "+61123456789";
|
||||||
private static final UUID INTERNATIONAL_UUID = UUID.randomUUID();
|
private static final UUID INTERNATIONAL_UUID = UUID.randomUUID();
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private final RedisAdvancedClusterCommands<String, String> redisCommands = mock(RedisAdvancedClusterCommands.class);
|
||||||
|
|
||||||
private final MessageSender messageSender = mock(MessageSender.class);
|
private final MessageSender messageSender = mock(MessageSender.class);
|
||||||
private final ReceiptSender receiptSender = mock(ReceiptSender.class);
|
private final ReceiptSender receiptSender = mock(ReceiptSender.class);
|
||||||
private final AccountsManager accountsManager = mock(AccountsManager.class);
|
private final AccountsManager accountsManager = mock(AccountsManager.class);
|
||||||
|
@ -104,7 +110,7 @@ public class MessageControllerTest {
|
||||||
private final CardinalityRateLimiter unsealedSenderLimiter = mock(CardinalityRateLimiter.class);
|
private final CardinalityRateLimiter unsealedSenderLimiter = mock(CardinalityRateLimiter.class);
|
||||||
private final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class);
|
private final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class);
|
||||||
private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
|
private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
|
||||||
private final FaultTolerantRedisCluster metricsCluster = mock(FaultTolerantRedisCluster.class);
|
private final FaultTolerantRedisCluster metricsCluster = RedisClusterHelper.buildMockRedisCluster(redisCommands);
|
||||||
private final ScheduledExecutorService receiptExecutor = mock(ScheduledExecutorService.class);
|
private final ScheduledExecutorService receiptExecutor = mock(ScheduledExecutorService.class);
|
||||||
|
|
||||||
private final ObjectMapper mapper = new ObjectMapper();
|
private final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
@ -207,6 +213,8 @@ public class MessageControllerTest {
|
||||||
when(messageRateConfiguration.getReceiptDelayJitter()).thenReturn(Duration.ofMillis(1));
|
when(messageRateConfiguration.getReceiptDelayJitter()).thenReturn(Duration.ofMillis(1));
|
||||||
when(messageRateConfiguration.getReceiptProbability()).thenReturn(1.0);
|
when(messageRateConfiguration.getReceiptProbability()).thenReturn(1.0);
|
||||||
|
|
||||||
|
when(redisCommands.evalsha(any(), any(), any(), any())).thenReturn(List.of(1L, 1L));
|
||||||
|
|
||||||
Response response =
|
Response response =
|
||||||
resources.getJerseyTest()
|
resources.getJerseyTest()
|
||||||
.target(String.format("/v1/messages/%s", INTERNATIONAL_RECIPIENT))
|
.target(String.format("/v1/messages/%s", INTERNATIONAL_RECIPIENT))
|
||||||
|
|
Loading…
Reference in New Issue