diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index d7f95cced..e3d13654a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -344,7 +344,7 @@ public class WhisperServerService extends Application experiments = Collections.emptyMap(); + @JsonProperty + @Valid + private DynamicRateLimitsConfiguration limits = new DynamicRateLimitsConfiguration(); + public Optional getExperimentEnrollmentConfiguration(final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); } + + public DynamicRateLimitsConfiguration getLimits() { + return limits; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicRateLimitsConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicRateLimitsConfiguration.java new file mode 100644 index 000000000..03afc9260 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicRateLimitsConfiguration.java @@ -0,0 +1,21 @@ +package org.whispersystems.textsecuregcm.configuration.dynamic; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration; + +public class DynamicRateLimitsConfiguration { + + @JsonProperty + private RateLimitConfiguration unsealedSenderNumber = new RateLimitConfiguration(60, 1.0 / 60); + + @JsonProperty + private RateLimitConfiguration unsealedSenderIp = new RateLimitConfiguration(120, 2.0 / 60); + + public RateLimitConfiguration getUnsealedSenderIp() { + return unsealedSenderIp; + } + + public RateLimitConfiguration getUnsealedSenderNumber() { + return unsealedSenderNumber; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index d1c52813e..c274c5ad4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -77,6 +77,7 @@ public class MessageController { private final Meter unidentifiedMeter = metricRegistry.meter(name(getClass(), "delivery", "unidentified")); private final Meter identifiedMeter = metricRegistry.meter(name(getClass(), "delivery", "identified" )); private final Meter rejectOver256kibMessageMeter = metricRegistry.meter(name(getClass(), "rejectOver256kibMessage")); + private final Meter rejectUnsealedSenderLimit = metricRegistry.meter(name(getClass(), "rejectUnsealedSenderLimit")); private final Timer sendMessageInternalTimer = metricRegistry.timer(name(getClass(), "sendMessageInternal")); private final Histogram outgoingMessageListSizeHistogram = metricRegistry.histogram(name(getClass(), "outgoingMessageListSize")); @@ -117,24 +118,31 @@ public class MessageController { @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public Response sendMessage(@Auth Optional source, - @HeaderParam(OptionalAccess.UNIDENTIFIED) Optional accessKey, - @HeaderParam("User-Agent") String userAgent, - @PathParam("destination") AmbiguousIdentifier destinationName, - @Valid IncomingMessageList messages) + @HeaderParam(OptionalAccess.UNIDENTIFIED) Optional accessKey, + @HeaderParam("User-Agent") String userAgent, + @PathParam("destination") AmbiguousIdentifier destinationName, + @Valid IncomingMessageList messages) throws RateLimitExceededException { if (shouldSend(destinationName)) { - if (!source.isPresent() && !accessKey.isPresent()) { + if (source.isEmpty() && accessKey.isEmpty()) { throw new WebApplicationException(Response.Status.UNAUTHORIZED); } if (source.isPresent() && !source.get().isFor(destinationName)) { rateLimiters.getMessagesLimiter().validate(source.get().getNumber() + "__" + destinationName); + + try { + rateLimiters.getUnsealedSenderLimiter().validate(source.get().getUuid().toString()); + } catch (RateLimitExceededException e) { + rejectUnsealedSenderLimit.mark(); + logger.info("Rejected unsealed sender limit from: " + source.get().getNumber()); + } } if (source.isPresent() && !source.get().isFor(destinationName)) { identifiedMeter.mark(); - } else if (!source.isPresent()) { + } else if (source.isEmpty()) { unidentifiedMeter.mark(); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index 4b0d74a74..30f7dc007 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -31,6 +31,7 @@ public class RateLimiter { protected final FaultTolerantRedisCluster cacheCluster; protected final String name; private final int bucketSize; + private final double leakRatePerMinute; private final double leakRatePerMillis; private final boolean reportLimits; @@ -51,6 +52,7 @@ public class RateLimiter { this.cacheCluster = cacheCluster; this.name = name; this.bucketSize = bucketSize; + this.leakRatePerMinute = leakRatePerMinute; this.leakRatePerMillis = leakRatePerMinute / (60.0 * 1000.0); this.reportLimits = reportLimits; } @@ -76,6 +78,14 @@ public class RateLimiter { cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key))); } + public int getBucketSize() { + return bucketSize; + } + + public double getLeakRatePerMinute() { + return leakRatePerMinute; + } + private void setBucket(String key, LeakyBucket bucket) { try { final String serialized = bucket.serialize(mapper); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java index 5989b2ee3..df46629ed 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java @@ -6,7 +6,12 @@ package org.whispersystems.textsecuregcm.limits; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; +import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; public class RateLimiters { @@ -35,7 +40,16 @@ public class RateLimiters { private final RateLimiter usernameLookupLimiter; private final RateLimiter usernameSetLimiter; - public RateLimiters(RateLimitsConfiguration config, FaultTolerantRedisCluster cacheCluster) { + private final AtomicReference unsealedSenderLimiter; + private final AtomicReference unsealedIpLimiter; + + private final FaultTolerantRedisCluster cacheCluster; + private final DynamicConfigurationManager dynamicConfig; + + public RateLimiters(RateLimitsConfiguration config, DynamicConfigurationManager dynamicConfig, FaultTolerantRedisCluster cacheCluster) { + this.cacheCluster = cacheCluster; + this.dynamicConfig = dynamicConfig; + this.smsDestinationLimiter = new RateLimiter(cacheCluster, "smsDestination", config.getSmsDestination().getBucketSize(), config.getSmsDestination().getLeakRatePerMinute()); @@ -115,6 +129,33 @@ public class RateLimiters { this.usernameSetLimiter = new RateLimiter(cacheCluster, "usernameSet", config.getUsernameSet().getBucketSize(), config.getUsernameSet().getLeakRatePerMinute()); + + this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber())); + this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp())); + } + + public RateLimiter getUnsealedSenderLimiter() { + RateLimitConfiguration currentConfiguration = dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber(); + + return this.unsealedSenderLimiter.updateAndGet(rateLimiter -> { + if (isLimiterConfigurationCurrent(rateLimiter, currentConfiguration)) { + return rateLimiter; + } else { + return createUnsealedSenderLimiter(cacheCluster, currentConfiguration); + } + }); + } + + public RateLimiter getUnsealedIpLimiter() { + RateLimitConfiguration currentConfiguration = dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp(); + + return this.unsealedIpLimiter.updateAndGet(rateLimiter -> { + if (isLimiterConfigurationCurrent(rateLimiter, currentConfiguration)) { + return rateLimiter; + } else { + return createUnsealedIpLimiter(cacheCluster, currentConfiguration); + } + }); } public RateLimiter getAllocateDeviceLimiter() { @@ -197,4 +238,25 @@ public class RateLimiters { return usernameSetLimiter; } + private RateLimiter createUnsealedSenderLimiter(FaultTolerantRedisCluster cacheCluster, + RateLimitConfiguration configuration) + { + return createLimiter(cacheCluster, configuration, "unsealedSender"); + } + + private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster, + RateLimitConfiguration configuration) + { + return createLimiter(cacheCluster, configuration, "unsealedIp"); + } + + private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, RateLimitConfiguration configuration, String name) { + return new RateLimiter(cacheCluster, name, + configuration.getBucketSize(), + configuration.getLeakRatePerMinute()); + } + + private boolean isLimiterConfigurationCurrent(RateLimiter limiter, RateLimitConfiguration configuration) { + return limiter.getBucketSize() == configuration.getBucketSize() && limiter.getLeakRatePerMinute() == configuration.getLeakRatePerMinute(); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/limits/DynamicRateLimitsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/limits/DynamicRateLimitsTest.java new file mode 100644 index 000000000..80b62c5d5 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/limits/DynamicRateLimitsTest.java @@ -0,0 +1,73 @@ +package org.whispersystems.textsecuregcm.tests.limits; + +import org.junit.Before; +import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicRateLimitsConfiguration; +import org.whispersystems.textsecuregcm.limits.RateLimiter; +import org.whispersystems.textsecuregcm.limits.RateLimiters; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DynamicRateLimitsTest { + + private DynamicConfigurationManager dynamicConfig; + private FaultTolerantRedisCluster redisCluster; + + @Before + public void setup() { + this.dynamicConfig = mock(DynamicConfigurationManager.class); + this.redisCluster = mock(FaultTolerantRedisCluster.class); + + DynamicConfiguration defaultConfig = new DynamicConfiguration(); + when(dynamicConfig.getConfiguration()).thenReturn(defaultConfig); + + } + + @Test + public void testUnchangingConfiguration() { + RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster); + + RateLimiter limiter = rateLimiters.getUnsealedSenderLimiter(); + + assertThat(limiter.getBucketSize()).isEqualTo(dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber().getBucketSize()); + assertThat(limiter.getLeakRatePerMinute()).isEqualTo(dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber().getLeakRatePerMinute()); + assertSame(rateLimiters.getUnsealedSenderLimiter(), limiter); + } + + @Test + public void testChangingConfiguration() { + DynamicConfiguration configuration = mock(DynamicConfiguration.class); + DynamicRateLimitsConfiguration limitsConfiguration = mock(DynamicRateLimitsConfiguration.class); + + when(configuration.getLimits()).thenReturn(limitsConfiguration); + when(limitsConfiguration.getUnsealedSenderNumber()).thenReturn(new RateLimitsConfiguration.RateLimitConfiguration(1, 2.0)); + when(limitsConfiguration.getUnsealedSenderIp()).thenReturn(new RateLimitsConfiguration.RateLimitConfiguration(4, 1.0)); + + when(dynamicConfig.getConfiguration()).thenReturn(configuration); + + RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster); + + RateLimiter limiter = rateLimiters.getUnsealedSenderLimiter(); + + assertThat(limiter.getBucketSize()).isEqualTo(1); + assertThat(limiter.getLeakRatePerMinute()).isEqualTo(2.0); + assertSame(rateLimiters.getUnsealedSenderLimiter(), limiter); + + when(limitsConfiguration.getUnsealedSenderNumber()).thenReturn(new RateLimitsConfiguration.RateLimitConfiguration(2, 3.0)); + + RateLimiter changed = rateLimiters.getUnsealedSenderLimiter(); + + assertThat(changed.getBucketSize()).isEqualTo(2); + assertThat(changed.getLeakRatePerMinute()).isEqualTo(3.0); + assertNotSame(limiter, changed); + + } + +}