diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityRateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityRateLimiter.java index 5c41e83c4..faca01cf5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityRateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/CardinalityRateLimiter.java @@ -5,12 +5,12 @@ package org.whispersystems.textsecuregcm.limits; +import java.time.Duration; +import java.util.Random; +import javax.annotation.Nullable; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.CardinalityRateLimitConfiguration; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import javax.annotation.Nullable; -import java.time.Duration; -import java.util.Random; /** * A cardinality rate limiter prevents an actor from taking some action if that actor has attempted to take that action diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java index 1a2f46e0b..a6a44701b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java @@ -27,14 +27,6 @@ public class LockingRateLimiter extends RateLimiter { this.meter = metricRegistry.meter(name(getClass(), name, "locked")); } - - public LockingRateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) { - super(cacheCluster, name, bucketSize, leakRatePerMinute); - - MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - this.meter = metricRegistry.meter(name(getClass(), name, "locked")); - } - @Override public void validate(String key, int amount) throws RateLimitExceededException { if (!acquireLock(key)) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index 0b5efaec0..87f0e825d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -13,7 +13,6 @@ import com.codahale.metrics.Timer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration; @@ -35,18 +34,10 @@ public class RateLimiter { private final double leakRatePerMinute; private final double leakRatePerMillis; - @Nullable private final FaultTolerantRedisCluster secondaryCacheCluster; - public RateLimiter(FaultTolerantRedisCluster cacheCluster, String name, - int bucketSize, double leakRatePerMinute) - { - this(cacheCluster, null, name, bucketSize, leakRatePerMinute); - } - - public RateLimiter(FaultTolerantRedisCluster cacheCluster, @Nullable FaultTolerantRedisCluster secondaryCacheCluster, - String name, - int bucketSize, double leakRatePerMinute) + public RateLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, + String name, int bucketSize, double leakRatePerMinute) { MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); @@ -80,9 +71,7 @@ public class RateLimiter { public void clear(String key) { cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key))); - if (secondaryCacheCluster != null) { - secondaryCacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key))); - } + secondaryCacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key))); } public int getBucketSize() { @@ -104,15 +93,13 @@ public class RateLimiter { ex = new IllegalArgumentException(e); } - if (secondaryCacheCluster != null) { - try { - final String serialized = bucket.serialize(mapper); + try { + final String serialized = bucket.serialize(mapper); - secondaryCacheCluster.useCluster(connection -> connection.sync() - .setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized)); - } catch (JsonProcessingException e) { - ex = ex == null ? new IllegalArgumentException(e) : ex; - } + secondaryCacheCluster.useCluster(connection -> connection.sync() + .setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized)); + } catch (JsonProcessingException e) { + ex = ex == null ? new IllegalArgumentException(e) : ex; } if (ex != null) { @@ -132,17 +119,15 @@ public class RateLimiter { logger.warn("Deserialization error", e); } - if (secondaryCacheCluster != null) { - try { - final String serialized = secondaryCacheCluster - .withCluster(connection -> connection.sync().get(getBucketName(key))); + try { + final String serialized = secondaryCacheCluster + .withCluster(connection -> connection.sync().get(getBucketName(key))); - if (serialized != null) { - return LeakyBucket.fromSerialized(mapper, serialized); - } - } catch (IOException e) { - logger.warn("Deserialization error", e); + if (serialized != null) { + return LeakyBucket.fromSerialized(mapper, serialized); } + } catch (IOException e) { + logger.warn("Deserialization error", e); } return new LeakyBucket(bucketSize, leakRatePerMillis); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java index 91a53ff25..f395a6ac0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java @@ -11,7 +11,6 @@ import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.Ca import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; -import javax.annotation.Nullable; public class RateLimiters { @@ -50,23 +49,23 @@ public class RateLimiters { this.newCacheCluster = newCacheCluster; this.dynamicConfig = dynamicConfig; - this.smsDestinationLimiter = new RateLimiter(cacheCluster, "smsDestination", + this.smsDestinationLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsDestination", config.getSmsDestination().getBucketSize(), config.getSmsDestination().getLeakRatePerMinute()); - this.voiceDestinationLimiter = new RateLimiter(cacheCluster, "voxDestination", + this.voiceDestinationLimiter = new RateLimiter(cacheCluster, newCacheCluster, "voxDestination", config.getVoiceDestination().getBucketSize(), config.getVoiceDestination().getLeakRatePerMinute()); - this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, "voxDestinationDaily", + this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, newCacheCluster, "voxDestinationDaily", config.getVoiceDestinationDaily().getBucketSize(), config.getVoiceDestinationDaily().getLeakRatePerMinute()); - this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, "smsVoiceIp", + this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsVoiceIp", config.getSmsVoiceIp().getBucketSize(), config.getSmsVoiceIp().getLeakRatePerMinute()); - this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, "smsVoicePrefix", + this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsVoicePrefix", config.getSmsVoicePrefix().getBucketSize(), config.getSmsVoicePrefix().getLeakRatePerMinute()); @@ -78,31 +77,31 @@ public class RateLimiters { config.getVerifyNumber().getBucketSize(), config.getVerifyNumber().getLeakRatePerMinute()); - this.pinLimiter = new LockingRateLimiter(cacheCluster, "pin", + this.pinLimiter = new LockingRateLimiter(cacheCluster, newCacheCluster, "pin", config.getVerifyPin().getBucketSize(), config.getVerifyPin().getLeakRatePerMinute()); - this.attachmentLimiter = new RateLimiter(cacheCluster, "attachmentCreate", + this.attachmentLimiter = new RateLimiter(cacheCluster, newCacheCluster, "attachmentCreate", config.getAttachments().getBucketSize(), config.getAttachments().getLeakRatePerMinute()); - this.preKeysLimiter = new RateLimiter(cacheCluster, "prekeys", + this.preKeysLimiter = new RateLimiter(cacheCluster, newCacheCluster, "prekeys", config.getPreKeys().getBucketSize(), config.getPreKeys().getLeakRatePerMinute()); - this.messagesLimiter = new RateLimiter(cacheCluster, "messages", + this.messagesLimiter = new RateLimiter(cacheCluster, newCacheCluster, "messages", config.getMessages().getBucketSize(), config.getMessages().getLeakRatePerMinute()); - this.allocateDeviceLimiter = new RateLimiter(cacheCluster, "allocateDevice", + this.allocateDeviceLimiter = new RateLimiter(cacheCluster, newCacheCluster, "allocateDevice", config.getAllocateDevice().getBucketSize(), config.getAllocateDevice().getLeakRatePerMinute()); - this.verifyDeviceLimiter = new RateLimiter(cacheCluster, "verifyDevice", + this.verifyDeviceLimiter = new RateLimiter(cacheCluster, newCacheCluster, "verifyDevice", config.getVerifyDevice().getBucketSize(), config.getVerifyDevice().getLeakRatePerMinute()); - this.turnLimiter = new RateLimiter(cacheCluster, "turnAllocate", + this.turnLimiter = new RateLimiter(cacheCluster, newCacheCluster, "turnAllocate", config.getTurnAllocations().getBucketSize(), config.getTurnAllocations().getLeakRatePerMinute()); @@ -110,19 +109,19 @@ public class RateLimiters { config.getProfile().getBucketSize(), config.getProfile().getLeakRatePerMinute()); - this.stickerPackLimiter = new RateLimiter(cacheCluster, "stickerPack", + this.stickerPackLimiter = new RateLimiter(cacheCluster, newCacheCluster, "stickerPack", config.getStickerPack().getBucketSize(), config.getStickerPack().getLeakRatePerMinute()); - this.usernameLookupLimiter = new RateLimiter(cacheCluster, "usernameLookup", + this.usernameLookupLimiter = new RateLimiter(cacheCluster, newCacheCluster, "usernameLookup", config.getUsernameLookup().getBucketSize(), config.getUsernameLookup().getLeakRatePerMinute()); - this.usernameSetLimiter = new RateLimiter(cacheCluster, "usernameSet", + this.usernameSetLimiter = new RateLimiter(cacheCluster, newCacheCluster, "usernameSet", config.getUsernameSet().getBucketSize(), config.getUsernameSet().getLeakRatePerMinute()); - this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, null, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber())); + this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber())); this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp())); } @@ -133,7 +132,7 @@ public class RateLimiters { if (rateLimiter.hasConfiguration(currentConfiguration)) { return rateLimiter; } else { - return createUnsealedSenderLimiter(cacheCluster, null, currentConfiguration); + return createUnsealedSenderLimiter(cacheCluster, newCacheCluster, currentConfiguration); } }); } @@ -227,13 +226,13 @@ public class RateLimiters { } private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster, - @Nullable FaultTolerantRedisCluster secondaryCacheCluster, + FaultTolerantRedisCluster secondaryCacheCluster, RateLimitConfiguration configuration) { return createLimiter(cacheCluster, secondaryCacheCluster, configuration, "unsealedIp"); } - private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, @Nullable FaultTolerantRedisCluster secondaryCacheCluster, RateLimitConfiguration configuration, String name) { + private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, RateLimitConfiguration configuration, String name) { return new RateLimiter(cacheCluster, secondaryCacheCluster, name, configuration.getBucketSize(), configuration.getLeakRatePerMinute());