Decommission the old cache.

This commit is contained in:
Jon Chambers 2020-06-17 18:08:59 -04:00 committed by Jon Chambers
parent 0352d413e3
commit eea073f882
20 changed files with 94 additions and 393 deletions

View File

@ -24,10 +24,6 @@ turn: # TURN server configuration
- turn:yourdomain:443?transport=udp - turn:yourdomain:443?transport=udp
- turn:etc.com:80?transport=udp - turn:etc.com:80?transport=udp
cache: # Redis server configuration for cache cluster
url:
replicaUrls:
cacheCluster: # Redis server configuration for cache cluster cacheCluster: # Redis server configuration for cache cluster
urls: urls:
- redis://redis.example.com:6379/ - redis://redis.example.com:6379/

View File

@ -86,11 +86,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty @JsonProperty
private List<MicrometerConfiguration> micrometer = new LinkedList<>(); private List<MicrometerConfiguration> micrometer = new LinkedList<>();
@NotNull
@Valid
@JsonProperty
private RedisConfiguration cache;
@NotNull @NotNull
@Valid @Valid
@JsonProperty @JsonProperty
@ -245,10 +240,6 @@ public class WhisperServerConfiguration extends Configuration {
return gcpAttachments; return gcpAttachments;
} }
public RedisConfiguration getCacheConfiguration() {
return cache;
}
public RedisClusterConfiguration getCacheClusterConfiguration() { public RedisClusterConfiguration getCacheClusterConfiguration() {
return cacheCluster; return cacheCluster;
} }

View File

@ -144,7 +144,6 @@ import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener; import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener;
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.ClearCacheClusterCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.VacuumCommand; import org.whispersystems.textsecuregcm.workers.VacuumCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
@ -175,7 +174,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new DeleteUserCommand()); bootstrap.addCommand(new DeleteUserCommand());
bootstrap.addCommand(new CertificateCommand()); bootstrap.addCommand(new CertificateCommand());
bootstrap.addCommand(new ZkParamsCommand()); bootstrap.addCommand(new ZkParamsCommand());
bootstrap.addCommand(new ClearCacheClusterCommand());
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") { bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") {
@Override @Override
@ -269,13 +267,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase); AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase); RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
RedisClientFactory cacheClientFactory = new RedisClientFactory("main_cache", config.getCacheConfiguration().getUrl(), config.getCacheConfiguration().getReplicaUrls(), config.getCacheConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration()); RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()); RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory messagesClientFactory = new RedisClientFactory("message_cache", config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls(), config.getMessageCacheConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()); RedisClientFactory messagesClientFactory = new RedisClientFactory("message_cache", config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls(), config.getMessageCacheConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration()); RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
ReplicatedJedisPool cacheClient = cacheClientFactory.getRedisClientPool();
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool(); ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool(); ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();
ReplicatedJedisPool messagesClient = messagesClientFactory.getRedisClientPool(); ReplicatedJedisPool messagesClient = messagesClientFactory.getRedisClientPool();
@ -288,11 +284,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DirectoryManager directory = new DirectoryManager(directoryClient); DirectoryManager directory = new DirectoryManager(directoryClient);
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration()); DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheClient, cacheCluster); PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster);
PendingDevicesManager pendingDevicesManager = new PendingDevicesManager(pendingDevices, cacheClient, cacheCluster); PendingDevicesManager pendingDevicesManager = new PendingDevicesManager(pendingDevices, cacheCluster);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient, cacheCluster); AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster);
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheClient, cacheCluster); UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes()); MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes());
MessagesManager messagesManager = new MessagesManager(messages, messagesCache); MessagesManager messagesManager = new MessagesManager(messages, messagesCache);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs); RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
@ -302,7 +298,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration()); APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey()); GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager); WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheClient, cacheCluster); RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager); AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
DisabledPermittedAccountAuthenticator disabledPermittedAccountAuthenticator = new DisabledPermittedAccountAuthenticator(accountsManager); DisabledPermittedAccountAuthenticator disabledPermittedAccountAuthenticator = new DisabledPermittedAccountAuthenticator(accountsManager);
@ -325,13 +321,13 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration()); DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
ActiveUserCounter activeUserCounter = new ActiveUserCounter(config.getMetricsFactory(), cacheClient, cacheCluster); ActiveUserCounter activeUserCounter = new ActiveUserCounter(config.getMetricsFactory(), cacheCluster);
DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryReconciliationClient, directory); DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryReconciliationClient, directory);
AccountCleaner accountCleaner = new AccountCleaner(accountsManager, directoryQueue); AccountCleaner accountCleaner = new AccountCleaner(accountsManager, directoryQueue);
PushFeedbackProcessor pushFeedbackProcessor = new PushFeedbackProcessor(accountsManager, directoryQueue); PushFeedbackProcessor pushFeedbackProcessor = new PushFeedbackProcessor(accountsManager, directoryQueue);
List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner); List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner);
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient, cacheCluster); AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
messagesCache.setPubSubManager(pubSubManager, pushSender); messagesCache.setPubSubManager(pubSubManager, pushSender);
@ -429,7 +425,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
/// ///
environment.healthChecks().register("directory", new RedisHealthCheck(directoryClient)); environment.healthChecks().register("directory", new RedisHealthCheck(directoryClient));
environment.healthChecks().register("cache", new RedisHealthCheck(cacheClient));
environment.healthChecks().register("cacheCluster", new RedisClusterHealthCheck(cacheCluster)); environment.healthChecks().register("cacheCluster", new RedisClusterHealthCheck(cacheCluster));
environment.metrics().register(name(CpuUsageGauge.class, "cpu"), new CpuUsageGauge()); environment.metrics().register(name(CpuUsageGauge.class, "cpu"), new CpuUsageGauge());

View File

@ -6,18 +6,16 @@ import com.codahale.metrics.SharedMetricRegistries;
import io.lettuce.core.SetArgs; import io.lettuce.core.SetArgs;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
public class LockingRateLimiter extends RateLimiter { public class LockingRateLimiter extends RateLimiter {
private final Meter meter; private final Meter meter;
public LockingRateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) { public LockingRateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) {
super(cacheClient, cacheCluster, name, bucketSize, leakRatePerMinute); super(cacheCluster, name, bucketSize, leakRatePerMinute);
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
this.meter = metricRegistry.meter(name(getClass(), name, "locked")); this.meter = metricRegistry.meter(name(getClass(), name, "locked"));
@ -43,27 +41,11 @@ public class LockingRateLimiter extends RateLimiter {
} }
private void releaseLock(String key) { private void releaseLock(String key) {
try (Jedis jedis = cacheClient.getWriteResource()) { cacheCluster.useWriteCluster(connection -> connection.sync().del(getLockName(key)));
final String lockName = getLockName(key);
jedis.del(lockName);
cacheCluster.useWriteCluster(connection -> connection.sync().del(lockName));
}
} }
private boolean acquireLock(String key) { private boolean acquireLock(String key) {
try (Jedis jedis = cacheClient.getWriteResource()) { return cacheCluster.withWriteCluster(connection -> connection.sync().set(getLockName(key), "L", SetArgs.Builder.nx().ex(10))) != null;
final String lockName = getLockName(key);
final boolean acquiredLock = jedis.set(lockName, "L", "NX", "EX", 10) != null;
if (acquiredLock) {
// TODO Restore the NX flag when the cluster becomes the primary source of truth
cacheCluster.useWriteCluster(connection -> connection.sync().set(lockName, "L", SetArgs.Builder.ex(10)));
}
return acquiredLock;
}
} }
private String getLockName(String key) { private String getLockName(String key) {

View File

@ -26,14 +26,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException; import java.io.IOException;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
public class RateLimiter { public class RateLimiter {
@ -42,20 +40,19 @@ public class RateLimiter {
private final Meter meter; private final Meter meter;
private final Timer validateTimer; private final Timer validateTimer;
protected final ReplicatedJedisPool cacheClient;
protected final FaultTolerantRedisCluster cacheCluster; protected final FaultTolerantRedisCluster cacheCluster;
protected final String name; protected final String name;
private final int bucketSize; private final int bucketSize;
private final double leakRatePerMillis; private final double leakRatePerMillis;
private final boolean reportLimits; private final boolean reportLimits;
public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, public RateLimiter(FaultTolerantRedisCluster cacheCluster, String name,
int bucketSize, double leakRatePerMinute) int bucketSize, double leakRatePerMinute)
{ {
this(cacheClient, cacheCluster, name, bucketSize, leakRatePerMinute, false); this(cacheCluster, name, bucketSize, leakRatePerMinute, false);
} }
public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, public RateLimiter(FaultTolerantRedisCluster cacheCluster, String name,
int bucketSize, double leakRatePerMinute, int bucketSize, double leakRatePerMinute,
boolean reportLimits) boolean reportLimits)
{ {
@ -63,7 +60,6 @@ public class RateLimiter {
this.meter = metricRegistry.meter(name(getClass(), name, "exceeded")); this.meter = metricRegistry.meter(name(getClass(), name, "exceeded"));
this.validateTimer = metricRegistry.timer(name(getClass(), name, "validate")); this.validateTimer = metricRegistry.timer(name(getClass(), name, "validate"));
this.cacheClient = cacheClient;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.name = name; this.name = name;
this.bucketSize = bucketSize; this.bucketSize = bucketSize;
@ -89,22 +85,14 @@ public class RateLimiter {
} }
public void clear(String key) { public void clear(String key) {
try (Jedis jedis = cacheClient.getWriteResource()) { cacheCluster.useWriteCluster(connection -> connection.sync().del(getBucketName(key)));
final String bucketName = getBucketName(key);
jedis.del(bucketName);
cacheCluster.useWriteCluster(connection -> connection.sync().del(bucketName));
}
} }
private void setBucket(String key, LeakyBucket bucket) { private void setBucket(String key, LeakyBucket bucket) {
try (Jedis jedis = cacheClient.getWriteResource()) { try {
final String bucketName = getBucketName(key);
final String serialized = bucket.serialize(mapper); final String serialized = bucket.serialize(mapper);
final int level = (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000);
jedis.setex(bucketName, level, serialized); cacheCluster.useWriteCluster(connection -> connection.sync().setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized));
cacheCluster.useWriteCluster(connection -> connection.sync().setex(bucketName, level, serialized));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }

View File

@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.limits;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
public class RateLimiters { public class RateLimiters {
@ -48,84 +47,84 @@ public class RateLimiters {
private final RateLimiter usernameLookupLimiter; private final RateLimiter usernameLookupLimiter;
private final RateLimiter usernameSetLimiter; private final RateLimiter usernameSetLimiter;
public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { public RateLimiters(RateLimitsConfiguration config, FaultTolerantRedisCluster cacheCluster) {
this.smsDestinationLimiter = new RateLimiter(cacheClient, cacheCluster, "smsDestination", this.smsDestinationLimiter = new RateLimiter(cacheCluster, "smsDestination",
config.getSmsDestination().getBucketSize(), config.getSmsDestination().getBucketSize(),
config.getSmsDestination().getLeakRatePerMinute()); config.getSmsDestination().getLeakRatePerMinute());
this.voiceDestinationLimiter = new RateLimiter(cacheClient, cacheCluster, "voxDestination", this.voiceDestinationLimiter = new RateLimiter(cacheCluster, "voxDestination",
config.getVoiceDestination().getBucketSize(), config.getVoiceDestination().getBucketSize(),
config.getVoiceDestination().getLeakRatePerMinute()); config.getVoiceDestination().getLeakRatePerMinute());
this.voiceDestinationDailyLimiter = new RateLimiter(cacheClient, cacheCluster, "voxDestinationDaily", this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, "voxDestinationDaily",
config.getVoiceDestinationDaily().getBucketSize(), config.getVoiceDestinationDaily().getBucketSize(),
config.getVoiceDestinationDaily().getLeakRatePerMinute()); config.getVoiceDestinationDaily().getLeakRatePerMinute());
this.smsVoiceIpLimiter = new RateLimiter(cacheClient, cacheCluster, "smsVoiceIp", this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, "smsVoiceIp",
config.getSmsVoiceIp().getBucketSize(), config.getSmsVoiceIp().getBucketSize(),
config.getSmsVoiceIp().getLeakRatePerMinute()); config.getSmsVoiceIp().getLeakRatePerMinute());
this.smsVoicePrefixLimiter = new RateLimiter(cacheClient, cacheCluster, "smsVoicePrefix", this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, "smsVoicePrefix",
config.getSmsVoicePrefix().getBucketSize(), config.getSmsVoicePrefix().getBucketSize(),
config.getSmsVoicePrefix().getLeakRatePerMinute()); config.getSmsVoicePrefix().getLeakRatePerMinute());
this.autoBlockLimiter = new RateLimiter(cacheClient, cacheCluster, "autoBlock", this.autoBlockLimiter = new RateLimiter(cacheCluster, "autoBlock",
config.getAutoBlock().getBucketSize(), config.getAutoBlock().getBucketSize(),
config.getAutoBlock().getLeakRatePerMinute()); config.getAutoBlock().getLeakRatePerMinute());
this.verifyLimiter = new LockingRateLimiter(cacheClient, cacheCluster, "verify", this.verifyLimiter = new LockingRateLimiter(cacheCluster, "verify",
config.getVerifyNumber().getBucketSize(), config.getVerifyNumber().getBucketSize(),
config.getVerifyNumber().getLeakRatePerMinute()); config.getVerifyNumber().getLeakRatePerMinute());
this.pinLimiter = new LockingRateLimiter(cacheClient, cacheCluster, "pin", this.pinLimiter = new LockingRateLimiter(cacheCluster, "pin",
config.getVerifyPin().getBucketSize(), config.getVerifyPin().getBucketSize(),
config.getVerifyPin().getLeakRatePerMinute()); config.getVerifyPin().getLeakRatePerMinute());
this.attachmentLimiter = new RateLimiter(cacheClient, cacheCluster, "attachmentCreate", this.attachmentLimiter = new RateLimiter(cacheCluster, "attachmentCreate",
config.getAttachments().getBucketSize(), config.getAttachments().getBucketSize(),
config.getAttachments().getLeakRatePerMinute()); config.getAttachments().getLeakRatePerMinute());
this.contactsLimiter = new RateLimiter(cacheClient, cacheCluster, "contactsQuery", this.contactsLimiter = new RateLimiter(cacheCluster, "contactsQuery",
config.getContactQueries().getBucketSize(), config.getContactQueries().getBucketSize(),
config.getContactQueries().getLeakRatePerMinute()); config.getContactQueries().getLeakRatePerMinute());
this.contactsIpLimiter = new RateLimiter(cacheClient, cacheCluster, "contactsIpQuery", this.contactsIpLimiter = new RateLimiter(cacheCluster, "contactsIpQuery",
config.getContactIpQueries().getBucketSize(), config.getContactIpQueries().getBucketSize(),
config.getContactIpQueries().getLeakRatePerMinute()); config.getContactIpQueries().getLeakRatePerMinute());
this.preKeysLimiter = new RateLimiter(cacheClient, cacheCluster, "prekeys", this.preKeysLimiter = new RateLimiter(cacheCluster, "prekeys",
config.getPreKeys().getBucketSize(), config.getPreKeys().getBucketSize(),
config.getPreKeys().getLeakRatePerMinute()); config.getPreKeys().getLeakRatePerMinute());
this.messagesLimiter = new RateLimiter(cacheClient, cacheCluster, "messages", this.messagesLimiter = new RateLimiter(cacheCluster, "messages",
config.getMessages().getBucketSize(), config.getMessages().getBucketSize(),
config.getMessages().getLeakRatePerMinute()); config.getMessages().getLeakRatePerMinute());
this.allocateDeviceLimiter = new RateLimiter(cacheClient, cacheCluster, "allocateDevice", this.allocateDeviceLimiter = new RateLimiter(cacheCluster, "allocateDevice",
config.getAllocateDevice().getBucketSize(), config.getAllocateDevice().getBucketSize(),
config.getAllocateDevice().getLeakRatePerMinute()); config.getAllocateDevice().getLeakRatePerMinute());
this.verifyDeviceLimiter = new RateLimiter(cacheClient, cacheCluster, "verifyDevice", this.verifyDeviceLimiter = new RateLimiter(cacheCluster, "verifyDevice",
config.getVerifyDevice().getBucketSize(), config.getVerifyDevice().getBucketSize(),
config.getVerifyDevice().getLeakRatePerMinute()); config.getVerifyDevice().getLeakRatePerMinute());
this.turnLimiter = new RateLimiter(cacheClient, cacheCluster, "turnAllocate", this.turnLimiter = new RateLimiter(cacheCluster, "turnAllocate",
config.getTurnAllocations().getBucketSize(), config.getTurnAllocations().getBucketSize(),
config.getTurnAllocations().getLeakRatePerMinute()); config.getTurnAllocations().getLeakRatePerMinute());
this.profileLimiter = new RateLimiter(cacheClient, cacheCluster, "profile", this.profileLimiter = new RateLimiter(cacheCluster, "profile",
config.getProfile().getBucketSize(), config.getProfile().getBucketSize(),
config.getProfile().getLeakRatePerMinute()); config.getProfile().getLeakRatePerMinute());
this.stickerPackLimiter = new RateLimiter(cacheClient, cacheCluster, "stickerPack", this.stickerPackLimiter = new RateLimiter(cacheCluster, "stickerPack",
config.getStickerPack().getBucketSize(), config.getStickerPack().getBucketSize(),
config.getStickerPack().getLeakRatePerMinute()); config.getStickerPack().getLeakRatePerMinute());
this.usernameLookupLimiter = new RateLimiter(cacheClient, cacheCluster, "usernameLookup", this.usernameLookupLimiter = new RateLimiter(cacheCluster, "usernameLookup",
config.getUsernameLookup().getBucketSize(), config.getUsernameLookup().getBucketSize(),
config.getUsernameLookup().getLeakRatePerMinute()); config.getUsernameLookup().getLeakRatePerMinute());
this.usernameSetLimiter = new RateLimiter(cacheClient, cacheCluster, "usernameSet", this.usernameSetLimiter = new RateLimiter(cacheCluster, "usernameSet",
config.getUsernameSet().getBucketSize(), config.getUsernameSet().getBucketSize(),
config.getUsernameSet().getLeakRatePerMinute()); config.getUsernameSet().getLeakRatePerMinute());
} }

View File

@ -18,16 +18,10 @@ package org.whispersystems.textsecuregcm.storage;
import io.lettuce.core.ScriptOutputType; import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs; import io.lettuce.core.SetArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.LuaScript;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import redis.clients.jedis.Jedis;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -41,54 +35,28 @@ public class AccountDatabaseCrawlerCache {
private static final long LAST_NUMBER_TTL_MS = 86400_000L; private static final long LAST_NUMBER_TTL_MS = 86400_000L;
private static final Logger log = LoggerFactory.getLogger(AccountDatabaseCrawlerCache.class);
private final ReplicatedJedisPool jedisPool;
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
private final LuaScript unlockScript;
private final ClusterLuaScript unlockClusterScript; private final ClusterLuaScript unlockClusterScript;
public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) throws IOException { public AccountDatabaseCrawlerCache(FaultTolerantRedisCluster cacheCluster) throws IOException {
this.jedisPool = jedisPool;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.unlockScript = LuaScript.fromResource(jedisPool, "lua/account_database_crawler/unlock.lua");
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua", ScriptOutputType.INTEGER); this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua", ScriptOutputType.INTEGER);
} }
public void clearAccelerate() { public void clearAccelerate() {
try (Jedis jedis = jedisPool.getWriteResource()) {
jedis.del(ACCELERATE_KEY);
cacheCluster.useWriteCluster(connection -> connection.sync().del(ACCELERATE_KEY)); cacheCluster.useWriteCluster(connection -> connection.sync().del(ACCELERATE_KEY));
} }
}
public boolean isAccelerated() { public boolean isAccelerated() {
return "1".equals(cacheCluster.withReadCluster(connection -> connection.sync().get(ACCELERATE_KEY))); return "1".equals(cacheCluster.withReadCluster(connection -> connection.sync().get(ACCELERATE_KEY)));
} }
public boolean claimActiveWork(String workerId, long ttlMs) { public boolean claimActiveWork(String workerId, long ttlMs) {
try (Jedis jedis = jedisPool.getWriteResource()) { return "OK".equals(cacheCluster.withWriteCluster(connection -> connection.sync().set(ACCELERATE_KEY, workerId, SetArgs.Builder.nx().px(ttlMs))));
final boolean claimed = "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs));
if (claimed) {
// TODO Restore the NX flag when making the cluster the primary data store
cacheCluster.useWriteCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.px(ttlMs)));
}
return claimed;
}
} }
public void releaseActiveWork(String workerId) { public void releaseActiveWork(String workerId) {
List<byte[]> keys = Arrays.asList(ACTIVE_WORKER_KEY.getBytes());
List<byte[]> args = Arrays.asList(workerId.getBytes());
unlockScript.execute(keys, args);
try {
unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId)); unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId));
} catch (Exception e) {
log.warn("Failed to execute clustered unlock script", e);
}
} }
public Optional<UUID> getLastUuid() { public Optional<UUID> getLastUuid() {
@ -99,15 +67,11 @@ public class AccountDatabaseCrawlerCache {
} }
public void setLastUuid(Optional<UUID> lastUuid) { public void setLastUuid(Optional<UUID> lastUuid) {
try (Jedis jedis = jedisPool.getWriteResource()) {
if (lastUuid.isPresent()) { if (lastUuid.isPresent()) {
jedis.psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString());
cacheCluster.useWriteCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); cacheCluster.useWriteCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
} else { } else {
jedis.del(LAST_UUID_KEY);
cacheCluster.useWriteCluster(connection -> connection.sync().del(LAST_UUID_KEY)); cacheCluster.useWriteCluster(connection -> connection.sync().del(LAST_UUID_KEY));
} }
} }
}
} }

View File

@ -23,14 +23,12 @@ import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisException; import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier; import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.entities.ClientContact; import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
@ -41,7 +39,6 @@ import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
public class AccountsManager { public class AccountsManager {
@ -58,15 +55,13 @@ public class AccountsManager {
private final Logger logger = LoggerFactory.getLogger(AccountsManager.class); private final Logger logger = LoggerFactory.getLogger(AccountsManager.class);
private final Accounts accounts; private final Accounts accounts;
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
private final DirectoryManager directory; private final DirectoryManager directory;
private final ObjectMapper mapper; private final ObjectMapper mapper;
public AccountsManager(Accounts accounts, DirectoryManager directory, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { public AccountsManager(Accounts accounts, DirectoryManager directory, FaultTolerantRedisCluster cacheCluster) {
this.accounts = accounts; this.accounts = accounts;
this.directory = directory; this.directory = directory;
this.cacheClient = cacheClient;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper(); this.mapper = SystemMapper.getMapper();
} }
@ -149,21 +144,14 @@ public class AccountsManager {
} }
private void redisSet(Account account) { private void redisSet(Account account) {
try (Jedis jedis = cacheClient.getWriteResource(); try (Timer.Context ignored = redisSetTimer.time()) {
Timer.Context ignored = redisSetTimer.time())
{
final String accountMapKey = getAccountMapKey(account.getNumber());
final String accountEntityKey = getAccountEntityKey(account.getUuid());
final String accountJson = mapper.writeValueAsString(account); final String accountJson = mapper.writeValueAsString(account);
jedis.set(accountMapKey, account.getUuid().toString());
jedis.set(accountEntityKey, accountJson);
cacheCluster.useWriteCluster(connection -> { cacheCluster.useWriteCluster(connection -> {
final RedisAdvancedClusterCommands<String, String> commands = connection.sync(); final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.set(accountMapKey, account.getUuid().toString()); commands.set(getAccountMapKey(account.getNumber()), account.getUuid().toString());
commands.set(accountEntityKey, accountJson); commands.set(getAccountEntityKey(account.getUuid()), accountJson);
}); });
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);

View File

@ -20,9 +20,10 @@ import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.metrics.ReporterFactory;
import org.whispersystems.textsecuregcm.entities.ActiveUserTally; import org.whispersystems.textsecuregcm.entities.ActiveUserTally;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
@ -34,10 +35,6 @@ import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.metrics.ReporterFactory;
import redis.clients.jedis.Jedis;
public class ActiveUserCounter extends AccountDatabaseCrawlerListener { public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
private static final String TALLY_KEY = "active_user_tally"; private static final String TALLY_KEY = "active_user_tally";
@ -48,24 +45,19 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"}; private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"};
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final ReplicatedJedisPool jedisPool;
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper; private final ObjectMapper mapper;
public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) { public ActiveUserCounter(MetricsFactory metricsFactory, FaultTolerantRedisCluster cacheCluster) {
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
this.jedisPool = jedisPool;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper(); this.mapper = SystemMapper.getMapper();
} }
@Override @Override
public void onCrawlStart() { public void onCrawlStart() {
try (Jedis jedis = jedisPool.getWriteResource()) {
jedis.del(TALLY_KEY);
cacheCluster.useWriteCluster(connection -> connection.sync().del(TALLY_KEY)); cacheCluster.useWriteCluster(connection -> connection.sync().del(TALLY_KEY));
} }
}
@Override @Override
public void onCrawlEnd(Optional<UUID> fromNumber) { public void onCrawlEnd(Optional<UUID> fromNumber) {
@ -160,7 +152,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
} }
private void incrementTallies(UUID fromUuid, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) { private void incrementTallies(UUID fromUuid, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
try (Jedis jedis = jedisPool.getWriteResource()) { try {
final String tallyValue = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY)); final String tallyValue = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY));
ActiveUserTally activeUserTally; ActiveUserTally activeUserTally;
@ -181,12 +173,9 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
final String tallyJson = mapper.writeValueAsString(activeUserTally); final String tallyJson = mapper.writeValueAsString(activeUserTally);
jedis.set(TALLY_KEY, tallyJson);
cacheCluster.useWriteCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson)); cacheCluster.useWriteCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} }
} }

View File

@ -22,14 +22,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import redis.clients.jedis.Jedis;
public class PendingAccountsManager { public class PendingAccountsManager {
private final Logger logger = LoggerFactory.getLogger(PendingAccountsManager.class); private final Logger logger = LoggerFactory.getLogger(PendingAccountsManager.class);
@ -37,14 +34,12 @@ public class PendingAccountsManager {
private static final String CACHE_PREFIX = "pending_account2::"; private static final String CACHE_PREFIX = "pending_account2::";
private final PendingAccounts pendingAccounts; private final PendingAccounts pendingAccounts;
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper; private final ObjectMapper mapper;
public PendingAccountsManager(PendingAccounts pendingAccounts, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) public PendingAccountsManager(PendingAccounts pendingAccounts, FaultTolerantRedisCluster cacheCluster)
{ {
this.pendingAccounts = pendingAccounts; this.pendingAccounts = pendingAccounts;
this.cacheClient = cacheClient;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper(); this.mapper = SystemMapper.getMapper();
} }
@ -71,12 +66,10 @@ public class PendingAccountsManager {
} }
private void memcacheSet(String number, StoredVerificationCode code) { private void memcacheSet(String number, StoredVerificationCode code) {
try (Jedis jedis = cacheClient.getWriteResource()) { try {
final String key = CACHE_PREFIX + number;
final String verificationCodeJson = mapper.writeValueAsString(code); final String verificationCodeJson = mapper.writeValueAsString(code);
jedis.set(key, verificationCodeJson); cacheCluster.useWriteCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson));
cacheCluster.useWriteCluster(connection -> connection.sync().set(key, verificationCodeJson));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
@ -95,11 +88,6 @@ public class PendingAccountsManager {
} }
private void memcacheDelete(String number) { private void memcacheDelete(String number) {
try (Jedis jedis = cacheClient.getWriteResource()) { cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + number));
final String key = CACHE_PREFIX + number;
jedis.del(key);
cacheCluster.useWriteCluster(connection -> connection.sync().del(key));
}
} }
} }

View File

@ -22,14 +22,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import redis.clients.jedis.Jedis;
public class PendingDevicesManager { public class PendingDevicesManager {
private final Logger logger = LoggerFactory.getLogger(PendingDevicesManager.class); private final Logger logger = LoggerFactory.getLogger(PendingDevicesManager.class);
@ -37,13 +34,11 @@ public class PendingDevicesManager {
private static final String CACHE_PREFIX = "pending_devices2::"; private static final String CACHE_PREFIX = "pending_devices2::";
private final PendingDevices pendingDevices; private final PendingDevices pendingDevices;
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper; private final ObjectMapper mapper;
public PendingDevicesManager(PendingDevices pendingDevices, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { public PendingDevicesManager(PendingDevices pendingDevices, FaultTolerantRedisCluster cacheCluster) {
this.pendingDevices = pendingDevices; this.pendingDevices = pendingDevices;
this.cacheClient = cacheClient;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper(); this.mapper = SystemMapper.getMapper();
} }
@ -70,12 +65,10 @@ public class PendingDevicesManager {
} }
private void memcacheSet(String number, StoredVerificationCode code) { private void memcacheSet(String number, StoredVerificationCode code) {
try (Jedis jedis = cacheClient.getWriteResource()) { try {
final String key = CACHE_PREFIX + number;
final String verificationCodeJson = mapper.writeValueAsString(code); final String verificationCodeJson = mapper.writeValueAsString(code);
jedis.set(key, verificationCodeJson); cacheCluster.useWriteCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson));
cacheCluster.useWriteCluster(connection -> connection.sync().set(key, verificationCodeJson));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
@ -94,12 +87,7 @@ public class PendingDevicesManager {
} }
private void memcacheDelete(String number) { private void memcacheDelete(String number) {
try (Jedis jedis = cacheClient.getWriteResource()) { cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + number));
final String key = CACHE_PREFIX + number;
jedis.del(key);
cacheCluster.useWriteCluster(connection -> connection.sync().del(key));
}
} }
} }

View File

@ -6,15 +6,12 @@ import io.lettuce.core.RedisException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import redis.clients.jedis.Jedis;
public class ProfilesManager { public class ProfilesManager {
private final Logger logger = LoggerFactory.getLogger(PendingAccountsManager.class); private final Logger logger = LoggerFactory.getLogger(PendingAccountsManager.class);
@ -22,12 +19,10 @@ public class ProfilesManager {
private static final String CACHE_PREFIX = "profiles::"; private static final String CACHE_PREFIX = "profiles::";
private final Profiles profiles; private final Profiles profiles;
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper; private final ObjectMapper mapper;
public ProfilesManager(Profiles profiles, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { public ProfilesManager(Profiles profiles, FaultTolerantRedisCluster cacheCluster) {
this.cacheClient = cacheClient;
this.profiles = profiles; this.profiles = profiles;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper(); this.mapper = SystemMapper.getMapper();
@ -55,12 +50,10 @@ public class ProfilesManager {
} }
private void memcacheSet(UUID uuid, VersionedProfile profile) { private void memcacheSet(UUID uuid, VersionedProfile profile) {
try (Jedis jedis = cacheClient.getWriteResource()) { try {
final String key = CACHE_PREFIX + uuid.toString();
final String profileJson = mapper.writeValueAsString(profile); final String profileJson = mapper.writeValueAsString(profile);
jedis.hset(key, profile.getVersion(), profileJson); cacheCluster.useWriteCluster(connection -> connection.sync().hset(CACHE_PREFIX + uuid.toString(), profile.getVersion(), profileJson));
cacheCluster.useWriteCluster(connection -> connection.sync().hset(key, profile.getVersion(), profileJson));
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
@ -82,11 +75,6 @@ public class ProfilesManager {
} }
private void memcacheDelete(UUID uuid) { private void memcacheDelete(UUID uuid) {
try (Jedis jedis = cacheClient.getWriteResource()) { cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + uuid.toString()));
final String key = CACHE_PREFIX + uuid.toString();
jedis.del(key);
cacheCluster.useWriteCluster(connection -> connection.sync().del(key));
}
} }
} }

View File

@ -8,15 +8,12 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;
public class UsernamesManager { public class UsernamesManager {
@ -34,13 +31,11 @@ public class UsernamesManager {
private final Usernames usernames; private final Usernames usernames;
private final ReservedUsernames reservedUsernames; private final ReservedUsernames reservedUsernames;
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
public UsernamesManager(Usernames usernames, ReservedUsernames reservedUsernames, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { public UsernamesManager(Usernames usernames, ReservedUsernames reservedUsernames, FaultTolerantRedisCluster cacheCluster) {
this.usernames = usernames; this.usernames = usernames;
this.reservedUsernames = reservedUsernames; this.reservedUsernames = reservedUsernames;
this.cacheClient = cacheClient;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
} }
@ -126,15 +121,8 @@ public class UsernamesManager {
maybeOldUsername.ifPresent(oldUsername -> commands.del(getUsernameMapKey(oldUsername))); maybeOldUsername.ifPresent(oldUsername -> commands.del(getUsernameMapKey(oldUsername)));
commands.set(uuidMapKey, username); commands.set(uuidMapKey, username);
commands.set(usernameMapKey, uuid.toString()); commands.set(usernameMapKey, uuid.toString());
try (final Jedis jedis = cacheClient.getWriteResource()) {
maybeOldUsername.ifPresent(oldUsername -> jedis.del(getUsernameMapKey(oldUsername)));
jedis.set(uuidMapKey, username);
jedis.set(usernameMapKey, uuid.toString());
}
}); });
} catch (JedisException | RedisException e) { } catch (RedisException e) {
if (required) throw e; if (required) throw e;
else logger.warn("Ignoring Redis failure", e); else logger.warn("Ignoring Redis failure", e);
} }
@ -164,22 +152,14 @@ public class UsernamesManager {
} }
private void redisDelete(UUID uuid) { private void redisDelete(UUID uuid) {
try (Jedis jedis = cacheClient.getWriteResource(); try (Timer.Context ignored = redisUuidGetTimer.time()) {
Timer.Context ignored = redisUuidGetTimer.time())
{
final String uuidMapKey = getUuidMapKey(uuid);
redisGet(uuid).ifPresent(username -> {
final String usernameMapKey = getUsernameMapKey(username);
jedis.del(usernameMapKey);
jedis.del(uuidMapKey);
cacheCluster.useWriteCluster(connection -> { cacheCluster.useWriteCluster(connection -> {
final RedisAdvancedClusterCommands<String, String> commands = connection.sync(); final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.del(usernameMapKey); commands.del(getUuidMapKey(uuid));
commands.del(uuidMapKey);
redisGet(uuid).ifPresent(username -> {
commands.del(getUsernameMapKey(username));
}); });
}); });
} }

View File

@ -1,25 +0,0 @@
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.cli.ConfiguredCommand;
import io.dropwizard.setup.Bootstrap;
import net.sourceforge.argparse4j.inf.Namespace;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
public class ClearCacheClusterCommand extends ConfiguredCommand<WhisperServerConfiguration> {
public ClearCacheClusterCommand() {
super("clearcache", "remove all keys from cache cluster");
}
@Override
protected void run(final Bootstrap<WhisperServerConfiguration> bootstrap, final Namespace namespace, final WhisperServerConfiguration config) {
clearCache(new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration().getUrls(), config.getCacheClusterConfiguration().getTimeout(), config.getCacheClusterConfiguration().getCircuitBreakerConfiguration()));
}
@VisibleForTesting
static void clearCache(final FaultTolerantRedisCluster cacheCluster) {
cacheCluster.useWriteCluster(connection -> connection.sync().masters().commands().flushallAsync());
}
}

View File

@ -69,11 +69,10 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration().getUrls(), configuration.getCacheClusterConfiguration().getTimeout(), configuration.getCacheClusterConfiguration().getCircuitBreakerConfiguration()); FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration().getUrls(), configuration.getCacheClusterConfiguration().getTimeout(), configuration.getCacheClusterConfiguration().getCircuitBreakerConfiguration());
Accounts accounts = new Accounts(accountDatabase); Accounts accounts = new Accounts(accountDatabase);
ReplicatedJedisPool cacheClient = new RedisClientFactory("main_cache_delete_command", configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls(), configuration.getCacheConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool(); ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration()); DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration());
DirectoryManager directory = new DirectoryManager(redisClient ); DirectoryManager directory = new DirectoryManager(redisClient );
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient, cacheCluster); AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster);
for (String user: users) { for (String user: users) {
Optional<Account> account = accountsManager.get(user); Optional<Account> account = accountsManager.get(user);

View File

@ -4,13 +4,11 @@ import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.junit.Test; import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import redis.clients.jedis.Jedis;
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
@ -36,15 +34,12 @@ public class AccountsManagerTest {
Accounts accounts = mock(Accounts.class); Accounts accounts = mock(Accounts.class);
DirectoryManager directoryManager = mock(DirectoryManager.class); DirectoryManager directoryManager = mock(DirectoryManager.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(uuid.toString()); when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(uuid.toString());
when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}");
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster);
Optional<Account> account = accountsManager.get("+14152222222"); Optional<Account> account = accountsManager.get("+14152222222");
assertTrue(account.isPresent()); assertTrue(account.isPresent());
@ -64,14 +59,11 @@ public class AccountsManagerTest {
Accounts accounts = mock(Accounts.class); Accounts accounts = mock(Accounts.class);
DirectoryManager directoryManager = mock(DirectoryManager.class); DirectoryManager directoryManager = mock(DirectoryManager.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}");
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster);
Optional<Account> account = accountsManager.get(uuid); Optional<Account> account = accountsManager.get(uuid);
assertTrue(account.isPresent()); assertTrue(account.isPresent());
@ -94,13 +86,10 @@ public class AccountsManagerTest {
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(null); when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(null);
when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account)); when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account));
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster);
Optional<Account> retrieved = accountsManager.get("+14152222222"); Optional<Account> retrieved = accountsManager.get("+14152222222");
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -124,13 +113,10 @@ public class AccountsManagerTest {
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
when(commands.get(eq("Account3::" + uuid))).thenReturn(null); when(commands.get(eq("Account3::" + uuid))).thenReturn(null);
when(accounts.get(eq(uuid))).thenReturn(Optional.of(account)); when(accounts.get(eq(uuid))).thenReturn(Optional.of(account));
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster);
Optional<Account> retrieved = accountsManager.get(uuid); Optional<Account> retrieved = accountsManager.get(uuid);
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -154,13 +140,10 @@ public class AccountsManagerTest {
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
when(commands.get(eq("AccountMap::+14152222222"))).thenThrow(new RedisException("Connection lost!")); when(commands.get(eq("AccountMap::+14152222222"))).thenThrow(new RedisException("Connection lost!"));
when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account)); when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account));
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster);
Optional<Account> retrieved = accountsManager.get("+14152222222"); Optional<Account> retrieved = accountsManager.get("+14152222222");
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -184,13 +167,10 @@ public class AccountsManagerTest {
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
when(commands.get(eq("Account3::" + uuid))).thenThrow(new RedisException("Connection lost!")); when(commands.get(eq("Account3::" + uuid))).thenThrow(new RedisException("Connection lost!"));
when(accounts.get(eq(uuid))).thenReturn(Optional.of(account)); when(accounts.get(eq(uuid))).thenReturn(Optional.of(account));
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster);
Optional<Account> retrieved = accountsManager.get(uuid); Optional<Account> retrieved = accountsManager.get(uuid);
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());

View File

@ -69,9 +69,7 @@ public class ActiveUserCounterTest {
private final FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); private final FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
private final MetricsFactory metricsFactory = mock(MetricsFactory.class); private final MetricsFactory metricsFactory = mock(MetricsFactory.class);
private final ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, cacheCluster);
private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, cacheClient, cacheCluster);
@Before @Before
public void setup() { public void setup() {
@ -101,8 +99,6 @@ public class ActiveUserCounterTest {
when(commands.get(any(String.class))).thenReturn("{\"fromNumber\":\"+\",\"platforms\":{},\"countries\":{}}"); when(commands.get(any(String.class))).thenReturn("{\"fromNumber\":\"+\",\"platforms\":{},\"countries\":{}}");
when(metricsFactory.getReporters()).thenReturn(ImmutableList.of()); when(metricsFactory.getReporters()).thenReturn(ImmutableList.of());
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
} }
@Test @Test

View File

@ -4,13 +4,11 @@ import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.junit.Test; import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.VersionedProfile; import org.whispersystems.textsecuregcm.storage.VersionedProfile;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import org.whispersystems.textsecuregcm.util.Base64; import org.whispersystems.textsecuregcm.util.Base64;
import redis.clients.jedis.Jedis;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -35,14 +33,11 @@ public class ProfilesManagerTest {
FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
Profiles profiles = mock(Profiles.class); Profiles profiles = mock(Profiles.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn("{\"version\": \"someversion\", \"name\": \"somename\", \"avatar\": \"someavatar\", \"commitment\":\"" + Base64.encodeBytes("somecommitment".getBytes()) + "\"}"); when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn("{\"version\": \"someversion\", \"name\": \"somename\", \"avatar\": \"someavatar\", \"commitment\":\"" + Base64.encodeBytes("somecommitment".getBytes()) + "\"}");
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
Optional<VersionedProfile> profile = profilesManager.get(uuid, "someversion"); Optional<VersionedProfile> profile = profilesManager.get(uuid, "someversion");
assertTrue(profile.isPresent()); assertTrue(profile.isPresent());
@ -61,16 +56,13 @@ public class ProfilesManagerTest {
FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
Profiles profiles = mock(Profiles.class); Profiles profiles = mock(Profiles.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", "somecommitment".getBytes()); VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", "somecommitment".getBytes());
when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn(null); when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn(null);
when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile));
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion"); Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion");
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -90,16 +82,13 @@ public class ProfilesManagerTest {
FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
Profiles profiles = mock(Profiles.class); Profiles profiles = mock(Profiles.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", "somecommitment".getBytes()); VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", "somecommitment".getBytes());
when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenThrow(new RedisException("Connection lost")); when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenThrow(new RedisException("Connection lost"));
when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile));
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion"); Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion");
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());

View File

@ -4,12 +4,10 @@ import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.junit.Test; import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.ReservedUsernames; import org.whispersystems.textsecuregcm.storage.ReservedUsernames;
import org.whispersystems.textsecuregcm.storage.Usernames; import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager; import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import redis.clients.jedis.Jedis;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -33,14 +31,11 @@ public class UsernamesManagerTest {
Usernames usernames = mock(Usernames.class); Usernames usernames = mock(Usernames.class);
ReservedUsernames reserved = mock(ReservedUsernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenReturn(uuid.toString()); when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenReturn(uuid.toString());
UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster);
Optional<UUID> retrieved = usernamesManager.get("n00bkiller"); Optional<UUID> retrieved = usernamesManager.get("n00bkiller");
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -58,14 +53,11 @@ public class UsernamesManagerTest {
Usernames usernames = mock(Usernames.class); Usernames usernames = mock(Usernames.class);
ReservedUsernames reserved = mock(ReservedUsernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.get(eq("UsernameByUuid::" + uuid.toString()))).thenReturn("n00bkiller"); when(commands.get(eq("UsernameByUuid::" + uuid.toString()))).thenReturn("n00bkiller");
UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster);
Optional<String> retrieved = usernamesManager.get(uuid); Optional<String> retrieved = usernamesManager.get(uuid);
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -84,15 +76,12 @@ public class UsernamesManagerTest {
Usernames usernames = mock(Usernames.class); Usernames usernames = mock(Usernames.class);
ReservedUsernames reserved = mock(ReservedUsernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenReturn(null); when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenReturn(null);
when(usernames.get(eq("n00bkiller"))).thenReturn(Optional.of(uuid)); when(usernames.get(eq("n00bkiller"))).thenReturn(Optional.of(uuid));
UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster);
Optional<UUID> retrieved = usernamesManager.get("n00bkiller"); Optional<UUID> retrieved = usernamesManager.get("n00bkiller");
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -115,15 +104,12 @@ public class UsernamesManagerTest {
Usernames usernames = mock(Usernames.class); Usernames usernames = mock(Usernames.class);
ReservedUsernames reserved = mock(ReservedUsernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.get(eq("UsernameByUuid::" + uuid.toString()))).thenReturn(null); when(commands.get(eq("UsernameByUuid::" + uuid.toString()))).thenReturn(null);
when(usernames.get(eq(uuid))).thenReturn(Optional.of("n00bkiller")); when(usernames.get(eq(uuid))).thenReturn(Optional.of("n00bkiller"));
UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster);
Optional<String> retrieved = usernamesManager.get(uuid); Optional<String> retrieved = usernamesManager.get(uuid);
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -145,15 +131,12 @@ public class UsernamesManagerTest {
Usernames usernames = mock(Usernames.class); Usernames usernames = mock(Usernames.class);
ReservedUsernames reserved = mock(ReservedUsernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenThrow(new RedisException("Connection lost!")); when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenThrow(new RedisException("Connection lost!"));
when(usernames.get(eq("n00bkiller"))).thenReturn(Optional.of(uuid)); when(usernames.get(eq("n00bkiller"))).thenReturn(Optional.of(uuid));
UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster);
Optional<UUID> retrieved = usernamesManager.get("n00bkiller"); Optional<UUID> retrieved = usernamesManager.get("n00bkiller");
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());
@ -176,15 +159,12 @@ public class UsernamesManagerTest {
Usernames usernames = mock(Usernames.class); Usernames usernames = mock(Usernames.class);
ReservedUsernames reserved = mock(ReservedUsernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class);
ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class));
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
when(commands.get(eq("UsernameByUuid::" + uuid))).thenThrow(new RedisException("Connection lost!")); when(commands.get(eq("UsernameByUuid::" + uuid))).thenThrow(new RedisException("Connection lost!"));
when(usernames.get(eq(uuid))).thenReturn(Optional.of("n00bkiller")); when(usernames.get(eq(uuid))).thenReturn(Optional.of("n00bkiller"));
UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster);
Optional<String> retrieved = usernamesManager.get(uuid); Optional<String> retrieved = usernamesManager.get(uuid);
assertTrue(retrieved.isPresent()); assertTrue(retrieved.isPresent());

View File

@ -1,54 +0,0 @@
package org.whispersystems.textsecuregcm.workers;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
public class ClearCacheClusterCommandTest extends AbstractRedisClusterTest {
private static final int KEY_COUNT = 10_000;
@Test
public void testClearCache() throws InterruptedException {
final FaultTolerantRedisCluster cluster = getRedisCluster();
cluster.useWriteCluster(connection -> {
final RedisAdvancedClusterCommands<String, String> clusterCommands = connection.sync();
for (int i = 0; i < KEY_COUNT; i++) {
clusterCommands.set("key::" + i, String.valueOf(i));
}
});
{
final AtomicInteger nodeCount = new AtomicInteger(0);
cluster.useWriteCluster(connection -> connection.sync().masters().asMap().forEach((node, commands) -> {
assertTrue(commands.dbsize() > 0);
nodeCount.incrementAndGet();
}));
assertTrue(nodeCount.get() > 0);
}
ClearCacheClusterCommand.clearCache(cluster);
Thread.sleep(1000);
{
final AtomicInteger nodeCount = new AtomicInteger(0);
cluster.useWriteCluster(connection -> connection.sync().masters().asMap().forEach((node, commands) -> {
assertEquals(0L, (long)commands.dbsize());
nodeCount.incrementAndGet();
}));
assertTrue(nodeCount.get() > 0);
}
}
}