diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 67a028ff8..b9a2dc379 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -404,7 +404,7 @@ public class WhisperServerService extends Application maxCardinality; }); - final boolean secondaryRateLimitExceeded; - if (secondaryCacheCluster != null) { - secondaryRateLimitExceeded = secondaryCacheCluster.withCluster(connection -> { - final boolean changed = connection.sync().pfadd(hllKey, target) == 1; - final long cardinality = connection.sync().pfcount(hllKey); - - final boolean mayNeedExpiration = changed && cardinality == 1; - - // If the set already existed, we can assume it already had an expiration time and can save a round trip by - // skipping the ttl check. - if (mayNeedExpiration && connection.sync().ttl(hllKey) == -1) { - final long expireSeconds = ttl.plusSeconds(random.nextInt((int) ttlJitter.toSeconds())).toSeconds(); - connection.sync().expire(hllKey, expireSeconds); - } - - return changed && cardinality > maxCardinality; - }); - } else { - secondaryRateLimitExceeded = false; - } - - if (rateLimitExceeded || secondaryRateLimitExceeded) { + if (rateLimitExceeded) { // Using the TTL as the "retry after" time isn't EXACTLY right, but it's a reasonable approximation throw new RateLimitExceededException(ttl); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java index a6a44701b..343d684af 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java @@ -20,8 +20,8 @@ public class LockingRateLimiter extends RateLimiter { private final Meter meter; - public LockingRateLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, String name, int bucketSize, double leakRatePerMinute) { - super(cacheCluster, secondaryCacheCluster, name, bucketSize, leakRatePerMinute); + public LockingRateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) { + super(cacheCluster, name, bucketSize, leakRatePerMinute); MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); this.meter = metricRegistry.meter(name(getClass(), name, "locked")); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index 87f0e825d..c13c54948 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -34,17 +34,13 @@ public class RateLimiter { private final double leakRatePerMinute; private final double leakRatePerMillis; - private final FaultTolerantRedisCluster secondaryCacheCluster; - - public RateLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, - String name, int bucketSize, double leakRatePerMinute) + public RateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) { MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); this.meter = metricRegistry.meter(name(getClass(), name, "exceeded")); this.validateTimer = metricRegistry.timer(name(getClass(), name, "validate")); this.cacheCluster = cacheCluster; - this.secondaryCacheCluster = secondaryCacheCluster; this.name = name; this.bucketSize = bucketSize; this.leakRatePerMinute = leakRatePerMinute; @@ -70,8 +66,6 @@ public class RateLimiter { public void clear(String key) { cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key))); - - secondaryCacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key))); } public int getBucketSize() { @@ -84,28 +78,13 @@ public class RateLimiter { private void setBucket(String key, LeakyBucket bucket) { - IllegalArgumentException ex = null; try { final String serialized = bucket.serialize(mapper); cacheCluster.useCluster(connection -> connection.sync().setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized)); } catch (JsonProcessingException e) { - ex = new IllegalArgumentException(e); + throw new IllegalArgumentException(e); } - - try { - final String serialized = bucket.serialize(mapper); - - secondaryCacheCluster.useCluster(connection -> connection.sync() - .setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized)); - } catch (JsonProcessingException e) { - ex = ex == null ? new IllegalArgumentException(e) : ex; - } - - if (ex != null) { - throw ex; - } - } private LeakyBucket getBucket(String key) { @@ -119,17 +98,6 @@ public class RateLimiter { logger.warn("Deserialization error", e); } - try { - final String serialized = secondaryCacheCluster - .withCluster(connection -> connection.sync().get(getBucketName(key))); - - if (serialized != null) { - return LeakyBucket.fromSerialized(mapper, serialized); - } - } catch (IOException e) { - logger.warn("Deserialization error", e); - } - return new LeakyBucket(bucketSize, leakRatePerMillis); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java index f395a6ac0..4e66e1e95 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java @@ -41,88 +41,86 @@ public class RateLimiters { private final AtomicReference unsealedIpLimiter; private final FaultTolerantRedisCluster cacheCluster; - private final FaultTolerantRedisCluster newCacheCluster; private final DynamicConfigurationManager dynamicConfig; - public RateLimiters(RateLimitsConfiguration config, DynamicConfigurationManager dynamicConfig, FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster newCacheCluster) { - this.cacheCluster = cacheCluster; - this.newCacheCluster = newCacheCluster; - this.dynamicConfig = dynamicConfig; + public RateLimiters(RateLimitsConfiguration config, DynamicConfigurationManager dynamicConfig, FaultTolerantRedisCluster cacheCluster) { + this.cacheCluster = cacheCluster; + this.dynamicConfig = dynamicConfig; - this.smsDestinationLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsDestination", + this.smsDestinationLimiter = new RateLimiter(cacheCluster, "smsDestination", config.getSmsDestination().getBucketSize(), config.getSmsDestination().getLeakRatePerMinute()); - this.voiceDestinationLimiter = new RateLimiter(cacheCluster, newCacheCluster, "voxDestination", + this.voiceDestinationLimiter = new RateLimiter(cacheCluster, "voxDestination", config.getVoiceDestination().getBucketSize(), config.getVoiceDestination().getLeakRatePerMinute()); - this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, newCacheCluster, "voxDestinationDaily", + this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, "voxDestinationDaily", config.getVoiceDestinationDaily().getBucketSize(), config.getVoiceDestinationDaily().getLeakRatePerMinute()); - this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsVoiceIp", + this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, "smsVoiceIp", config.getSmsVoiceIp().getBucketSize(), config.getSmsVoiceIp().getLeakRatePerMinute()); - this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsVoicePrefix", + this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, "smsVoicePrefix", config.getSmsVoicePrefix().getBucketSize(), config.getSmsVoicePrefix().getLeakRatePerMinute()); - this.autoBlockLimiter = new RateLimiter(cacheCluster, newCacheCluster, "autoBlock", + this.autoBlockLimiter = new RateLimiter(cacheCluster, "autoBlock", config.getAutoBlock().getBucketSize(), config.getAutoBlock().getLeakRatePerMinute()); - this.verifyLimiter = new LockingRateLimiter(cacheCluster, newCacheCluster, "verify", + this.verifyLimiter = new LockingRateLimiter(cacheCluster, "verify", config.getVerifyNumber().getBucketSize(), config.getVerifyNumber().getLeakRatePerMinute()); - this.pinLimiter = new LockingRateLimiter(cacheCluster, newCacheCluster, "pin", + this.pinLimiter = new LockingRateLimiter(cacheCluster, "pin", config.getVerifyPin().getBucketSize(), config.getVerifyPin().getLeakRatePerMinute()); - this.attachmentLimiter = new RateLimiter(cacheCluster, newCacheCluster, "attachmentCreate", + this.attachmentLimiter = new RateLimiter(cacheCluster, "attachmentCreate", config.getAttachments().getBucketSize(), config.getAttachments().getLeakRatePerMinute()); - this.preKeysLimiter = new RateLimiter(cacheCluster, newCacheCluster, "prekeys", + this.preKeysLimiter = new RateLimiter(cacheCluster, "prekeys", config.getPreKeys().getBucketSize(), config.getPreKeys().getLeakRatePerMinute()); - this.messagesLimiter = new RateLimiter(cacheCluster, newCacheCluster, "messages", + this.messagesLimiter = new RateLimiter(cacheCluster, "messages", config.getMessages().getBucketSize(), config.getMessages().getLeakRatePerMinute()); - this.allocateDeviceLimiter = new RateLimiter(cacheCluster, newCacheCluster, "allocateDevice", + this.allocateDeviceLimiter = new RateLimiter(cacheCluster, "allocateDevice", config.getAllocateDevice().getBucketSize(), config.getAllocateDevice().getLeakRatePerMinute()); - this.verifyDeviceLimiter = new RateLimiter(cacheCluster, newCacheCluster, "verifyDevice", + this.verifyDeviceLimiter = new RateLimiter(cacheCluster, "verifyDevice", config.getVerifyDevice().getBucketSize(), config.getVerifyDevice().getLeakRatePerMinute()); - this.turnLimiter = new RateLimiter(cacheCluster, newCacheCluster, "turnAllocate", + this.turnLimiter = new RateLimiter(cacheCluster, "turnAllocate", config.getTurnAllocations().getBucketSize(), config.getTurnAllocations().getLeakRatePerMinute()); - this.profileLimiter = new RateLimiter(cacheCluster, newCacheCluster, "profile", + this.profileLimiter = new RateLimiter(cacheCluster, "profile", config.getProfile().getBucketSize(), config.getProfile().getLeakRatePerMinute()); - this.stickerPackLimiter = new RateLimiter(cacheCluster, newCacheCluster, "stickerPack", + this.stickerPackLimiter = new RateLimiter(cacheCluster, "stickerPack", config.getStickerPack().getBucketSize(), config.getStickerPack().getLeakRatePerMinute()); - this.usernameLookupLimiter = new RateLimiter(cacheCluster, newCacheCluster, "usernameLookup", + this.usernameLookupLimiter = new RateLimiter(cacheCluster, "usernameLookup", config.getUsernameLookup().getBucketSize(), config.getUsernameLookup().getLeakRatePerMinute()); - this.usernameSetLimiter = new RateLimiter(cacheCluster, newCacheCluster, "usernameSet", + this.usernameSetLimiter = new RateLimiter(cacheCluster, "usernameSet", config.getUsernameSet().getBucketSize(), config.getUsernameSet().getLeakRatePerMinute()); - this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber())); - this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp())); + this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber())); + this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp())); } public CardinalityRateLimiter getUnsealedSenderLimiter() { @@ -132,7 +130,7 @@ public class RateLimiters { if (rateLimiter.hasConfiguration(currentConfiguration)) { return rateLimiter; } else { - return createUnsealedSenderLimiter(cacheCluster, newCacheCluster, currentConfiguration); + return createUnsealedSenderLimiter(cacheCluster, currentConfiguration); } }); } @@ -144,7 +142,7 @@ public class RateLimiters { if (rateLimiter.hasConfiguration(currentConfiguration)) { return rateLimiter; } else { - return createUnsealedIpLimiter(cacheCluster, newCacheCluster, currentConfiguration); + return createUnsealedIpLimiter(cacheCluster, currentConfiguration); } }); } @@ -221,19 +219,17 @@ public class RateLimiters { return usernameSetLimiter; } - private CardinalityRateLimiter createUnsealedSenderLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, CardinalityRateLimitConfiguration configuration) { - return new CardinalityRateLimiter(cacheCluster, secondaryCacheCluster, "unsealedSender", configuration.getTtl(), configuration.getTtlJitter(), configuration.getMaxCardinality()); + private CardinalityRateLimiter createUnsealedSenderLimiter(FaultTolerantRedisCluster cacheCluster, CardinalityRateLimitConfiguration configuration) { + return new CardinalityRateLimiter(cacheCluster, "unsealedSender", configuration.getTtl(), configuration.getTtlJitter(), configuration.getMaxCardinality()); } - private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster, - FaultTolerantRedisCluster secondaryCacheCluster, - RateLimitConfiguration configuration) + private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster, RateLimitConfiguration configuration) { - return createLimiter(cacheCluster, secondaryCacheCluster, configuration, "unsealedIp"); + return createLimiter(cacheCluster, configuration, "unsealedIp"); } - private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, RateLimitConfiguration configuration, String name) { - return new RateLimiter(cacheCluster, secondaryCacheCluster, name, + private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, RateLimitConfiguration configuration, String name) { + return new RateLimiter(cacheCluster, name, configuration.getBucketSize(), configuration.getLeakRatePerMinute()); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/limits/CardinalityRateLimiterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/limits/CardinalityRateLimiterTest.java index 03311d84c..9d762a895 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/limits/CardinalityRateLimiterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/limits/CardinalityRateLimiterTest.java @@ -30,7 +30,7 @@ public class CardinalityRateLimiterTest extends AbstractRedisClusterTest { @Test public void testValidate() { final int maxCardinality = 10; - final CardinalityRateLimiter rateLimiter = new CardinalityRateLimiter(getRedisCluster(), null, "test", Duration.ofDays(1), Duration.ofDays(1), maxCardinality); + final CardinalityRateLimiter rateLimiter = new CardinalityRateLimiter(getRedisCluster(), "test", Duration.ofDays(1), Duration.ofDays(1), maxCardinality); final String source = "+18005551234"; int validatedAttempts = 0; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/limits/DynamicRateLimitsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/limits/DynamicRateLimitsTest.java index 1e09044c1..5c42537ce 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/limits/DynamicRateLimitsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/limits/DynamicRateLimitsTest.java @@ -22,13 +22,11 @@ public class DynamicRateLimitsTest { private DynamicConfigurationManager dynamicConfig; private FaultTolerantRedisCluster redisCluster; - private FaultTolerantRedisCluster newRedisCluster; @Before public void setup() { - this.dynamicConfig = mock(DynamicConfigurationManager.class); - this.redisCluster = mock(FaultTolerantRedisCluster.class); - this.newRedisCluster = mock(FaultTolerantRedisCluster.class); + this.dynamicConfig = mock(DynamicConfigurationManager.class); + this.redisCluster = mock(FaultTolerantRedisCluster.class); DynamicConfiguration defaultConfig = new DynamicConfiguration(); when(dynamicConfig.getConfiguration()).thenReturn(defaultConfig); @@ -37,7 +35,7 @@ public class DynamicRateLimitsTest { @Test public void testUnchangingConfiguration() { - RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster, newRedisCluster); + RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster); RateLimiter limiter = rateLimiters.getUnsealedIpLimiter(); @@ -57,7 +55,7 @@ public class DynamicRateLimitsTest { when(dynamicConfig.getConfiguration()).thenReturn(configuration); - RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster, newRedisCluster); + RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster); CardinalityRateLimiter limiter = rateLimiters.getUnsealedSenderLimiter();