Track impact of unsealed sender rate limits (#374)
This commit is contained in:
parent
3036a149bb
commit
5e1a572bd8
|
@ -344,7 +344,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
||||
APNSender apnSender = new APNSender(apnSenderExecutor, accountsManager, config.getApnConfiguration());
|
||||
GCMSender gcmSender = new GCMSender(gcmSenderExecutor, accountsManager, config.getGcmConfiguration().getApiKey());
|
||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
|
||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), dynamicConfigurationManager, cacheCluster);
|
||||
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
|
||||
|
||||
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
||||
|
|
|
@ -13,7 +13,15 @@ public class DynamicConfiguration {
|
|||
@Valid
|
||||
private Map<String, DynamicExperimentEnrollmentConfiguration> experiments = Collections.emptyMap();
|
||||
|
||||
@JsonProperty
|
||||
@Valid
|
||||
private DynamicRateLimitsConfiguration limits = new DynamicRateLimitsConfiguration();
|
||||
|
||||
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(final String experimentName) {
|
||||
return Optional.ofNullable(experiments.get(experimentName));
|
||||
}
|
||||
|
||||
public DynamicRateLimitsConfiguration getLimits() {
|
||||
return limits;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
package org.whispersystems.textsecuregcm.configuration.dynamic;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration;
|
||||
|
||||
public class DynamicRateLimitsConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
private RateLimitConfiguration unsealedSenderNumber = new RateLimitConfiguration(60, 1.0 / 60);
|
||||
|
||||
@JsonProperty
|
||||
private RateLimitConfiguration unsealedSenderIp = new RateLimitConfiguration(120, 2.0 / 60);
|
||||
|
||||
public RateLimitConfiguration getUnsealedSenderIp() {
|
||||
return unsealedSenderIp;
|
||||
}
|
||||
|
||||
public RateLimitConfiguration getUnsealedSenderNumber() {
|
||||
return unsealedSenderNumber;
|
||||
}
|
||||
}
|
|
@ -77,6 +77,7 @@ public class MessageController {
|
|||
private final Meter unidentifiedMeter = metricRegistry.meter(name(getClass(), "delivery", "unidentified"));
|
||||
private final Meter identifiedMeter = metricRegistry.meter(name(getClass(), "delivery", "identified" ));
|
||||
private final Meter rejectOver256kibMessageMeter = metricRegistry.meter(name(getClass(), "rejectOver256kibMessage"));
|
||||
private final Meter rejectUnsealedSenderLimit = metricRegistry.meter(name(getClass(), "rejectUnsealedSenderLimit"));
|
||||
private final Timer sendMessageInternalTimer = metricRegistry.timer(name(getClass(), "sendMessageInternal"));
|
||||
private final Histogram outgoingMessageListSizeHistogram = metricRegistry.histogram(name(getClass(), "outgoingMessageListSize"));
|
||||
|
||||
|
@ -117,24 +118,31 @@ public class MessageController {
|
|||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response sendMessage(@Auth Optional<Account> source,
|
||||
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
|
||||
@HeaderParam("User-Agent") String userAgent,
|
||||
@PathParam("destination") AmbiguousIdentifier destinationName,
|
||||
@Valid IncomingMessageList messages)
|
||||
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
|
||||
@HeaderParam("User-Agent") String userAgent,
|
||||
@PathParam("destination") AmbiguousIdentifier destinationName,
|
||||
@Valid IncomingMessageList messages)
|
||||
throws RateLimitExceededException
|
||||
{
|
||||
if (shouldSend(destinationName)) {
|
||||
if (!source.isPresent() && !accessKey.isPresent()) {
|
||||
if (source.isEmpty() && accessKey.isEmpty()) {
|
||||
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
|
||||
}
|
||||
|
||||
if (source.isPresent() && !source.get().isFor(destinationName)) {
|
||||
rateLimiters.getMessagesLimiter().validate(source.get().getNumber() + "__" + destinationName);
|
||||
|
||||
try {
|
||||
rateLimiters.getUnsealedSenderLimiter().validate(source.get().getUuid().toString());
|
||||
} catch (RateLimitExceededException e) {
|
||||
rejectUnsealedSenderLimit.mark();
|
||||
logger.info("Rejected unsealed sender limit from: " + source.get().getNumber());
|
||||
}
|
||||
}
|
||||
|
||||
if (source.isPresent() && !source.get().isFor(destinationName)) {
|
||||
identifiedMeter.mark();
|
||||
} else if (!source.isPresent()) {
|
||||
} else if (source.isEmpty()) {
|
||||
unidentifiedMeter.mark();
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ public class RateLimiter {
|
|||
protected final FaultTolerantRedisCluster cacheCluster;
|
||||
protected final String name;
|
||||
private final int bucketSize;
|
||||
private final double leakRatePerMinute;
|
||||
private final double leakRatePerMillis;
|
||||
private final boolean reportLimits;
|
||||
|
||||
|
@ -51,6 +52,7 @@ public class RateLimiter {
|
|||
this.cacheCluster = cacheCluster;
|
||||
this.name = name;
|
||||
this.bucketSize = bucketSize;
|
||||
this.leakRatePerMinute = leakRatePerMinute;
|
||||
this.leakRatePerMillis = leakRatePerMinute / (60.0 * 1000.0);
|
||||
this.reportLimits = reportLimits;
|
||||
}
|
||||
|
@ -76,6 +78,14 @@ public class RateLimiter {
|
|||
cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key)));
|
||||
}
|
||||
|
||||
public int getBucketSize() {
|
||||
return bucketSize;
|
||||
}
|
||||
|
||||
public double getLeakRatePerMinute() {
|
||||
return leakRatePerMinute;
|
||||
}
|
||||
|
||||
private void setBucket(String key, LeakyBucket bucket) {
|
||||
try {
|
||||
final String serialized = bucket.serialize(mapper);
|
||||
|
|
|
@ -6,7 +6,12 @@ package org.whispersystems.textsecuregcm.limits;
|
|||
|
||||
|
||||
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.UnaryOperator;
|
||||
|
||||
public class RateLimiters {
|
||||
|
||||
|
@ -35,7 +40,16 @@ public class RateLimiters {
|
|||
private final RateLimiter usernameLookupLimiter;
|
||||
private final RateLimiter usernameSetLimiter;
|
||||
|
||||
public RateLimiters(RateLimitsConfiguration config, FaultTolerantRedisCluster cacheCluster) {
|
||||
private final AtomicReference<RateLimiter> unsealedSenderLimiter;
|
||||
private final AtomicReference<RateLimiter> unsealedIpLimiter;
|
||||
|
||||
private final FaultTolerantRedisCluster cacheCluster;
|
||||
private final DynamicConfigurationManager dynamicConfig;
|
||||
|
||||
public RateLimiters(RateLimitsConfiguration config, DynamicConfigurationManager dynamicConfig, FaultTolerantRedisCluster cacheCluster) {
|
||||
this.cacheCluster = cacheCluster;
|
||||
this.dynamicConfig = dynamicConfig;
|
||||
|
||||
this.smsDestinationLimiter = new RateLimiter(cacheCluster, "smsDestination",
|
||||
config.getSmsDestination().getBucketSize(),
|
||||
config.getSmsDestination().getLeakRatePerMinute());
|
||||
|
@ -115,6 +129,33 @@ public class RateLimiters {
|
|||
this.usernameSetLimiter = new RateLimiter(cacheCluster, "usernameSet",
|
||||
config.getUsernameSet().getBucketSize(),
|
||||
config.getUsernameSet().getLeakRatePerMinute());
|
||||
|
||||
this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber()));
|
||||
this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp()));
|
||||
}
|
||||
|
||||
public RateLimiter getUnsealedSenderLimiter() {
|
||||
RateLimitConfiguration currentConfiguration = dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber();
|
||||
|
||||
return this.unsealedSenderLimiter.updateAndGet(rateLimiter -> {
|
||||
if (isLimiterConfigurationCurrent(rateLimiter, currentConfiguration)) {
|
||||
return rateLimiter;
|
||||
} else {
|
||||
return createUnsealedSenderLimiter(cacheCluster, currentConfiguration);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RateLimiter getUnsealedIpLimiter() {
|
||||
RateLimitConfiguration currentConfiguration = dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp();
|
||||
|
||||
return this.unsealedIpLimiter.updateAndGet(rateLimiter -> {
|
||||
if (isLimiterConfigurationCurrent(rateLimiter, currentConfiguration)) {
|
||||
return rateLimiter;
|
||||
} else {
|
||||
return createUnsealedIpLimiter(cacheCluster, currentConfiguration);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RateLimiter getAllocateDeviceLimiter() {
|
||||
|
@ -197,4 +238,25 @@ public class RateLimiters {
|
|||
return usernameSetLimiter;
|
||||
}
|
||||
|
||||
private RateLimiter createUnsealedSenderLimiter(FaultTolerantRedisCluster cacheCluster,
|
||||
RateLimitConfiguration configuration)
|
||||
{
|
||||
return createLimiter(cacheCluster, configuration, "unsealedSender");
|
||||
}
|
||||
|
||||
private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster,
|
||||
RateLimitConfiguration configuration)
|
||||
{
|
||||
return createLimiter(cacheCluster, configuration, "unsealedIp");
|
||||
}
|
||||
|
||||
private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, RateLimitConfiguration configuration, String name) {
|
||||
return new RateLimiter(cacheCluster, name,
|
||||
configuration.getBucketSize(),
|
||||
configuration.getLeakRatePerMinute());
|
||||
}
|
||||
|
||||
private boolean isLimiterConfigurationCurrent(RateLimiter limiter, RateLimitConfiguration configuration) {
|
||||
return limiter.getBucketSize() == configuration.getBucketSize() && limiter.getLeakRatePerMinute() == configuration.getLeakRatePerMinute();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
package org.whispersystems.textsecuregcm.tests.limits;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicRateLimitsConfiguration;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiter;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class DynamicRateLimitsTest {
|
||||
|
||||
private DynamicConfigurationManager dynamicConfig;
|
||||
private FaultTolerantRedisCluster redisCluster;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
this.dynamicConfig = mock(DynamicConfigurationManager.class);
|
||||
this.redisCluster = mock(FaultTolerantRedisCluster.class);
|
||||
|
||||
DynamicConfiguration defaultConfig = new DynamicConfiguration();
|
||||
when(dynamicConfig.getConfiguration()).thenReturn(defaultConfig);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnchangingConfiguration() {
|
||||
RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster);
|
||||
|
||||
RateLimiter limiter = rateLimiters.getUnsealedSenderLimiter();
|
||||
|
||||
assertThat(limiter.getBucketSize()).isEqualTo(dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber().getBucketSize());
|
||||
assertThat(limiter.getLeakRatePerMinute()).isEqualTo(dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber().getLeakRatePerMinute());
|
||||
assertSame(rateLimiters.getUnsealedSenderLimiter(), limiter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChangingConfiguration() {
|
||||
DynamicConfiguration configuration = mock(DynamicConfiguration.class);
|
||||
DynamicRateLimitsConfiguration limitsConfiguration = mock(DynamicRateLimitsConfiguration.class);
|
||||
|
||||
when(configuration.getLimits()).thenReturn(limitsConfiguration);
|
||||
when(limitsConfiguration.getUnsealedSenderNumber()).thenReturn(new RateLimitsConfiguration.RateLimitConfiguration(1, 2.0));
|
||||
when(limitsConfiguration.getUnsealedSenderIp()).thenReturn(new RateLimitsConfiguration.RateLimitConfiguration(4, 1.0));
|
||||
|
||||
when(dynamicConfig.getConfiguration()).thenReturn(configuration);
|
||||
|
||||
RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster);
|
||||
|
||||
RateLimiter limiter = rateLimiters.getUnsealedSenderLimiter();
|
||||
|
||||
assertThat(limiter.getBucketSize()).isEqualTo(1);
|
||||
assertThat(limiter.getLeakRatePerMinute()).isEqualTo(2.0);
|
||||
assertSame(rateLimiters.getUnsealedSenderLimiter(), limiter);
|
||||
|
||||
when(limitsConfiguration.getUnsealedSenderNumber()).thenReturn(new RateLimitsConfiguration.RateLimitConfiguration(2, 3.0));
|
||||
|
||||
RateLimiter changed = rateLimiters.getUnsealedSenderLimiter();
|
||||
|
||||
assertThat(changed.getBucketSize()).isEqualTo(2);
|
||||
assertThat(changed.getLeakRatePerMinute()).isEqualTo(3.0);
|
||||
assertNotSame(limiter, changed);
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue