Add rate limiters cluster to all RateLimiters

This commit is contained in:
Chris Eager 2021-04-30 16:40:26 -05:00 committed by Chris Eager
parent c63bebb3e7
commit 7e805d1592
4 changed files with 38 additions and 62 deletions

View File

@ -5,12 +5,12 @@
package org.whispersystems.textsecuregcm.limits; package org.whispersystems.textsecuregcm.limits;
import java.time.Duration;
import java.util.Random;
import javax.annotation.Nullable;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.CardinalityRateLimitConfiguration; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.CardinalityRateLimitConfiguration;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Random;
/** /**
* A cardinality rate limiter prevents an actor from taking some action if that actor has attempted to take that action * A cardinality rate limiter prevents an actor from taking some action if that actor has attempted to take that action

View File

@ -27,14 +27,6 @@ public class LockingRateLimiter extends RateLimiter {
this.meter = metricRegistry.meter(name(getClass(), name, "locked")); this.meter = metricRegistry.meter(name(getClass(), name, "locked"));
} }
public LockingRateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) {
super(cacheCluster, name, bucketSize, leakRatePerMinute);
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
this.meter = metricRegistry.meter(name(getClass(), name, "locked"));
}
@Override @Override
public void validate(String key, int amount) throws RateLimitExceededException { public void validate(String key, int amount) throws RateLimitExceededException {
if (!acquireLock(key)) { if (!acquireLock(key)) {

View File

@ -13,7 +13,6 @@ import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import java.io.IOException;
import javax.annotation.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration;
@ -35,18 +34,10 @@ public class RateLimiter {
private final double leakRatePerMinute; private final double leakRatePerMinute;
private final double leakRatePerMillis; private final double leakRatePerMillis;
@Nullable
private final FaultTolerantRedisCluster secondaryCacheCluster; private final FaultTolerantRedisCluster secondaryCacheCluster;
public RateLimiter(FaultTolerantRedisCluster cacheCluster, String name, public RateLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster,
int bucketSize, double leakRatePerMinute) String name, int bucketSize, double leakRatePerMinute)
{
this(cacheCluster, null, name, bucketSize, leakRatePerMinute);
}
public RateLimiter(FaultTolerantRedisCluster cacheCluster, @Nullable FaultTolerantRedisCluster secondaryCacheCluster,
String name,
int bucketSize, double leakRatePerMinute)
{ {
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
@ -80,9 +71,7 @@ public class RateLimiter {
public void clear(String key) { public void clear(String key) {
cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key))); cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key)));
if (secondaryCacheCluster != null) { secondaryCacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key)));
secondaryCacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key)));
}
} }
public int getBucketSize() { public int getBucketSize() {
@ -104,15 +93,13 @@ public class RateLimiter {
ex = new IllegalArgumentException(e); ex = new IllegalArgumentException(e);
} }
if (secondaryCacheCluster != null) { try {
try { final String serialized = bucket.serialize(mapper);
final String serialized = bucket.serialize(mapper);
secondaryCacheCluster.useCluster(connection -> connection.sync() secondaryCacheCluster.useCluster(connection -> connection.sync()
.setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized)); .setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
ex = ex == null ? new IllegalArgumentException(e) : ex; ex = ex == null ? new IllegalArgumentException(e) : ex;
}
} }
if (ex != null) { if (ex != null) {
@ -132,17 +119,15 @@ public class RateLimiter {
logger.warn("Deserialization error", e); logger.warn("Deserialization error", e);
} }
if (secondaryCacheCluster != null) { try {
try { final String serialized = secondaryCacheCluster
final String serialized = secondaryCacheCluster .withCluster(connection -> connection.sync().get(getBucketName(key)));
.withCluster(connection -> connection.sync().get(getBucketName(key)));
if (serialized != null) { if (serialized != null) {
return LeakyBucket.fromSerialized(mapper, serialized); return LeakyBucket.fromSerialized(mapper, serialized);
}
} catch (IOException e) {
logger.warn("Deserialization error", e);
} }
} catch (IOException e) {
logger.warn("Deserialization error", e);
} }
return new LeakyBucket(bucketSize, leakRatePerMillis); return new LeakyBucket(bucketSize, leakRatePerMillis);

View File

@ -11,7 +11,6 @@ import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.Ca
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import javax.annotation.Nullable;
public class RateLimiters { public class RateLimiters {
@ -50,23 +49,23 @@ public class RateLimiters {
this.newCacheCluster = newCacheCluster; this.newCacheCluster = newCacheCluster;
this.dynamicConfig = dynamicConfig; this.dynamicConfig = dynamicConfig;
this.smsDestinationLimiter = new RateLimiter(cacheCluster, "smsDestination", this.smsDestinationLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsDestination",
config.getSmsDestination().getBucketSize(), config.getSmsDestination().getBucketSize(),
config.getSmsDestination().getLeakRatePerMinute()); config.getSmsDestination().getLeakRatePerMinute());
this.voiceDestinationLimiter = new RateLimiter(cacheCluster, "voxDestination", this.voiceDestinationLimiter = new RateLimiter(cacheCluster, newCacheCluster, "voxDestination",
config.getVoiceDestination().getBucketSize(), config.getVoiceDestination().getBucketSize(),
config.getVoiceDestination().getLeakRatePerMinute()); config.getVoiceDestination().getLeakRatePerMinute());
this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, "voxDestinationDaily", this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, newCacheCluster, "voxDestinationDaily",
config.getVoiceDestinationDaily().getBucketSize(), config.getVoiceDestinationDaily().getBucketSize(),
config.getVoiceDestinationDaily().getLeakRatePerMinute()); config.getVoiceDestinationDaily().getLeakRatePerMinute());
this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, "smsVoiceIp", this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsVoiceIp",
config.getSmsVoiceIp().getBucketSize(), config.getSmsVoiceIp().getBucketSize(),
config.getSmsVoiceIp().getLeakRatePerMinute()); config.getSmsVoiceIp().getLeakRatePerMinute());
this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, "smsVoicePrefix", this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsVoicePrefix",
config.getSmsVoicePrefix().getBucketSize(), config.getSmsVoicePrefix().getBucketSize(),
config.getSmsVoicePrefix().getLeakRatePerMinute()); config.getSmsVoicePrefix().getLeakRatePerMinute());
@ -78,31 +77,31 @@ public class RateLimiters {
config.getVerifyNumber().getBucketSize(), config.getVerifyNumber().getBucketSize(),
config.getVerifyNumber().getLeakRatePerMinute()); config.getVerifyNumber().getLeakRatePerMinute());
this.pinLimiter = new LockingRateLimiter(cacheCluster, "pin", this.pinLimiter = new LockingRateLimiter(cacheCluster, newCacheCluster, "pin",
config.getVerifyPin().getBucketSize(), config.getVerifyPin().getBucketSize(),
config.getVerifyPin().getLeakRatePerMinute()); config.getVerifyPin().getLeakRatePerMinute());
this.attachmentLimiter = new RateLimiter(cacheCluster, "attachmentCreate", this.attachmentLimiter = new RateLimiter(cacheCluster, newCacheCluster, "attachmentCreate",
config.getAttachments().getBucketSize(), config.getAttachments().getBucketSize(),
config.getAttachments().getLeakRatePerMinute()); config.getAttachments().getLeakRatePerMinute());
this.preKeysLimiter = new RateLimiter(cacheCluster, "prekeys", this.preKeysLimiter = new RateLimiter(cacheCluster, newCacheCluster, "prekeys",
config.getPreKeys().getBucketSize(), config.getPreKeys().getBucketSize(),
config.getPreKeys().getLeakRatePerMinute()); config.getPreKeys().getLeakRatePerMinute());
this.messagesLimiter = new RateLimiter(cacheCluster, "messages", this.messagesLimiter = new RateLimiter(cacheCluster, newCacheCluster, "messages",
config.getMessages().getBucketSize(), config.getMessages().getBucketSize(),
config.getMessages().getLeakRatePerMinute()); config.getMessages().getLeakRatePerMinute());
this.allocateDeviceLimiter = new RateLimiter(cacheCluster, "allocateDevice", this.allocateDeviceLimiter = new RateLimiter(cacheCluster, newCacheCluster, "allocateDevice",
config.getAllocateDevice().getBucketSize(), config.getAllocateDevice().getBucketSize(),
config.getAllocateDevice().getLeakRatePerMinute()); config.getAllocateDevice().getLeakRatePerMinute());
this.verifyDeviceLimiter = new RateLimiter(cacheCluster, "verifyDevice", this.verifyDeviceLimiter = new RateLimiter(cacheCluster, newCacheCluster, "verifyDevice",
config.getVerifyDevice().getBucketSize(), config.getVerifyDevice().getBucketSize(),
config.getVerifyDevice().getLeakRatePerMinute()); config.getVerifyDevice().getLeakRatePerMinute());
this.turnLimiter = new RateLimiter(cacheCluster, "turnAllocate", this.turnLimiter = new RateLimiter(cacheCluster, newCacheCluster, "turnAllocate",
config.getTurnAllocations().getBucketSize(), config.getTurnAllocations().getBucketSize(),
config.getTurnAllocations().getLeakRatePerMinute()); config.getTurnAllocations().getLeakRatePerMinute());
@ -110,19 +109,19 @@ public class RateLimiters {
config.getProfile().getBucketSize(), config.getProfile().getBucketSize(),
config.getProfile().getLeakRatePerMinute()); config.getProfile().getLeakRatePerMinute());
this.stickerPackLimiter = new RateLimiter(cacheCluster, "stickerPack", this.stickerPackLimiter = new RateLimiter(cacheCluster, newCacheCluster, "stickerPack",
config.getStickerPack().getBucketSize(), config.getStickerPack().getBucketSize(),
config.getStickerPack().getLeakRatePerMinute()); config.getStickerPack().getLeakRatePerMinute());
this.usernameLookupLimiter = new RateLimiter(cacheCluster, "usernameLookup", this.usernameLookupLimiter = new RateLimiter(cacheCluster, newCacheCluster, "usernameLookup",
config.getUsernameLookup().getBucketSize(), config.getUsernameLookup().getBucketSize(),
config.getUsernameLookup().getLeakRatePerMinute()); config.getUsernameLookup().getLeakRatePerMinute());
this.usernameSetLimiter = new RateLimiter(cacheCluster, "usernameSet", this.usernameSetLimiter = new RateLimiter(cacheCluster, newCacheCluster, "usernameSet",
config.getUsernameSet().getBucketSize(), config.getUsernameSet().getBucketSize(),
config.getUsernameSet().getLeakRatePerMinute()); config.getUsernameSet().getLeakRatePerMinute());
this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, null, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber())); this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber()));
this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp())); this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp()));
} }
@ -133,7 +132,7 @@ public class RateLimiters {
if (rateLimiter.hasConfiguration(currentConfiguration)) { if (rateLimiter.hasConfiguration(currentConfiguration)) {
return rateLimiter; return rateLimiter;
} else { } else {
return createUnsealedSenderLimiter(cacheCluster, null, currentConfiguration); return createUnsealedSenderLimiter(cacheCluster, newCacheCluster, currentConfiguration);
} }
}); });
} }
@ -227,13 +226,13 @@ public class RateLimiters {
} }
private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster, private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster,
@Nullable FaultTolerantRedisCluster secondaryCacheCluster, FaultTolerantRedisCluster secondaryCacheCluster,
RateLimitConfiguration configuration) RateLimitConfiguration configuration)
{ {
return createLimiter(cacheCluster, secondaryCacheCluster, configuration, "unsealedIp"); return createLimiter(cacheCluster, secondaryCacheCluster, configuration, "unsealedIp");
} }
private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, @Nullable FaultTolerantRedisCluster secondaryCacheCluster, RateLimitConfiguration configuration, String name) { private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, RateLimitConfiguration configuration, String name) {
return new RateLimiter(cacheCluster, secondaryCacheCluster, name, return new RateLimiter(cacheCluster, secondaryCacheCluster, name,
configuration.getBucketSize(), configuration.getBucketSize(),
configuration.getLeakRatePerMinute()); configuration.getLeakRatePerMinute());