From eea073f88245b81025d4c2720a95b1659320d993 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 17 Jun 2020 18:08:59 -0400 Subject: [PATCH] Decommission the old cache. --- service/config/sample.yml | 4 -- .../WhisperServerConfiguration.java | 9 ---- .../textsecuregcm/WhisperServerService.java | 21 +++----- .../limits/LockingRateLimiter.java | 26 ++------- .../textsecuregcm/limits/RateLimiter.java | 24 +++------ .../textsecuregcm/limits/RateLimiters.java | 43 ++++++++------- .../storage/AccountDatabaseCrawlerCache.java | 52 +++--------------- .../storage/AccountsManager.java | 22 ++------ .../storage/ActiveUserCounter.java | 21 ++------ .../storage/PendingAccountsManager.java | 20 ++----- .../storage/PendingDevicesManager.java | 20 ++----- .../storage/ProfilesManager.java | 20 ++----- .../storage/UsernamesManager.java | 36 +++---------- .../workers/ClearCacheClusterCommand.java | 25 --------- .../workers/DeleteUserCommand.java | 3 +- .../tests/storage/AccountsManagerTest.java | 32 +++-------- .../tests/storage/ActiveUserCounterTest.java | 6 +-- .../tests/storage/ProfilesManagerTest.java | 17 ++---- .../tests/storage/UsernamesManagerTest.java | 32 +++-------- .../workers/ClearCacheClusterCommandTest.java | 54 ------------------- 20 files changed, 94 insertions(+), 393 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommand.java delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommandTest.java diff --git a/service/config/sample.yml b/service/config/sample.yml index 3cb2b1e8b..efbf66102 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -24,10 +24,6 @@ turn: # TURN server configuration - turn:yourdomain:443?transport=udp - turn:etc.com:80?transport=udp -cache: # Redis server configuration for cache cluster - url: - replicaUrls: - cacheCluster: # Redis server configuration for cache cluster urls: - redis://redis.example.com:6379/ diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 8f5e10c71..f71a34444 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -86,11 +86,6 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private List micrometer = new LinkedList<>(); - @NotNull - @Valid - @JsonProperty - private RedisConfiguration cache; - @NotNull @Valid @JsonProperty @@ -245,10 +240,6 @@ public class WhisperServerConfiguration extends Configuration { return gcpAttachments; } - public RedisConfiguration getCacheConfiguration() { - return cache; - } - public RedisClusterConfiguration getCacheClusterConfiguration() { return cacheCluster; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 2152563bb..5654e2aac 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -144,7 +144,6 @@ import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler; import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener; import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.CertificateCommand; -import org.whispersystems.textsecuregcm.workers.ClearCacheClusterCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.VacuumCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; @@ -175,7 +174,6 @@ public class WhisperServerService extends Application("accountdb", "accountsdb.xml") { @Override @@ -269,13 +267,11 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner); - AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient, cacheCluster); + AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster); AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); messagesCache.setPubSubManager(pubSubManager, pushSender); @@ -429,7 +425,6 @@ public class WhisperServerService extends Application connection.sync().del(lockName)); - } + cacheCluster.useWriteCluster(connection -> connection.sync().del(getLockName(key))); } private boolean acquireLock(String key) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String lockName = getLockName(key); - - final boolean acquiredLock = jedis.set(lockName, "L", "NX", "EX", 10) != null; - - if (acquiredLock) { - // TODO Restore the NX flag when the cluster becomes the primary source of truth - cacheCluster.useWriteCluster(connection -> connection.sync().set(lockName, "L", SetArgs.Builder.ex(10))); - } - - return acquiredLock; - } + return cacheCluster.withWriteCluster(connection -> connection.sync().set(getLockName(key), "L", SetArgs.Builder.nx().ex(10))) != null; } private String getLockName(String key) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index 52c095251..05a70b9bf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -26,14 +26,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.SystemMapper; import java.io.IOException; import static com.codahale.metrics.MetricRegistry.name; -import redis.clients.jedis.Jedis; public class RateLimiter { @@ -42,20 +40,19 @@ public class RateLimiter { private final Meter meter; private final Timer validateTimer; - protected final ReplicatedJedisPool cacheClient; protected final FaultTolerantRedisCluster cacheCluster; protected final String name; private final int bucketSize; private final double leakRatePerMillis; private final boolean reportLimits; - public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, + public RateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) { - this(cacheClient, cacheCluster, name, bucketSize, leakRatePerMinute, false); + this(cacheCluster, name, bucketSize, leakRatePerMinute, false); } - public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, + public RateLimiter(FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute, boolean reportLimits) { @@ -63,7 +60,6 @@ public class RateLimiter { this.meter = metricRegistry.meter(name(getClass(), name, "exceeded")); this.validateTimer = metricRegistry.timer(name(getClass(), name, "validate")); - this.cacheClient = cacheClient; this.cacheCluster = cacheCluster; this.name = name; this.bucketSize = bucketSize; @@ -89,22 +85,14 @@ public class RateLimiter { } public void clear(String key) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String bucketName = getBucketName(key); - - jedis.del(bucketName); - cacheCluster.useWriteCluster(connection -> connection.sync().del(bucketName)); - } + cacheCluster.useWriteCluster(connection -> connection.sync().del(getBucketName(key))); } private void setBucket(String key, LeakyBucket bucket) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String bucketName = getBucketName(key); + try { final String serialized = bucket.serialize(mapper); - final int level = (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000); - jedis.setex(bucketName, level, serialized); - cacheCluster.useWriteCluster(connection -> connection.sync().setex(bucketName, level, serialized)); + cacheCluster.useWriteCluster(connection -> connection.sync().setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java index 70e4974aa..6a2cf1d37 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java @@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.limits; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; public class RateLimiters { @@ -48,84 +47,84 @@ public class RateLimiters { private final RateLimiter usernameLookupLimiter; private final RateLimiter usernameSetLimiter; - public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { - this.smsDestinationLimiter = new RateLimiter(cacheClient, cacheCluster, "smsDestination", + public RateLimiters(RateLimitsConfiguration config, FaultTolerantRedisCluster cacheCluster) { + this.smsDestinationLimiter = new RateLimiter(cacheCluster, "smsDestination", config.getSmsDestination().getBucketSize(), config.getSmsDestination().getLeakRatePerMinute()); - this.voiceDestinationLimiter = new RateLimiter(cacheClient, cacheCluster, "voxDestination", + this.voiceDestinationLimiter = new RateLimiter(cacheCluster, "voxDestination", config.getVoiceDestination().getBucketSize(), config.getVoiceDestination().getLeakRatePerMinute()); - this.voiceDestinationDailyLimiter = new RateLimiter(cacheClient, cacheCluster, "voxDestinationDaily", + this.voiceDestinationDailyLimiter = new RateLimiter(cacheCluster, "voxDestinationDaily", config.getVoiceDestinationDaily().getBucketSize(), config.getVoiceDestinationDaily().getLeakRatePerMinute()); - this.smsVoiceIpLimiter = new RateLimiter(cacheClient, cacheCluster, "smsVoiceIp", + this.smsVoiceIpLimiter = new RateLimiter(cacheCluster, "smsVoiceIp", config.getSmsVoiceIp().getBucketSize(), config.getSmsVoiceIp().getLeakRatePerMinute()); - this.smsVoicePrefixLimiter = new RateLimiter(cacheClient, cacheCluster, "smsVoicePrefix", + this.smsVoicePrefixLimiter = new RateLimiter(cacheCluster, "smsVoicePrefix", config.getSmsVoicePrefix().getBucketSize(), config.getSmsVoicePrefix().getLeakRatePerMinute()); - this.autoBlockLimiter = new RateLimiter(cacheClient, cacheCluster, "autoBlock", + this.autoBlockLimiter = new RateLimiter(cacheCluster, "autoBlock", config.getAutoBlock().getBucketSize(), config.getAutoBlock().getLeakRatePerMinute()); - this.verifyLimiter = new LockingRateLimiter(cacheClient, cacheCluster, "verify", + this.verifyLimiter = new LockingRateLimiter(cacheCluster, "verify", config.getVerifyNumber().getBucketSize(), config.getVerifyNumber().getLeakRatePerMinute()); - this.pinLimiter = new LockingRateLimiter(cacheClient, cacheCluster, "pin", + this.pinLimiter = new LockingRateLimiter(cacheCluster, "pin", config.getVerifyPin().getBucketSize(), config.getVerifyPin().getLeakRatePerMinute()); - this.attachmentLimiter = new RateLimiter(cacheClient, cacheCluster, "attachmentCreate", + this.attachmentLimiter = new RateLimiter(cacheCluster, "attachmentCreate", config.getAttachments().getBucketSize(), config.getAttachments().getLeakRatePerMinute()); - this.contactsLimiter = new RateLimiter(cacheClient, cacheCluster, "contactsQuery", + this.contactsLimiter = new RateLimiter(cacheCluster, "contactsQuery", config.getContactQueries().getBucketSize(), config.getContactQueries().getLeakRatePerMinute()); - this.contactsIpLimiter = new RateLimiter(cacheClient, cacheCluster, "contactsIpQuery", + this.contactsIpLimiter = new RateLimiter(cacheCluster, "contactsIpQuery", config.getContactIpQueries().getBucketSize(), config.getContactIpQueries().getLeakRatePerMinute()); - this.preKeysLimiter = new RateLimiter(cacheClient, cacheCluster, "prekeys", + this.preKeysLimiter = new RateLimiter(cacheCluster, "prekeys", config.getPreKeys().getBucketSize(), config.getPreKeys().getLeakRatePerMinute()); - this.messagesLimiter = new RateLimiter(cacheClient, cacheCluster, "messages", + this.messagesLimiter = new RateLimiter(cacheCluster, "messages", config.getMessages().getBucketSize(), config.getMessages().getLeakRatePerMinute()); - this.allocateDeviceLimiter = new RateLimiter(cacheClient, cacheCluster, "allocateDevice", + this.allocateDeviceLimiter = new RateLimiter(cacheCluster, "allocateDevice", config.getAllocateDevice().getBucketSize(), config.getAllocateDevice().getLeakRatePerMinute()); - this.verifyDeviceLimiter = new RateLimiter(cacheClient, cacheCluster, "verifyDevice", + this.verifyDeviceLimiter = new RateLimiter(cacheCluster, "verifyDevice", config.getVerifyDevice().getBucketSize(), config.getVerifyDevice().getLeakRatePerMinute()); - this.turnLimiter = new RateLimiter(cacheClient, cacheCluster, "turnAllocate", + this.turnLimiter = new RateLimiter(cacheCluster, "turnAllocate", config.getTurnAllocations().getBucketSize(), config.getTurnAllocations().getLeakRatePerMinute()); - this.profileLimiter = new RateLimiter(cacheClient, cacheCluster, "profile", + this.profileLimiter = new RateLimiter(cacheCluster, "profile", config.getProfile().getBucketSize(), config.getProfile().getLeakRatePerMinute()); - this.stickerPackLimiter = new RateLimiter(cacheClient, cacheCluster, "stickerPack", + this.stickerPackLimiter = new RateLimiter(cacheCluster, "stickerPack", config.getStickerPack().getBucketSize(), config.getStickerPack().getLeakRatePerMinute()); - this.usernameLookupLimiter = new RateLimiter(cacheClient, cacheCluster, "usernameLookup", + this.usernameLookupLimiter = new RateLimiter(cacheCluster, "usernameLookup", config.getUsernameLookup().getBucketSize(), config.getUsernameLookup().getLeakRatePerMinute()); - this.usernameSetLimiter = new RateLimiter(cacheClient, cacheCluster, "usernameSet", + this.usernameSetLimiter = new RateLimiter(cacheCluster, "usernameSet", config.getUsernameSet().getBucketSize(), config.getUsernameSet().getLeakRatePerMinute()); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java index c247b53ff..86695518c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java @@ -18,16 +18,10 @@ package org.whispersystems.textsecuregcm.storage; import io.lettuce.core.ScriptOutputType; import io.lettuce.core.SetArgs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.LuaScript; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; -import redis.clients.jedis.Jedis; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -41,25 +35,16 @@ public class AccountDatabaseCrawlerCache { private static final long LAST_NUMBER_TTL_MS = 86400_000L; - private static final Logger log = LoggerFactory.getLogger(AccountDatabaseCrawlerCache.class); - - private final ReplicatedJedisPool jedisPool; private final FaultTolerantRedisCluster cacheCluster; - private final LuaScript unlockScript; private final ClusterLuaScript unlockClusterScript; - public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) throws IOException { - this.jedisPool = jedisPool; + public AccountDatabaseCrawlerCache(FaultTolerantRedisCluster cacheCluster) throws IOException { this.cacheCluster = cacheCluster; - this.unlockScript = LuaScript.fromResource(jedisPool, "lua/account_database_crawler/unlock.lua"); this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua", ScriptOutputType.INTEGER); } public void clearAccelerate() { - try (Jedis jedis = jedisPool.getWriteResource()) { - jedis.del(ACCELERATE_KEY); - cacheCluster.useWriteCluster(connection -> connection.sync().del(ACCELERATE_KEY)); - } + cacheCluster.useWriteCluster(connection -> connection.sync().del(ACCELERATE_KEY)); } public boolean isAccelerated() { @@ -67,28 +52,11 @@ public class AccountDatabaseCrawlerCache { } public boolean claimActiveWork(String workerId, long ttlMs) { - try (Jedis jedis = jedisPool.getWriteResource()) { - final boolean claimed = "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs)); - - if (claimed) { - // TODO Restore the NX flag when making the cluster the primary data store - cacheCluster.useWriteCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.px(ttlMs))); - } - - return claimed; - } + return "OK".equals(cacheCluster.withWriteCluster(connection -> connection.sync().set(ACCELERATE_KEY, workerId, SetArgs.Builder.nx().px(ttlMs)))); } public void releaseActiveWork(String workerId) { - List keys = Arrays.asList(ACTIVE_WORKER_KEY.getBytes()); - List args = Arrays.asList(workerId.getBytes()); - unlockScript.execute(keys, args); - - try { - unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId)); - } catch (Exception e) { - log.warn("Failed to execute clustered unlock script", e); - } + unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId)); } public Optional getLastUuid() { @@ -99,14 +67,10 @@ public class AccountDatabaseCrawlerCache { } public void setLastUuid(Optional lastUuid) { - try (Jedis jedis = jedisPool.getWriteResource()) { - if (lastUuid.isPresent()) { - jedis.psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()); - cacheCluster.useWriteCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); - } else { - jedis.del(LAST_UUID_KEY); - cacheCluster.useWriteCluster(connection -> connection.sync().del(LAST_UUID_KEY)); - } + if (lastUuid.isPresent()) { + cacheCluster.useWriteCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); + } else { + cacheCluster.useWriteCluster(connection -> connection.sync().del(LAST_UUID_KEY)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index a9388ba69..f0896c60a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -23,14 +23,12 @@ import com.codahale.metrics.Timer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.lettuce.core.RedisException; -import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier; import org.whispersystems.textsecuregcm.entities.ClientContact; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; @@ -41,7 +39,6 @@ import java.util.Optional; import java.util.UUID; import static com.codahale.metrics.MetricRegistry.name; -import redis.clients.jedis.Jedis; public class AccountsManager { @@ -58,15 +55,13 @@ public class AccountsManager { private final Logger logger = LoggerFactory.getLogger(AccountsManager.class); private final Accounts accounts; - private final ReplicatedJedisPool cacheClient; private final FaultTolerantRedisCluster cacheCluster; private final DirectoryManager directory; private final ObjectMapper mapper; - public AccountsManager(Accounts accounts, DirectoryManager directory, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { + public AccountsManager(Accounts accounts, DirectoryManager directory, FaultTolerantRedisCluster cacheCluster) { this.accounts = accounts; this.directory = directory; - this.cacheClient = cacheClient; this.cacheCluster = cacheCluster; this.mapper = SystemMapper.getMapper(); } @@ -149,21 +144,14 @@ public class AccountsManager { } private void redisSet(Account account) { - try (Jedis jedis = cacheClient.getWriteResource(); - Timer.Context ignored = redisSetTimer.time()) - { - final String accountMapKey = getAccountMapKey(account.getNumber()); - final String accountEntityKey = getAccountEntityKey(account.getUuid()); - final String accountJson = mapper.writeValueAsString(account); - - jedis.set(accountMapKey, account.getUuid().toString()); - jedis.set(accountEntityKey, accountJson); + try (Timer.Context ignored = redisSetTimer.time()) { + final String accountJson = mapper.writeValueAsString(account); cacheCluster.useWriteCluster(connection -> { final RedisAdvancedClusterCommands commands = connection.sync(); - commands.set(accountMapKey, account.getUuid().toString()); - commands.set(accountEntityKey, accountJson); + commands.set(getAccountMapKey(account.getNumber()), account.getUuid().toString()); + commands.set(getAccountEntityKey(account.getUuid()), accountJson); }); } catch (JsonProcessingException e) { throw new IllegalStateException(e); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java index 7b5c51a8c..15c5fba4e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java @@ -20,9 +20,10 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import io.dropwizard.metrics.MetricsFactory; +import io.dropwizard.metrics.ReporterFactory; import org.whispersystems.textsecuregcm.entities.ActiveUserTally; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; @@ -34,10 +35,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; -import io.dropwizard.metrics.MetricsFactory; -import io.dropwizard.metrics.ReporterFactory; -import redis.clients.jedis.Jedis; - public class ActiveUserCounter extends AccountDatabaseCrawlerListener { private static final String TALLY_KEY = "active_user_tally"; @@ -48,23 +45,18 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"}; private final MetricsFactory metricsFactory; - private final ReplicatedJedisPool jedisPool; private final FaultTolerantRedisCluster cacheCluster; private final ObjectMapper mapper; - public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) { + public ActiveUserCounter(MetricsFactory metricsFactory, FaultTolerantRedisCluster cacheCluster) { this.metricsFactory = metricsFactory; - this.jedisPool = jedisPool; this.cacheCluster = cacheCluster; this.mapper = SystemMapper.getMapper(); } @Override public void onCrawlStart() { - try (Jedis jedis = jedisPool.getWriteResource()) { - jedis.del(TALLY_KEY); - cacheCluster.useWriteCluster(connection -> connection.sync().del(TALLY_KEY)); - } + cacheCluster.useWriteCluster(connection -> connection.sync().del(TALLY_KEY)); } @Override @@ -160,7 +152,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { } private void incrementTallies(UUID fromUuid, Map platformIncrements, Map countryIncrements) { - try (Jedis jedis = jedisPool.getWriteResource()) { + try { final String tallyValue = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY)); ActiveUserTally activeUserTally; @@ -181,12 +173,9 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { final String tallyJson = mapper.writeValueAsString(activeUserTally); - jedis.set(TALLY_KEY, tallyJson); cacheCluster.useWriteCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); - } catch (IOException e) { - throw new RuntimeException(e); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java index 2e514c476..390a4dcd0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java @@ -22,14 +22,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; import java.io.IOException; import java.util.Optional; -import redis.clients.jedis.Jedis; - public class PendingAccountsManager { private final Logger logger = LoggerFactory.getLogger(PendingAccountsManager.class); @@ -37,14 +34,12 @@ public class PendingAccountsManager { private static final String CACHE_PREFIX = "pending_account2::"; private final PendingAccounts pendingAccounts; - private final ReplicatedJedisPool cacheClient; private final FaultTolerantRedisCluster cacheCluster; private final ObjectMapper mapper; - public PendingAccountsManager(PendingAccounts pendingAccounts, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) + public PendingAccountsManager(PendingAccounts pendingAccounts, FaultTolerantRedisCluster cacheCluster) { this.pendingAccounts = pendingAccounts; - this.cacheClient = cacheClient; this.cacheCluster = cacheCluster; this.mapper = SystemMapper.getMapper(); } @@ -71,12 +66,10 @@ public class PendingAccountsManager { } private void memcacheSet(String number, StoredVerificationCode code) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String key = CACHE_PREFIX + number; + try { final String verificationCodeJson = mapper.writeValueAsString(code); - jedis.set(key, verificationCodeJson); - cacheCluster.useWriteCluster(connection -> connection.sync().set(key, verificationCodeJson)); + cacheCluster.useWriteCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -95,11 +88,6 @@ public class PendingAccountsManager { } private void memcacheDelete(String number) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String key = CACHE_PREFIX + number; - - jedis.del(key); - cacheCluster.useWriteCluster(connection -> connection.sync().del(key)); - } + cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + number)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java index bae914779..6fd9ed909 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java @@ -22,14 +22,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; import java.io.IOException; import java.util.Optional; -import redis.clients.jedis.Jedis; - public class PendingDevicesManager { private final Logger logger = LoggerFactory.getLogger(PendingDevicesManager.class); @@ -37,13 +34,11 @@ public class PendingDevicesManager { private static final String CACHE_PREFIX = "pending_devices2::"; private final PendingDevices pendingDevices; - private final ReplicatedJedisPool cacheClient; private final FaultTolerantRedisCluster cacheCluster; private final ObjectMapper mapper; - public PendingDevicesManager(PendingDevices pendingDevices, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { + public PendingDevicesManager(PendingDevices pendingDevices, FaultTolerantRedisCluster cacheCluster) { this.pendingDevices = pendingDevices; - this.cacheClient = cacheClient; this.cacheCluster = cacheCluster; this.mapper = SystemMapper.getMapper(); } @@ -70,12 +65,10 @@ public class PendingDevicesManager { } private void memcacheSet(String number, StoredVerificationCode code) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String key = CACHE_PREFIX + number; + try { final String verificationCodeJson = mapper.writeValueAsString(code); - jedis.set(key, verificationCodeJson); - cacheCluster.useWriteCluster(connection -> connection.sync().set(key, verificationCodeJson)); + cacheCluster.useWriteCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -94,12 +87,7 @@ public class PendingDevicesManager { } private void memcacheDelete(String number) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String key = CACHE_PREFIX + number; - - jedis.del(key); - cacheCluster.useWriteCluster(connection -> connection.sync().del(key)); - } + cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + number)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java index 26c814361..e9a4614bf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java @@ -6,15 +6,12 @@ import io.lettuce.core.RedisException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; import java.io.IOException; import java.util.Optional; import java.util.UUID; -import redis.clients.jedis.Jedis; - public class ProfilesManager { private final Logger logger = LoggerFactory.getLogger(PendingAccountsManager.class); @@ -22,12 +19,10 @@ public class ProfilesManager { private static final String CACHE_PREFIX = "profiles::"; private final Profiles profiles; - private final ReplicatedJedisPool cacheClient; private final FaultTolerantRedisCluster cacheCluster; private final ObjectMapper mapper; - public ProfilesManager(Profiles profiles, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { - this.cacheClient = cacheClient; + public ProfilesManager(Profiles profiles, FaultTolerantRedisCluster cacheCluster) { this.profiles = profiles; this.cacheCluster = cacheCluster; this.mapper = SystemMapper.getMapper(); @@ -55,12 +50,10 @@ public class ProfilesManager { } private void memcacheSet(UUID uuid, VersionedProfile profile) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String key = CACHE_PREFIX + uuid.toString(); + try { final String profileJson = mapper.writeValueAsString(profile); - jedis.hset(key, profile.getVersion(), profileJson); - cacheCluster.useWriteCluster(connection -> connection.sync().hset(key, profile.getVersion(), profileJson)); + cacheCluster.useWriteCluster(connection -> connection.sync().hset(CACHE_PREFIX + uuid.toString(), profile.getVersion(), profileJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -82,11 +75,6 @@ public class ProfilesManager { } private void memcacheDelete(UUID uuid) { - try (Jedis jedis = cacheClient.getWriteResource()) { - final String key = CACHE_PREFIX + uuid.toString(); - - jedis.del(key); - cacheCluster.useWriteCluster(connection -> connection.sync().del(key)); - } + cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + uuid.toString())); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java index e542c4744..255f0b712 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java @@ -8,15 +8,12 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; import java.util.Optional; import java.util.UUID; import static com.codahale.metrics.MetricRegistry.name; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.exceptions.JedisException; public class UsernamesManager { @@ -34,13 +31,11 @@ public class UsernamesManager { private final Usernames usernames; private final ReservedUsernames reservedUsernames; - private final ReplicatedJedisPool cacheClient; private final FaultTolerantRedisCluster cacheCluster; - public UsernamesManager(Usernames usernames, ReservedUsernames reservedUsernames, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { + public UsernamesManager(Usernames usernames, ReservedUsernames reservedUsernames, FaultTolerantRedisCluster cacheCluster) { this.usernames = usernames; this.reservedUsernames = reservedUsernames; - this.cacheClient = cacheClient; this.cacheCluster = cacheCluster; } @@ -126,15 +121,8 @@ public class UsernamesManager { maybeOldUsername.ifPresent(oldUsername -> commands.del(getUsernameMapKey(oldUsername))); commands.set(uuidMapKey, username); commands.set(usernameMapKey, uuid.toString()); - - try (final Jedis jedis = cacheClient.getWriteResource()) { - maybeOldUsername.ifPresent(oldUsername -> jedis.del(getUsernameMapKey(oldUsername))); - - jedis.set(uuidMapKey, username); - jedis.set(usernameMapKey, uuid.toString()); - } }); - } catch (JedisException | RedisException e) { + } catch (RedisException e) { if (required) throw e; else logger.warn("Ignoring Redis failure", e); } @@ -164,22 +152,14 @@ public class UsernamesManager { } private void redisDelete(UUID uuid) { - try (Jedis jedis = cacheClient.getWriteResource(); - Timer.Context ignored = redisUuidGetTimer.time()) - { - final String uuidMapKey = getUuidMapKey(uuid); + try (Timer.Context ignored = redisUuidGetTimer.time()) { + cacheCluster.useWriteCluster(connection -> { + final RedisAdvancedClusterCommands commands = connection.sync(); - redisGet(uuid).ifPresent(username -> { - final String usernameMapKey = getUsernameMapKey(username); + commands.del(getUuidMapKey(uuid)); - jedis.del(usernameMapKey); - jedis.del(uuidMapKey); - - cacheCluster.useWriteCluster(connection -> { - final RedisAdvancedClusterCommands commands = connection.sync(); - - commands.del(usernameMapKey); - commands.del(uuidMapKey); + redisGet(uuid).ifPresent(username -> { + commands.del(getUsernameMapKey(username)); }); }); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommand.java deleted file mode 100644 index 05f8bb67f..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommand.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.whispersystems.textsecuregcm.workers; - -import com.google.common.annotations.VisibleForTesting; -import io.dropwizard.cli.ConfiguredCommand; -import io.dropwizard.setup.Bootstrap; -import net.sourceforge.argparse4j.inf.Namespace; -import org.whispersystems.textsecuregcm.WhisperServerConfiguration; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; - -public class ClearCacheClusterCommand extends ConfiguredCommand { - - public ClearCacheClusterCommand() { - super("clearcache", "remove all keys from cache cluster"); - } - - @Override - protected void run(final Bootstrap bootstrap, final Namespace namespace, final WhisperServerConfiguration config) { - clearCache(new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration().getUrls(), config.getCacheClusterConfiguration().getTimeout(), config.getCacheClusterConfiguration().getCircuitBreakerConfiguration())); - } - - @VisibleForTesting - static void clearCache(final FaultTolerantRedisCluster cacheCluster) { - cacheCluster.useWriteCluster(connection -> connection.sync().masters().commands().flushallAsync()); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index cd46d7ca2..c9d77d187 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -69,11 +69,10 @@ public class DeleteUserCommand extends EnvironmentCommand account = accountsManager.get(user); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java index c0e8b9c43..d0ca0b597 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java @@ -4,13 +4,11 @@ import io.lettuce.core.RedisException; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import org.junit.Test; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; -import redis.clients.jedis.Jedis; import java.util.HashSet; import java.util.Optional; @@ -36,15 +34,12 @@ public class AccountsManagerTest { Accounts accounts = mock(Accounts.class); DirectoryManager directoryManager = mock(DirectoryManager.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(uuid.toString()); when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster); Optional account = accountsManager.get("+14152222222"); assertTrue(account.isPresent()); @@ -64,14 +59,11 @@ public class AccountsManagerTest { Accounts accounts = mock(Accounts.class); DirectoryManager directoryManager = mock(DirectoryManager.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster); Optional account = accountsManager.get(uuid); assertTrue(account.isPresent()); @@ -94,13 +86,10 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(null); when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster); Optional retrieved = accountsManager.get("+14152222222"); assertTrue(retrieved.isPresent()); @@ -124,13 +113,10 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - when(commands.get(eq("Account3::" + uuid))).thenReturn(null); when(accounts.get(eq(uuid))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster); Optional retrieved = accountsManager.get(uuid); assertTrue(retrieved.isPresent()); @@ -154,13 +140,10 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - when(commands.get(eq("AccountMap::+14152222222"))).thenThrow(new RedisException("Connection lost!")); when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster); Optional retrieved = accountsManager.get("+14152222222"); assertTrue(retrieved.isPresent()); @@ -184,13 +167,10 @@ public class AccountsManagerTest { UUID uuid = UUID.randomUUID(); Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - when(commands.get(eq("Account3::" + uuid))).thenThrow(new RedisException("Connection lost!")); when(accounts.get(eq(uuid))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster); Optional retrieved = accountsManager.get(uuid); assertTrue(retrieved.isPresent()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java index c7e517eab..d1fbf5c33 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java @@ -69,9 +69,7 @@ public class ActiveUserCounterTest { private final FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); private final MetricsFactory metricsFactory = mock(MetricsFactory.class); - private final ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - - private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, cacheClient, cacheCluster); + private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, cacheCluster); @Before public void setup() { @@ -101,8 +99,6 @@ public class ActiveUserCounterTest { when(commands.get(any(String.class))).thenReturn("{\"fromNumber\":\"+\",\"platforms\":{},\"countries\":{}}"); when(metricsFactory.getReporters()).thenReturn(ImmutableList.of()); - - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java index e6ff48ffb..599323135 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java @@ -4,13 +4,11 @@ import io.lettuce.core.RedisException; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import org.junit.Test; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.VersionedProfile; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import org.whispersystems.textsecuregcm.util.Base64; -import redis.clients.jedis.Jedis; import java.util.Optional; import java.util.UUID; @@ -35,14 +33,11 @@ public class ProfilesManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Profiles profiles = mock(Profiles.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn("{\"version\": \"someversion\", \"name\": \"somename\", \"avatar\": \"someavatar\", \"commitment\":\"" + Base64.encodeBytes("somecommitment".getBytes()) + "\"}"); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); + ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); Optional profile = profilesManager.get(uuid, "someversion"); assertTrue(profile.isPresent()); @@ -61,16 +56,13 @@ public class ProfilesManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Profiles profiles = mock(Profiles.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", "somecommitment".getBytes()); when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn(null); when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); + ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); Optional retrieved = profilesManager.get(uuid, "someversion"); assertTrue(retrieved.isPresent()); @@ -90,16 +82,13 @@ public class ProfilesManagerTest { FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); Profiles profiles = mock(Profiles.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", "somecommitment".getBytes()); when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenThrow(new RedisException("Connection lost")); when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); + ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); Optional retrieved = profilesManager.get(uuid, "someversion"); assertTrue(retrieved.isPresent()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/UsernamesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/UsernamesManagerTest.java index e2fd4f7bc..648b03508 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/UsernamesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/UsernamesManagerTest.java @@ -4,12 +4,10 @@ import io.lettuce.core.RedisException; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import org.junit.Test; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.ReservedUsernames; import org.whispersystems.textsecuregcm.storage.Usernames; import org.whispersystems.textsecuregcm.storage.UsernamesManager; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; -import redis.clients.jedis.Jedis; import java.util.Optional; import java.util.UUID; @@ -33,14 +31,11 @@ public class UsernamesManagerTest { Usernames usernames = mock(Usernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenReturn(uuid.toString()); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster); Optional retrieved = usernamesManager.get("n00bkiller"); assertTrue(retrieved.isPresent()); @@ -58,14 +53,11 @@ public class UsernamesManagerTest { Usernames usernames = mock(Usernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.get(eq("UsernameByUuid::" + uuid.toString()))).thenReturn("n00bkiller"); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster); Optional retrieved = usernamesManager.get(uuid); assertTrue(retrieved.isPresent()); @@ -84,15 +76,12 @@ public class UsernamesManagerTest { Usernames usernames = mock(Usernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenReturn(null); when(usernames.get(eq("n00bkiller"))).thenReturn(Optional.of(uuid)); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster); Optional retrieved = usernamesManager.get("n00bkiller"); assertTrue(retrieved.isPresent()); @@ -115,15 +104,12 @@ public class UsernamesManagerTest { Usernames usernames = mock(Usernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.get(eq("UsernameByUuid::" + uuid.toString()))).thenReturn(null); when(usernames.get(eq(uuid))).thenReturn(Optional.of("n00bkiller")); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster); Optional retrieved = usernamesManager.get(uuid); assertTrue(retrieved.isPresent()); @@ -145,15 +131,12 @@ public class UsernamesManagerTest { Usernames usernames = mock(Usernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.get(eq("UsernameByUsername::n00bkiller"))).thenThrow(new RedisException("Connection lost!")); when(usernames.get(eq("n00bkiller"))).thenReturn(Optional.of(uuid)); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster); Optional retrieved = usernamesManager.get("n00bkiller"); assertTrue(retrieved.isPresent()); @@ -176,15 +159,12 @@ public class UsernamesManagerTest { Usernames usernames = mock(Usernames.class); ReservedUsernames reserved = mock(ReservedUsernames.class); - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - when(cacheClient.getWriteResource()).thenReturn(mock(Jedis.class)); - UUID uuid = UUID.randomUUID(); when(commands.get(eq("UsernameByUuid::" + uuid))).thenThrow(new RedisException("Connection lost!")); when(usernames.get(eq(uuid))).thenReturn(Optional.of("n00bkiller")); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheCluster); Optional retrieved = usernamesManager.get(uuid); assertTrue(retrieved.isPresent()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommandTest.java deleted file mode 100644 index 163958d89..000000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommandTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.whispersystems.textsecuregcm.workers; - -import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; -import org.junit.Test; -import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; - -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.*; - -public class ClearCacheClusterCommandTest extends AbstractRedisClusterTest { - - private static final int KEY_COUNT = 10_000; - - @Test - public void testClearCache() throws InterruptedException { - final FaultTolerantRedisCluster cluster = getRedisCluster(); - - cluster.useWriteCluster(connection -> { - final RedisAdvancedClusterCommands clusterCommands = connection.sync(); - - for (int i = 0; i < KEY_COUNT; i++) { - clusterCommands.set("key::" + i, String.valueOf(i)); - } - }); - - { - final AtomicInteger nodeCount = new AtomicInteger(0); - - cluster.useWriteCluster(connection -> connection.sync().masters().asMap().forEach((node, commands) -> { - assertTrue(commands.dbsize() > 0); - nodeCount.incrementAndGet(); - })); - - assertTrue(nodeCount.get() > 0); - } - - ClearCacheClusterCommand.clearCache(cluster); - - Thread.sleep(1000); - - { - final AtomicInteger nodeCount = new AtomicInteger(0); - - cluster.useWriteCluster(connection -> connection.sync().masters().asMap().forEach((node, commands) -> { - assertEquals(0L, (long)commands.dbsize()); - nodeCount.incrementAndGet(); - })); - - assertTrue(nodeCount.get() > 0); - } - } -}