Migrate rate limiters to rate limiter cluster
This commit is contained in:
parent
01e526af25
commit
fac4538f6f
|
@ -404,7 +404,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
||||
APNSender apnSender = new APNSender(apnSenderExecutor, accountsManager, config.getApnConfiguration());
|
||||
GCMSender gcmSender = new GCMSender(gcmSenderExecutor, accountsManager, config.getGcmConfiguration().getApiKey());
|
||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), dynamicConfigurationManager, cacheCluster, rateLimitersCluster);
|
||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), dynamicConfigurationManager, rateLimitersCluster);
|
||||
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
|
||||
|
||||
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
||||
|
|
|
@ -7,7 +7,6 @@ package org.whispersystems.textsecuregcm.limits;
|
|||
|
||||
import java.time.Duration;
|
||||
import java.util.Random;
|
||||
import javax.annotation.Nullable;
|
||||
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.CardinalityRateLimitConfiguration;
|
||||
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
|
@ -21,8 +20,6 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
|||
public class CardinalityRateLimiter {
|
||||
|
||||
private final FaultTolerantRedisCluster cacheCluster;
|
||||
@Nullable
|
||||
private final FaultTolerantRedisCluster secondaryCacheCluster;
|
||||
|
||||
private final String name;
|
||||
|
||||
|
@ -32,9 +29,8 @@ public class CardinalityRateLimiter {
|
|||
|
||||
private final Random random = new Random();
|
||||
|
||||
public CardinalityRateLimiter(final FaultTolerantRedisCluster cacheCluster, @Nullable final FaultTolerantRedisCluster secondaryCacheCluster, final String name, final Duration ttl, final Duration ttlJitter, final int maxCardinality) {
|
||||
public CardinalityRateLimiter(final FaultTolerantRedisCluster cacheCluster, final String name, final Duration ttl, final Duration ttlJitter, final int maxCardinality) {
|
||||
this.cacheCluster = cacheCluster;
|
||||
this.secondaryCacheCluster = secondaryCacheCluster;
|
||||
|
||||
this.name = name;
|
||||
|
||||
|
@ -62,28 +58,7 @@ public class CardinalityRateLimiter {
|
|||
return changed && cardinality > maxCardinality;
|
||||
});
|
||||
|
||||
final boolean secondaryRateLimitExceeded;
|
||||
if (secondaryCacheCluster != null) {
|
||||
secondaryRateLimitExceeded = secondaryCacheCluster.withCluster(connection -> {
|
||||
final boolean changed = connection.sync().pfadd(hllKey, target) == 1;
|
||||
final long cardinality = connection.sync().pfcount(hllKey);
|
||||
|
||||
final boolean mayNeedExpiration = changed && cardinality == 1;
|
||||
|
||||
// If the set already existed, we can assume it already had an expiration time and can save a round trip by
|
||||
// skipping the ttl check.
|
||||
if (mayNeedExpiration && connection.sync().ttl(hllKey) == -1) {
|
||||
final long expireSeconds = ttl.plusSeconds(random.nextInt((int) ttlJitter.toSeconds())).toSeconds();
|
||||
connection.sync().expire(hllKey, expireSeconds);
|
||||
}
|
||||
|
||||
return changed && cardinality > maxCardinality;
|
||||
});
|
||||
} else {
|
||||
secondaryRateLimitExceeded = false;
|
||||
}
|
||||
|
||||
if (rateLimitExceeded || secondaryRateLimitExceeded) {
|
||||
if (rateLimitExceeded) {
|
||||
// Using the TTL as the "retry after" time isn't EXACTLY right, but it's a reasonable approximation
|
||||
throw new RateLimitExceededException(ttl);
|
||||
}
|
||||
|
|
|
@ -20,8 +20,8 @@ public class LockingRateLimiter extends RateLimiter {
|
|||
|
||||
private final Meter meter;
|
||||
|
||||
public LockingRateLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, String name, int bucketSize, double leakRatePerMinute) {
|
||||
super(cacheCluster, secondaryCacheCluster, name, bucketSize, leakRatePerMinute);
|
||||
public LockingRateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) {
|
||||
super(cacheCluster, name, bucketSize, leakRatePerMinute);
|
||||
|
||||
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
this.meter = metricRegistry.meter(name(getClass(), name, "locked"));
|
||||
|
|
|
@ -34,17 +34,13 @@ public class RateLimiter {
|
|||
private final double leakRatePerMinute;
|
||||
private final double leakRatePerMillis;
|
||||
|
||||
private final FaultTolerantRedisCluster secondaryCacheCluster;
|
||||
|
||||
public RateLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster,
|
||||
String name, int bucketSize, double leakRatePerMinute)
|
||||
public RateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute)
|
||||
{
|
||||
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
|
||||
this.meter = metricRegistry.meter(name(getClass(), name, "exceeded"));
|
||||
this.validateTimer = metricRegistry.timer(name(getClass(), name, "validate"));
|
||||
this.cacheCluster = cacheCluster;
|
||||
this.secondaryCacheCluster = secondaryCacheCluster;
|
||||
this.name = name;
|
||||
this.bucketSize = bucketSize;
|
||||
this.leakRatePerMinute = leakRatePerMinute;
|
||||
|
@ -70,8 +66,6 @@ public class RateLimiter {
|
|||
|
||||
public void clear(String key) {
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key)));
|
||||
|
||||
secondaryCacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key)));
|
||||
}
|
||||
|
||||
public int getBucketSize() {
|
||||
|
@ -84,28 +78,13 @@ public class RateLimiter {
|
|||
|
||||
private void setBucket(String key, LeakyBucket bucket) {
|
||||
|
||||
IllegalArgumentException ex = null;
|
||||
try {
|
||||
final String serialized = bucket.serialize(mapper);
|
||||
|
||||
cacheCluster.useCluster(connection -> connection.sync().setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized));
|
||||
} catch (JsonProcessingException e) {
|
||||
ex = new IllegalArgumentException(e);
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
final String serialized = bucket.serialize(mapper);
|
||||
|
||||
secondaryCacheCluster.useCluster(connection -> connection.sync()
|
||||
.setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized));
|
||||
} catch (JsonProcessingException e) {
|
||||
ex = ex == null ? new IllegalArgumentException(e) : ex;
|
||||
}
|
||||
|
||||
if (ex != null) {
|
||||
throw ex;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private LeakyBucket getBucket(String key) {
|
||||
|
@ -119,17 +98,6 @@ public class RateLimiter {
|
|||
logger.warn("Deserialization error", e);
|
||||
}
|
||||
|
||||
try {
|
||||
final String serialized = secondaryCacheCluster
|
||||
.withCluster(connection -> connection.sync().get(getBucketName(key)));
|
||||
|
||||
if (serialized != null) {
|
||||
return LeakyBucket.fromSerialized(mapper, serialized);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("Deserialization error", e);
|
||||
}
|
||||
|
||||
return new LeakyBucket(bucketSize, leakRatePerMillis);
|
||||
}
|
||||
|
||||
|
|
|
@ -41,88 +41,86 @@ public class RateLimiters {
|
|||
private final AtomicReference<RateLimiter> unsealedIpLimiter;
|
||||
|
||||
private final FaultTolerantRedisCluster cacheCluster;
|
||||
private final FaultTolerantRedisCluster newCacheCluster;
|
||||
private final DynamicConfigurationManager dynamicConfig;
|
||||
|
||||
public RateLimiters(RateLimitsConfiguration config, DynamicConfigurationManager dynamicConfig, FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster newCacheCluster) {
|
||||
this.cacheCluster = cacheCluster;
|
||||
this.newCacheCluster = newCacheCluster;
|
||||
this.dynamicConfig = dynamicConfig;
|
||||
public RateLimiters(RateLimitsConfiguration config, DynamicConfigurationManager dynamicConfig, FaultTolerantRedisCluster cacheCluster) {
|
||||
this.cacheCluster = cacheCluster;
|
||||
this.dynamicConfig = dynamicConfig;
|
||||
|
||||
this.smsDestinationLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsDestination",
|
||||
this.smsDestinationLimiter = new RateLimiter(cacheCluster, "smsDestination",
|
||||
config.getSmsDestination().getBucketSize(),
|
||||
config.getSmsDestination().getLeakRatePerMinute());
|
||||
|
||||
this.voiceDestinationLimiter = new RateLimiter(cacheCluster, newCacheCluster, "voxDestination",
|
||||
this.voiceDestinationLimiter = new RateLimiter(cacheCluster, "voxDestination",
|
||||
config.getVoiceDestination().getBucketSize(),
|
||||
config.getVoiceDestination().getLeakRatePerMinute());
|
||||
|
||||
this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, newCacheCluster, "voxDestinationDaily",
|
||||
this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, "voxDestinationDaily",
|
||||
config.getVoiceDestinationDaily().getBucketSize(),
|
||||
config.getVoiceDestinationDaily().getLeakRatePerMinute());
|
||||
|
||||
this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsVoiceIp",
|
||||
this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, "smsVoiceIp",
|
||||
config.getSmsVoiceIp().getBucketSize(),
|
||||
config.getSmsVoiceIp().getLeakRatePerMinute());
|
||||
|
||||
this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, newCacheCluster, "smsVoicePrefix",
|
||||
this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, "smsVoicePrefix",
|
||||
config.getSmsVoicePrefix().getBucketSize(),
|
||||
config.getSmsVoicePrefix().getLeakRatePerMinute());
|
||||
|
||||
this.autoBlockLimiter = new RateLimiter(cacheCluster, newCacheCluster, "autoBlock",
|
||||
this.autoBlockLimiter = new RateLimiter(cacheCluster, "autoBlock",
|
||||
config.getAutoBlock().getBucketSize(),
|
||||
config.getAutoBlock().getLeakRatePerMinute());
|
||||
|
||||
this.verifyLimiter = new LockingRateLimiter(cacheCluster, newCacheCluster, "verify",
|
||||
this.verifyLimiter = new LockingRateLimiter(cacheCluster, "verify",
|
||||
config.getVerifyNumber().getBucketSize(),
|
||||
config.getVerifyNumber().getLeakRatePerMinute());
|
||||
|
||||
this.pinLimiter = new LockingRateLimiter(cacheCluster, newCacheCluster, "pin",
|
||||
this.pinLimiter = new LockingRateLimiter(cacheCluster, "pin",
|
||||
config.getVerifyPin().getBucketSize(),
|
||||
config.getVerifyPin().getLeakRatePerMinute());
|
||||
|
||||
this.attachmentLimiter = new RateLimiter(cacheCluster, newCacheCluster, "attachmentCreate",
|
||||
this.attachmentLimiter = new RateLimiter(cacheCluster, "attachmentCreate",
|
||||
config.getAttachments().getBucketSize(),
|
||||
config.getAttachments().getLeakRatePerMinute());
|
||||
|
||||
this.preKeysLimiter = new RateLimiter(cacheCluster, newCacheCluster, "prekeys",
|
||||
this.preKeysLimiter = new RateLimiter(cacheCluster, "prekeys",
|
||||
config.getPreKeys().getBucketSize(),
|
||||
config.getPreKeys().getLeakRatePerMinute());
|
||||
|
||||
this.messagesLimiter = new RateLimiter(cacheCluster, newCacheCluster, "messages",
|
||||
this.messagesLimiter = new RateLimiter(cacheCluster, "messages",
|
||||
config.getMessages().getBucketSize(),
|
||||
config.getMessages().getLeakRatePerMinute());
|
||||
|
||||
this.allocateDeviceLimiter = new RateLimiter(cacheCluster, newCacheCluster, "allocateDevice",
|
||||
this.allocateDeviceLimiter = new RateLimiter(cacheCluster, "allocateDevice",
|
||||
config.getAllocateDevice().getBucketSize(),
|
||||
config.getAllocateDevice().getLeakRatePerMinute());
|
||||
|
||||
this.verifyDeviceLimiter = new RateLimiter(cacheCluster, newCacheCluster, "verifyDevice",
|
||||
this.verifyDeviceLimiter = new RateLimiter(cacheCluster, "verifyDevice",
|
||||
config.getVerifyDevice().getBucketSize(),
|
||||
config.getVerifyDevice().getLeakRatePerMinute());
|
||||
|
||||
this.turnLimiter = new RateLimiter(cacheCluster, newCacheCluster, "turnAllocate",
|
||||
this.turnLimiter = new RateLimiter(cacheCluster, "turnAllocate",
|
||||
config.getTurnAllocations().getBucketSize(),
|
||||
config.getTurnAllocations().getLeakRatePerMinute());
|
||||
|
||||
this.profileLimiter = new RateLimiter(cacheCluster, newCacheCluster, "profile",
|
||||
this.profileLimiter = new RateLimiter(cacheCluster, "profile",
|
||||
config.getProfile().getBucketSize(),
|
||||
config.getProfile().getLeakRatePerMinute());
|
||||
|
||||
this.stickerPackLimiter = new RateLimiter(cacheCluster, newCacheCluster, "stickerPack",
|
||||
this.stickerPackLimiter = new RateLimiter(cacheCluster, "stickerPack",
|
||||
config.getStickerPack().getBucketSize(),
|
||||
config.getStickerPack().getLeakRatePerMinute());
|
||||
|
||||
this.usernameLookupLimiter = new RateLimiter(cacheCluster, newCacheCluster, "usernameLookup",
|
||||
this.usernameLookupLimiter = new RateLimiter(cacheCluster, "usernameLookup",
|
||||
config.getUsernameLookup().getBucketSize(),
|
||||
config.getUsernameLookup().getLeakRatePerMinute());
|
||||
|
||||
this.usernameSetLimiter = new RateLimiter(cacheCluster, newCacheCluster, "usernameSet",
|
||||
this.usernameSetLimiter = new RateLimiter(cacheCluster, "usernameSet",
|
||||
config.getUsernameSet().getBucketSize(),
|
||||
config.getUsernameSet().getLeakRatePerMinute());
|
||||
|
||||
this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber()));
|
||||
this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, newCacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp()));
|
||||
this.unsealedSenderLimiter = new AtomicReference<>(createUnsealedSenderLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber()));
|
||||
this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp()));
|
||||
}
|
||||
|
||||
public CardinalityRateLimiter getUnsealedSenderLimiter() {
|
||||
|
@ -132,7 +130,7 @@ public class RateLimiters {
|
|||
if (rateLimiter.hasConfiguration(currentConfiguration)) {
|
||||
return rateLimiter;
|
||||
} else {
|
||||
return createUnsealedSenderLimiter(cacheCluster, newCacheCluster, currentConfiguration);
|
||||
return createUnsealedSenderLimiter(cacheCluster, currentConfiguration);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -144,7 +142,7 @@ public class RateLimiters {
|
|||
if (rateLimiter.hasConfiguration(currentConfiguration)) {
|
||||
return rateLimiter;
|
||||
} else {
|
||||
return createUnsealedIpLimiter(cacheCluster, newCacheCluster, currentConfiguration);
|
||||
return createUnsealedIpLimiter(cacheCluster, currentConfiguration);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -221,19 +219,17 @@ public class RateLimiters {
|
|||
return usernameSetLimiter;
|
||||
}
|
||||
|
||||
private CardinalityRateLimiter createUnsealedSenderLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, CardinalityRateLimitConfiguration configuration) {
|
||||
return new CardinalityRateLimiter(cacheCluster, secondaryCacheCluster, "unsealedSender", configuration.getTtl(), configuration.getTtlJitter(), configuration.getMaxCardinality());
|
||||
private CardinalityRateLimiter createUnsealedSenderLimiter(FaultTolerantRedisCluster cacheCluster, CardinalityRateLimitConfiguration configuration) {
|
||||
return new CardinalityRateLimiter(cacheCluster, "unsealedSender", configuration.getTtl(), configuration.getTtlJitter(), configuration.getMaxCardinality());
|
||||
}
|
||||
|
||||
private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster,
|
||||
FaultTolerantRedisCluster secondaryCacheCluster,
|
||||
RateLimitConfiguration configuration)
|
||||
private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster, RateLimitConfiguration configuration)
|
||||
{
|
||||
return createLimiter(cacheCluster, secondaryCacheCluster, configuration, "unsealedIp");
|
||||
return createLimiter(cacheCluster, configuration, "unsealedIp");
|
||||
}
|
||||
|
||||
private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, FaultTolerantRedisCluster secondaryCacheCluster, RateLimitConfiguration configuration, String name) {
|
||||
return new RateLimiter(cacheCluster, secondaryCacheCluster, name,
|
||||
private RateLimiter createLimiter(FaultTolerantRedisCluster cacheCluster, RateLimitConfiguration configuration, String name) {
|
||||
return new RateLimiter(cacheCluster, name,
|
||||
configuration.getBucketSize(),
|
||||
configuration.getLeakRatePerMinute());
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ public class CardinalityRateLimiterTest extends AbstractRedisClusterTest {
|
|||
@Test
|
||||
public void testValidate() {
|
||||
final int maxCardinality = 10;
|
||||
final CardinalityRateLimiter rateLimiter = new CardinalityRateLimiter(getRedisCluster(), null, "test", Duration.ofDays(1), Duration.ofDays(1), maxCardinality);
|
||||
final CardinalityRateLimiter rateLimiter = new CardinalityRateLimiter(getRedisCluster(), "test", Duration.ofDays(1), Duration.ofDays(1), maxCardinality);
|
||||
|
||||
final String source = "+18005551234";
|
||||
int validatedAttempts = 0;
|
||||
|
|
|
@ -22,13 +22,11 @@ public class DynamicRateLimitsTest {
|
|||
|
||||
private DynamicConfigurationManager dynamicConfig;
|
||||
private FaultTolerantRedisCluster redisCluster;
|
||||
private FaultTolerantRedisCluster newRedisCluster;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
this.dynamicConfig = mock(DynamicConfigurationManager.class);
|
||||
this.redisCluster = mock(FaultTolerantRedisCluster.class);
|
||||
this.newRedisCluster = mock(FaultTolerantRedisCluster.class);
|
||||
this.dynamicConfig = mock(DynamicConfigurationManager.class);
|
||||
this.redisCluster = mock(FaultTolerantRedisCluster.class);
|
||||
|
||||
DynamicConfiguration defaultConfig = new DynamicConfiguration();
|
||||
when(dynamicConfig.getConfiguration()).thenReturn(defaultConfig);
|
||||
|
@ -37,7 +35,7 @@ public class DynamicRateLimitsTest {
|
|||
|
||||
@Test
|
||||
public void testUnchangingConfiguration() {
|
||||
RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster, newRedisCluster);
|
||||
RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster);
|
||||
|
||||
RateLimiter limiter = rateLimiters.getUnsealedIpLimiter();
|
||||
|
||||
|
@ -57,7 +55,7 @@ public class DynamicRateLimitsTest {
|
|||
|
||||
when(dynamicConfig.getConfiguration()).thenReturn(configuration);
|
||||
|
||||
RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster, newRedisCluster);
|
||||
RateLimiters rateLimiters = new RateLimiters(new RateLimitsConfiguration(), dynamicConfig, redisCluster);
|
||||
|
||||
CardinalityRateLimiter limiter = rateLimiters.getUnsealedSenderLimiter();
|
||||
|
||||
|
|
Loading…
Reference in New Issue