From 138810391918529bb309ad77ee6fea7dc20c518c Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Sat, 6 Jun 2020 12:23:05 -0400 Subject: [PATCH] Mirror writes to the cache cluster. --- .../textsecuregcm/WhisperServerService.java | 25 ++++-- .../limits/LockingRateLimiter.java | 18 +++- .../textsecuregcm/limits/RateLimiter.java | 34 +++++--- .../textsecuregcm/limits/RateLimiters.java | 43 +++++----- .../storage/AccountDatabaseCrawlerCache.java | 22 ++++- .../storage/AccountsManager.java | 37 ++++++--- .../storage/ActiveUserCounter.java | 17 ++-- .../storage/PendingAccountsManager.java | 22 +++-- .../storage/PendingDevicesManager.java | 22 +++-- .../storage/ProfilesManager.java | 28 +++++-- .../storage/UsernamesManager.java | 55 +++++++++--- .../workers/DeleteUserCommand.java | 9 +- .../tests/storage/AccountsManagerTest.java | 83 ++++++++++--------- .../tests/storage/ActiveUserCounterTest.java | 10 ++- .../tests/storage/ProfilesManagerTest.java | 28 ++++--- .../tests/storage/UsernamesManagerTest.java | 67 ++++++++------- 16 files changed, 337 insertions(+), 183 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index cce52e215..0391d19cc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -40,6 +40,8 @@ import io.dropwizard.db.PooledDataSourceFactory; import io.dropwizard.jdbi3.JdbiFactory; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.RedisClusterClient; import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.Metrics; import io.micrometer.wavefront.WavefrontMeterRegistry; @@ -96,6 +98,7 @@ import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.WebsocketSender; import org.whispersystems.textsecuregcm.recaptcha.RecaptchaClient; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.s3.PolicySigner; import org.whispersystems.textsecuregcm.s3.PostPolicyGenerator; @@ -147,10 +150,12 @@ import org.whispersystems.websocket.setup.WebSocketEnvironment; import javax.servlet.DispatcherType; import javax.servlet.FilterRegistration; import javax.servlet.ServletRegistration; +import java.net.URI; import java.security.Security; import java.util.EnumSet; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import static com.codahale.metrics.MetricRegistry.name; @@ -238,13 +243,17 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner); - AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient); + AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient, cacheCluster); AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); messagesCache.setPubSubManager(pubSubManager, pushSender); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java index fbcbf31f4..361414713 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java @@ -3,7 +3,9 @@ package org.whispersystems.textsecuregcm.limits; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; +import io.lettuce.core.SetArgs; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; @@ -14,8 +16,8 @@ public class LockingRateLimiter extends RateLimiter { private final Meter meter; - public LockingRateLimiter(ReplicatedJedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) { - super(cacheClient, name, bucketSize, leakRatePerMinute); + public LockingRateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) { + super(cacheClient, cacheCluster, name, bucketSize, leakRatePerMinute); MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); this.meter = metricRegistry.meter(name(getClass(), name, "locked")); @@ -42,13 +44,21 @@ public class LockingRateLimiter extends RateLimiter { private void releaseLock(String key) { try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.del(getLockName(key)); + final String lockName = getLockName(key); + + jedis.del(lockName); + cacheCluster.useWriteCluster(connection -> connection.async().del(lockName)); } } private boolean acquireLock(String key) { try (Jedis jedis = cacheClient.getWriteResource()) { - return jedis.set(getLockName(key), "L", "NX", "EX", 10) != null; + final String lockName = getLockName(key); + + final boolean acquiredLock = jedis.set(lockName, "L", "NX", "EX", 10) != null; + cacheCluster.useWriteCluster(connection -> connection.async().set(lockName, "L", SetArgs.Builder.nx().ex(10))); + + return acquiredLock; } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index 99332ddef..5f3b98300 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.SystemMapper; @@ -38,20 +39,21 @@ public class RateLimiter { private final Logger logger = LoggerFactory.getLogger(RateLimiter.class); private final ObjectMapper mapper = SystemMapper.getMapper(); - private final Meter meter; - protected final ReplicatedJedisPool cacheClient; - protected final String name; - private final int bucketSize; - private final double leakRatePerMillis; - private final boolean reportLimits; + private final Meter meter; + protected final ReplicatedJedisPool cacheClient; + protected final FaultTolerantRedisCluster cacheCluster; + protected final String name; + private final int bucketSize; + private final double leakRatePerMillis; + private final boolean reportLimits; - public RateLimiter(ReplicatedJedisPool cacheClient, String name, + public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) { - this(cacheClient, name, bucketSize, leakRatePerMinute, false); + this(cacheClient, cacheCluster, name, bucketSize, leakRatePerMinute, false); } - public RateLimiter(ReplicatedJedisPool cacheClient, String name, + public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute, boolean reportLimits) { @@ -59,6 +61,7 @@ public class RateLimiter { this.meter = metricRegistry.meter(name(getClass(), name, "exceeded")); this.cacheClient = cacheClient; + this.cacheCluster = cacheCluster; this.name = name; this.bucketSize = bucketSize; this.leakRatePerMillis = leakRatePerMinute / (60.0 * 1000.0); @@ -82,14 +85,21 @@ public class RateLimiter { public void clear(String key) { try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.del(getBucketName(key)); + final String bucketName = getBucketName(key); + + jedis.del(bucketName); + cacheCluster.useWriteCluster(connection -> connection.async().del(bucketName)); } } private void setBucket(String key, LeakyBucket bucket) { try (Jedis jedis = cacheClient.getWriteResource()) { - String serialized = bucket.serialize(mapper); - jedis.setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized); + final String bucketName = getBucketName(key); + final String serialized = bucket.serialize(mapper); + final int level = (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000); + + jedis.setex(bucketName, level, serialized); + cacheCluster.useWriteCluster(connection -> connection.async().setex(bucketName, level, serialized)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java index f58db72b4..70e4974aa 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java @@ -18,6 +18,7 @@ package org.whispersystems.textsecuregcm.limits; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; public class RateLimiters { @@ -47,84 +48,84 @@ public class RateLimiters { private final RateLimiter usernameLookupLimiter; private final RateLimiter usernameSetLimiter; - public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient) { - this.smsDestinationLimiter = new RateLimiter(cacheClient, "smsDestination", + public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { + this.smsDestinationLimiter = new RateLimiter(cacheClient, cacheCluster, "smsDestination", config.getSmsDestination().getBucketSize(), config.getSmsDestination().getLeakRatePerMinute()); - this.voiceDestinationLimiter = new RateLimiter(cacheClient, "voxDestination", + this.voiceDestinationLimiter = new RateLimiter(cacheClient, cacheCluster, "voxDestination", config.getVoiceDestination().getBucketSize(), config.getVoiceDestination().getLeakRatePerMinute()); - this.voiceDestinationDailyLimiter = new RateLimiter(cacheClient, "voxDestinationDaily", + this.voiceDestinationDailyLimiter = new RateLimiter(cacheClient, cacheCluster, "voxDestinationDaily", config.getVoiceDestinationDaily().getBucketSize(), config.getVoiceDestinationDaily().getLeakRatePerMinute()); - this.smsVoiceIpLimiter = new RateLimiter(cacheClient, "smsVoiceIp", + this.smsVoiceIpLimiter = new RateLimiter(cacheClient, cacheCluster, "smsVoiceIp", config.getSmsVoiceIp().getBucketSize(), config.getSmsVoiceIp().getLeakRatePerMinute()); - this.smsVoicePrefixLimiter = new RateLimiter(cacheClient, "smsVoicePrefix", + this.smsVoicePrefixLimiter = new RateLimiter(cacheClient, cacheCluster, "smsVoicePrefix", config.getSmsVoicePrefix().getBucketSize(), config.getSmsVoicePrefix().getLeakRatePerMinute()); - this.autoBlockLimiter = new RateLimiter(cacheClient, "autoBlock", + this.autoBlockLimiter = new RateLimiter(cacheClient, cacheCluster, "autoBlock", config.getAutoBlock().getBucketSize(), config.getAutoBlock().getLeakRatePerMinute()); - this.verifyLimiter = new LockingRateLimiter(cacheClient, "verify", + this.verifyLimiter = new LockingRateLimiter(cacheClient, cacheCluster, "verify", config.getVerifyNumber().getBucketSize(), config.getVerifyNumber().getLeakRatePerMinute()); - this.pinLimiter = new LockingRateLimiter(cacheClient, "pin", + this.pinLimiter = new LockingRateLimiter(cacheClient, cacheCluster, "pin", config.getVerifyPin().getBucketSize(), config.getVerifyPin().getLeakRatePerMinute()); - this.attachmentLimiter = new RateLimiter(cacheClient, "attachmentCreate", + this.attachmentLimiter = new RateLimiter(cacheClient, cacheCluster, "attachmentCreate", config.getAttachments().getBucketSize(), config.getAttachments().getLeakRatePerMinute()); - this.contactsLimiter = new RateLimiter(cacheClient, "contactsQuery", + this.contactsLimiter = new RateLimiter(cacheClient, cacheCluster, "contactsQuery", config.getContactQueries().getBucketSize(), config.getContactQueries().getLeakRatePerMinute()); - this.contactsIpLimiter = new RateLimiter(cacheClient, "contactsIpQuery", + this.contactsIpLimiter = new RateLimiter(cacheClient, cacheCluster, "contactsIpQuery", config.getContactIpQueries().getBucketSize(), config.getContactIpQueries().getLeakRatePerMinute()); - this.preKeysLimiter = new RateLimiter(cacheClient, "prekeys", + this.preKeysLimiter = new RateLimiter(cacheClient, cacheCluster, "prekeys", config.getPreKeys().getBucketSize(), config.getPreKeys().getLeakRatePerMinute()); - this.messagesLimiter = new RateLimiter(cacheClient, "messages", + this.messagesLimiter = new RateLimiter(cacheClient, cacheCluster, "messages", config.getMessages().getBucketSize(), config.getMessages().getLeakRatePerMinute()); - this.allocateDeviceLimiter = new RateLimiter(cacheClient, "allocateDevice", + this.allocateDeviceLimiter = new RateLimiter(cacheClient, cacheCluster, "allocateDevice", config.getAllocateDevice().getBucketSize(), config.getAllocateDevice().getLeakRatePerMinute()); - this.verifyDeviceLimiter = new RateLimiter(cacheClient, "verifyDevice", + this.verifyDeviceLimiter = new RateLimiter(cacheClient, cacheCluster, "verifyDevice", config.getVerifyDevice().getBucketSize(), config.getVerifyDevice().getLeakRatePerMinute()); - this.turnLimiter = new RateLimiter(cacheClient, "turnAllocate", + this.turnLimiter = new RateLimiter(cacheClient, cacheCluster, "turnAllocate", config.getTurnAllocations().getBucketSize(), config.getTurnAllocations().getLeakRatePerMinute()); - this.profileLimiter = new RateLimiter(cacheClient, "profile", + this.profileLimiter = new RateLimiter(cacheClient, cacheCluster, "profile", config.getProfile().getBucketSize(), config.getProfile().getLeakRatePerMinute()); - this.stickerPackLimiter = new RateLimiter(cacheClient, "stickerPack", + this.stickerPackLimiter = new RateLimiter(cacheClient, cacheCluster, "stickerPack", config.getStickerPack().getBucketSize(), config.getStickerPack().getLeakRatePerMinute()); - this.usernameLookupLimiter = new RateLimiter(cacheClient, "usernameLookup", + this.usernameLookupLimiter = new RateLimiter(cacheClient, cacheCluster, "usernameLookup", config.getUsernameLookup().getBucketSize(), config.getUsernameLookup().getLeakRatePerMinute()); - this.usernameSetLimiter = new RateLimiter(cacheClient, "usernameSet", + this.usernameSetLimiter = new RateLimiter(cacheClient, cacheCluster, "usernameSet", config.getUsernameSet().getBucketSize(), config.getUsernameSet().getLeakRatePerMinute()); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java index 4b1402b1a..7876c22af 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java @@ -16,6 +16,8 @@ */ package org.whispersystems.textsecuregcm.storage; +import io.lettuce.core.SetArgs; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.LuaScript; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; @@ -36,17 +38,20 @@ public class AccountDatabaseCrawlerCache { private static final long LAST_NUMBER_TTL_MS = 86400_000L; - private final ReplicatedJedisPool jedisPool; - private final LuaScript luaScript; + private final ReplicatedJedisPool jedisPool; + private final FaultTolerantRedisCluster cacheCluster; + private final LuaScript luaScript; - public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool) throws IOException { + public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) throws IOException { this.jedisPool = jedisPool; + this.cacheCluster = cacheCluster; this.luaScript = LuaScript.fromResource(jedisPool, "lua/account_database_crawler/unlock.lua"); } public void clearAccelerate() { try (Jedis jedis = jedisPool.getWriteResource()) { jedis.del(ACCELERATE_KEY); + cacheCluster.useWriteCluster(connection -> connection.async().del(ACCELERATE_KEY)); } } @@ -58,7 +63,14 @@ public class AccountDatabaseCrawlerCache { public boolean claimActiveWork(String workerId, long ttlMs) { try (Jedis jedis = jedisPool.getWriteResource()) { - return "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs)); + final boolean claimed = "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs)); + + if (claimed) { + // TODO Restore the NX flag when making the cluster the primary data store + cacheCluster.useWriteCluster(connection -> connection.async().set(ACCELERATE_KEY, workerId, SetArgs.Builder.px(ttlMs))); + } + + return claimed; } } @@ -81,8 +93,10 @@ public class AccountDatabaseCrawlerCache { try (Jedis jedis = jedisPool.getWriteResource()) { if (lastUuid.isPresent()) { jedis.psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()); + cacheCluster.useWriteCluster(connection -> connection.async().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); } else { jedis.del(LAST_UUID_KEY); + cacheCluster.useWriteCluster(connection -> connection.async().del(LAST_UUID_KEY)); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 6e94b9a6f..d5316be42 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -22,10 +22,12 @@ import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier; import org.whispersystems.textsecuregcm.entities.ClientContact; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.SystemMapper; @@ -54,16 +56,18 @@ public class AccountsManager { private final Logger logger = LoggerFactory.getLogger(AccountsManager.class); - private final Accounts accounts; - private final ReplicatedJedisPool cacheClient; - private final DirectoryManager directory; - private final ObjectMapper mapper; + private final Accounts accounts; + private final ReplicatedJedisPool cacheClient; + private final FaultTolerantRedisCluster cacheCluster; + private final DirectoryManager directory; + private final ObjectMapper mapper; - public AccountsManager(Accounts accounts, DirectoryManager directory, ReplicatedJedisPool cacheClient) { - this.accounts = accounts; - this.directory = directory; - this.cacheClient = cacheClient; - this.mapper = SystemMapper.getMapper(); + public AccountsManager(Accounts accounts, DirectoryManager directory, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { + this.accounts = accounts; + this.directory = directory; + this.cacheClient = cacheClient; + this.cacheCluster = cacheCluster; + this.mapper = SystemMapper.getMapper(); } public boolean create(Account account) { @@ -147,8 +151,19 @@ public class AccountsManager { try (Jedis jedis = cacheClient.getWriteResource(); Timer.Context ignored = redisSetTimer.time()) { - jedis.set(getAccountMapKey(account.getNumber()), account.getUuid().toString()); - jedis.set(getAccountEntityKey(account.getUuid()), mapper.writeValueAsString(account)); + final String accountMapKey = getAccountMapKey(account.getNumber()); + final String accountEntityKey = getAccountEntityKey(account.getUuid()); + final String accountJson = mapper.writeValueAsString(account); + + jedis.set(accountMapKey, account.getUuid().toString()); + jedis.set(accountEntityKey, accountJson); + + cacheCluster.useWriteCluster(connection -> { + RedisAdvancedClusterAsyncCommands asyncCommands = connection.async(); + + asyncCommands.set(accountMapKey, account.getUuid().toString()); + asyncCommands.set(accountEntityKey, accountJson); + }); } catch (JsonProcessingException e) { throw new IllegalStateException(e); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java index 1c23de692..8817ca51c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java @@ -21,6 +21,7 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.whispersystems.textsecuregcm.entities.ActiveUserTally; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; @@ -46,13 +47,15 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"}; - private final MetricsFactory metricsFactory; - private final ReplicatedJedisPool jedisPool; - private final ObjectMapper mapper; + private final MetricsFactory metricsFactory; + private final ReplicatedJedisPool jedisPool; + private final FaultTolerantRedisCluster cacheCluster; + private final ObjectMapper mapper; - public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool) { + public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) { this.metricsFactory = metricsFactory; this.jedisPool = jedisPool; + this.cacheCluster = cacheCluster; this.mapper = SystemMapper.getMapper(); } @@ -60,6 +63,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { public void onCrawlStart() { try (Jedis jedis = jedisPool.getWriteResource()) { jedis.del(TALLY_KEY); + cacheCluster.useWriteCluster(connection -> connection.async().del(TALLY_KEY)); } } @@ -174,7 +178,10 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { } } - jedis.set(TALLY_KEY, mapper.writeValueAsString(activeUserTally)); + final String tallyJson = mapper.writeValueAsString(activeUserTally); + + jedis.set(TALLY_KEY, tallyJson); + cacheCluster.useWriteCluster(connection -> connection.async().set(TALLY_KEY, tallyJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } catch (IOException e) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java index 8b74cb5e1..776e1bdb8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; @@ -35,14 +36,16 @@ public class PendingAccountsManager { private static final String CACHE_PREFIX = "pending_account2::"; - private final PendingAccounts pendingAccounts; - private final ReplicatedJedisPool cacheClient; - private final ObjectMapper mapper; + private final PendingAccounts pendingAccounts; + private final ReplicatedJedisPool cacheClient; + private final FaultTolerantRedisCluster cacheCluster; + private final ObjectMapper mapper; - public PendingAccountsManager(PendingAccounts pendingAccounts, ReplicatedJedisPool cacheClient) + public PendingAccountsManager(PendingAccounts pendingAccounts, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { this.pendingAccounts = pendingAccounts; this.cacheClient = cacheClient; + this.cacheCluster = cacheCluster; this.mapper = SystemMapper.getMapper(); } @@ -69,7 +72,11 @@ public class PendingAccountsManager { private void memcacheSet(String number, StoredVerificationCode code) { try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.set(CACHE_PREFIX + number, mapper.writeValueAsString(code)); + final String key = CACHE_PREFIX + number; + final String verificationCodeJson = mapper.writeValueAsString(code); + + jedis.set(key, verificationCodeJson); + cacheCluster.useWriteCluster(connection -> connection.async().set(key, verificationCodeJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -89,7 +96,10 @@ public class PendingAccountsManager { private void memcacheDelete(String number) { try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.del(CACHE_PREFIX + number); + final String key = CACHE_PREFIX + number; + + jedis.del(key); + cacheCluster.useWriteCluster(connection -> connection.async().del(key)); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java index 90207458c..92250284d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; @@ -35,13 +36,15 @@ public class PendingDevicesManager { private static final String CACHE_PREFIX = "pending_devices2::"; - private final PendingDevices pendingDevices; - private final ReplicatedJedisPool cacheClient; - private final ObjectMapper mapper; + private final PendingDevices pendingDevices; + private final ReplicatedJedisPool cacheClient; + private final FaultTolerantRedisCluster cacheCluster; + private final ObjectMapper mapper; - public PendingDevicesManager(PendingDevices pendingDevices, ReplicatedJedisPool cacheClient) { + public PendingDevicesManager(PendingDevices pendingDevices, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { this.pendingDevices = pendingDevices; this.cacheClient = cacheClient; + this.cacheCluster = cacheCluster; this.mapper = SystemMapper.getMapper(); } @@ -68,7 +71,11 @@ public class PendingDevicesManager { private void memcacheSet(String number, StoredVerificationCode code) { try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.set(CACHE_PREFIX + number, mapper.writeValueAsString(code)); + final String key = CACHE_PREFIX + number; + final String verificationCodeJson = mapper.writeValueAsString(code); + + jedis.set(key, verificationCodeJson); + cacheCluster.useWriteCluster(connection -> connection.async().set(key, verificationCodeJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -88,7 +95,10 @@ public class PendingDevicesManager { private void memcacheDelete(String number) { try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.del(CACHE_PREFIX + number); + final String key = CACHE_PREFIX + number; + + jedis.del(key); + cacheCluster.useWriteCluster(connection -> connection.async().del(key)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java index 5245d78b9..5774ef348 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; @@ -20,14 +21,16 @@ public class ProfilesManager { private static final String CACHE_PREFIX = "profiles::"; - private final Profiles profiles; - private final ReplicatedJedisPool cacheClient; - private final ObjectMapper mapper; + private final Profiles profiles; + private final ReplicatedJedisPool cacheClient; + private final FaultTolerantRedisCluster cacheCluster; + private final ObjectMapper mapper; - public ProfilesManager(Profiles profiles, ReplicatedJedisPool cacheClient) { - this.profiles = profiles; - this.cacheClient = cacheClient; - this.mapper = SystemMapper.getMapper(); + public ProfilesManager(Profiles profiles, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { + this.profiles = profiles; + this.cacheClient = cacheClient; + this.cacheCluster = cacheCluster; + this.mapper = SystemMapper.getMapper(); } public void set(UUID uuid, VersionedProfile versionedProfile) { @@ -53,7 +56,11 @@ public class ProfilesManager { private void memcacheSet(UUID uuid, VersionedProfile profile) { try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.hset(CACHE_PREFIX + uuid.toString(), profile.getVersion(), mapper.writeValueAsString(profile)); + final String key = CACHE_PREFIX + uuid.toString(); + final String profileJson = mapper.writeValueAsString(profile); + + jedis.hset(key, profile.getVersion(), profileJson); + cacheCluster.useWriteCluster(connection -> connection.async().hset(key, profile.getVersion(), profileJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -76,7 +83,10 @@ public class ProfilesManager { private void memcacheDelete(UUID uuid) { try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.del(CACHE_PREFIX + uuid.toString()); + final String key = CACHE_PREFIX + uuid.toString(); + + jedis.del(key); + cacheCluster.useWriteCluster(connection -> connection.async().del(key)); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java index a3949ca3d..044accf0b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java @@ -3,8 +3,10 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; +import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; @@ -29,14 +31,16 @@ public class UsernamesManager { private final Logger logger = LoggerFactory.getLogger(AccountsManager.class); - private final Usernames usernames; - private final ReservedUsernames reservedUsernames; - private final ReplicatedJedisPool cacheClient; + private final Usernames usernames; + private final ReservedUsernames reservedUsernames; + private final ReplicatedJedisPool cacheClient; + private final FaultTolerantRedisCluster cacheCluster; - public UsernamesManager(Usernames usernames, ReservedUsernames reservedUsernames, ReplicatedJedisPool cacheClient) { + public UsernamesManager(Usernames usernames, ReservedUsernames reservedUsernames, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) { this.usernames = usernames; this.reservedUsernames = reservedUsernames; this.cacheClient = cacheClient; + this.cacheCluster = cacheCluster; } public boolean put(UUID uuid, String username) { @@ -112,10 +116,26 @@ public class UsernamesManager { try (Jedis jedis = cacheClient.getWriteResource(); Timer.Context ignored = redisSetTimer.time()) { - Optional.ofNullable(jedis.get(getUuidMapKey(uuid))).ifPresent(oldUsername -> jedis.del(getUsernameMapKey(oldUsername))); + final String uuidMapKey = getUuidMapKey(uuid); + final String usernameMapKey = getUsernameMapKey(username); - jedis.set(getUuidMapKey(uuid), username); - jedis.set(getUsernameMapKey(username), uuid.toString()); + Optional.ofNullable(jedis.get(uuidMapKey)).ifPresent(oldUsername -> jedis.del(getUsernameMapKey(oldUsername))); + + jedis.set(uuidMapKey, username); + jedis.set(usernameMapKey, uuid.toString()); + + cacheCluster.useWriteCluster(connection -> { + final RedisAdvancedClusterAsyncCommands asyncCommands = connection.async(); + + asyncCommands.get(uuidMapKey).thenAccept(oldUsername -> { + if (oldUsername != null) { + asyncCommands.del(getUsernameMapKey(oldUsername)); + } + + asyncCommands.set(uuidMapKey, username); + asyncCommands.set(usernameMapKey, uuid.toString()); + }); + }); } catch (JedisException e) { if (required) throw e; else logger.warn("Ignoring jedis failure", e); @@ -151,12 +171,23 @@ public class UsernamesManager { try (Jedis jedis = cacheClient.getWriteResource(); Timer.Context ignored = redisUuidGetTimer.time()) { - Optional username = redisGet(uuid); + final String uuidMapKey = getUuidMapKey(uuid); - if (username.isPresent()) { - jedis.del(getUsernameMapKey(username.get())); - jedis.del(getUuidMapKey(uuid)); - } + redisGet(uuid).ifPresent(username -> { + jedis.del(getUsernameMapKey(username)); + jedis.del(uuidMapKey); + }); + + cacheCluster.useWriteCluster(connection -> { + final RedisAdvancedClusterAsyncCommands asyncCommands = connection.async(); + + asyncCommands.get(uuidMapKey).thenAccept(username -> { + if (username != null) { + asyncCommands.del(getUsernameMapKey(username)); + asyncCommands.del(uuidMapKey); + } + }); + }); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index e7864660a..2aaa2d3c3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -1,6 +1,8 @@ package org.whispersystems.textsecuregcm.workers; import com.fasterxml.jackson.databind.DeserializationFeature; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.RedisClusterClient; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; import org.jdbi.v3.core.Jdbi; @@ -9,6 +11,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.auth.AuthenticationCredentials; import org.whispersystems.textsecuregcm.providers.RedisClientFactory; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; import org.whispersystems.textsecuregcm.storage.Account; @@ -21,6 +24,7 @@ import org.whispersystems.textsecuregcm.util.Base64; import java.security.SecureRandom; import java.util.Optional; +import java.util.stream.Collectors; import io.dropwizard.Application; import io.dropwizard.cli.EnvironmentCommand; @@ -66,12 +70,15 @@ public class DeleteUserCommand extends EnvironmentCommand account = accountsManager.get(user); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java index 08870857c..2aa7e36cf 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java @@ -1,6 +1,7 @@ package org.whispersystems.textsecuregcm.tests.storage; import org.junit.Test; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Accounts; @@ -23,10 +24,11 @@ public class AccountsManagerTest { @Test public void testGetAccountByNumberInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Accounts accounts = mock(Accounts.class ); - DirectoryManager directoryManager = mock(DirectoryManager.class ); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class ); + Jedis jedis = mock(Jedis.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); UUID uuid = UUID.randomUUID(); @@ -34,7 +36,7 @@ public class AccountsManagerTest { when(jedis.get(eq("AccountMap::+14152222222"))).thenReturn(uuid.toString()); when(jedis.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); Optional account = accountsManager.get("+14152222222"); assertTrue(account.isPresent()); @@ -50,17 +52,18 @@ public class AccountsManagerTest { @Test public void testGetAccountByUuidInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Accounts accounts = mock(Accounts.class ); - DirectoryManager directoryManager = mock(DirectoryManager.class ); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class ); + Jedis jedis = mock(Jedis.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); UUID uuid = UUID.randomUUID(); when(cacheClient.getReadResource()).thenReturn(jedis); when(jedis.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); Optional account = accountsManager.get(uuid); assertTrue(account.isPresent()); @@ -77,19 +80,20 @@ public class AccountsManagerTest { @Test public void testGetAccountByNumberNotInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Accounts accounts = mock(Accounts.class ); - DirectoryManager directoryManager = mock(DirectoryManager.class ); - UUID uuid = UUID.randomUUID(); - Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class ); + Jedis jedis = mock(Jedis.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); + UUID uuid = UUID.randomUUID(); + Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); when(cacheClient.getReadResource()).thenReturn(jedis); when(cacheClient.getWriteResource()).thenReturn(jedis); when(jedis.get(eq("AccountMap::+14152222222"))).thenReturn(null); when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); Optional retrieved = accountsManager.get("+14152222222"); assertTrue(retrieved.isPresent()); @@ -107,19 +111,20 @@ public class AccountsManagerTest { @Test public void testGetAccountByUuidNotInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Accounts accounts = mock(Accounts.class ); - DirectoryManager directoryManager = mock(DirectoryManager.class ); - UUID uuid = UUID.randomUUID(); - Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class ); + Jedis jedis = mock(Jedis.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); + UUID uuid = UUID.randomUUID(); + Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); when(cacheClient.getReadResource()).thenReturn(jedis); when(cacheClient.getWriteResource()).thenReturn(jedis); when(jedis.get(eq("Account3::" + uuid))).thenReturn(null); when(accounts.get(eq(uuid))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); Optional retrieved = accountsManager.get(uuid); assertTrue(retrieved.isPresent()); @@ -137,19 +142,20 @@ public class AccountsManagerTest { @Test public void testGetAccountByNumberBrokenCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Accounts accounts = mock(Accounts.class ); - DirectoryManager directoryManager = mock(DirectoryManager.class ); - UUID uuid = UUID.randomUUID(); - Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class ); + Jedis jedis = mock(Jedis.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); + UUID uuid = UUID.randomUUID(); + Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); when(cacheClient.getReadResource()).thenReturn(jedis); when(cacheClient.getWriteResource()).thenReturn(jedis); when(jedis.get(eq("AccountMap::+14152222222"))).thenThrow(new JedisException("Connection lost!")); when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); Optional retrieved = accountsManager.get("+14152222222"); assertTrue(retrieved.isPresent()); @@ -167,19 +173,20 @@ public class AccountsManagerTest { @Test public void testGetAccountByUuidBrokenCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Accounts accounts = mock(Accounts.class ); - DirectoryManager directoryManager = mock(DirectoryManager.class ); - UUID uuid = UUID.randomUUID(); - Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class ); + Jedis jedis = mock(Jedis.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); + UUID uuid = UUID.randomUUID(); + Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]); when(cacheClient.getReadResource()).thenReturn(jedis); when(cacheClient.getWriteResource()).thenReturn(jedis); when(jedis.get(eq("Account3::" + uuid))).thenThrow(new JedisException("Connection lost!")); when(accounts.get(eq(uuid))).thenReturn(Optional.of(account)); - AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient, cacheCluster); Optional retrieved = accountsManager.get(uuid); assertTrue(retrieved.isPresent()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java index 0c3f67cf7..df7820d7a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java @@ -17,6 +17,7 @@ package org.whispersystems.textsecuregcm.tests.storage; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; @@ -64,11 +65,12 @@ public class ActiveUserCounterTest { private final Account iosAccount = mock(Account.class); private final Account noDeviceAccount = mock(Account.class); - private final Jedis jedis = mock(Jedis.class); - private final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class); - private final MetricsFactory metricsFactory = mock(MetricsFactory.class); + private final Jedis jedis = mock(Jedis.class); + private final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class); + private final FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + private final MetricsFactory metricsFactory = mock(MetricsFactory.class); - private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, jedisPool); + private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, jedisPool, cacheCluster); @Before public void setup() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java index 81b95057b..e39b9118f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java @@ -1,6 +1,7 @@ package org.whispersystems.textsecuregcm.tests.storage; import org.junit.Test; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.ProfilesManager; @@ -24,16 +25,17 @@ public class ProfilesManagerTest { @Test public void testGetProfileInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Profiles profiles = mock(Profiles.class ); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class ); + Profiles profiles = mock(Profiles.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); UUID uuid = UUID.randomUUID(); when(cacheClient.getReadResource()).thenReturn(jedis); when(jedis.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn("{\"version\": \"someversion\", \"name\": \"somename\", \"avatar\": \"someavatar\", \"commitment\":\"" + Base64.encodeBytes("somecommitment".getBytes()) + "\"}"); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient); + ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); Optional profile = profilesManager.get(uuid, "someversion"); assertTrue(profile.isPresent()); @@ -49,9 +51,10 @@ public class ProfilesManagerTest { @Test public void testGetProfileNotInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Profiles profiles = mock(Profiles.class ); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class ); + Profiles profiles = mock(Profiles.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", "somecommitment".getBytes()); @@ -61,7 +64,7 @@ public class ProfilesManagerTest { when(jedis.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn(null); when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient); + ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); Optional retrieved = profilesManager.get(uuid, "someversion"); assertTrue(retrieved.isPresent()); @@ -78,9 +81,10 @@ public class ProfilesManagerTest { @Test public void testGetProfileBrokenCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Profiles profiles = mock(Profiles.class ); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class ); + Profiles profiles = mock(Profiles.class ); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", "somecommitment".getBytes()); @@ -90,7 +94,7 @@ public class ProfilesManagerTest { when(jedis.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenThrow(new JedisException("Connection lost")); when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient); + ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster); Optional retrieved = profilesManager.get(uuid, "someversion"); assertTrue(retrieved.isPresent()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/UsernamesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/UsernamesManagerTest.java index 0bcf862f8..e40740516 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/UsernamesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/UsernamesManagerTest.java @@ -1,6 +1,7 @@ package org.whispersystems.textsecuregcm.tests.storage; import org.junit.Test; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.ReservedUsernames; import org.whispersystems.textsecuregcm.storage.Usernames; @@ -21,17 +22,18 @@ public class UsernamesManagerTest { @Test public void testGetByUsernameInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Usernames usernames = mock(Usernames.class ); - ReservedUsernames reserved = mock(ReservedUsernames.class); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Usernames usernames = mock(Usernames.class); + ReservedUsernames reserved = mock(ReservedUsernames.class); UUID uuid = UUID.randomUUID(); when(cacheClient.getReadResource()).thenReturn(jedis); when(jedis.get(eq("UsernameByUsername::n00bkiller"))).thenReturn(uuid.toString()); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); Optional retrieved = usernamesManager.get("n00bkiller"); assertTrue(retrieved.isPresent()); @@ -45,17 +47,18 @@ public class UsernamesManagerTest { @Test public void testGetByUuidInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Usernames usernames = mock(Usernames.class ); - ReservedUsernames reserved = mock(ReservedUsernames.class); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Usernames usernames = mock(Usernames.class); + ReservedUsernames reserved = mock(ReservedUsernames.class); UUID uuid = UUID.randomUUID(); when(cacheClient.getReadResource()).thenReturn(jedis); when(jedis.get(eq("UsernameByUuid::" + uuid.toString()))).thenReturn("n00bkiller"); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); Optional retrieved = usernamesManager.get(uuid); assertTrue(retrieved.isPresent()); @@ -70,10 +73,11 @@ public class UsernamesManagerTest { @Test public void testGetByUsernameNotInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Usernames usernames = mock(Usernames.class ); - ReservedUsernames reserved = mock(ReservedUsernames.class); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Usernames usernames = mock(Usernames.class); + ReservedUsernames reserved = mock(ReservedUsernames.class); UUID uuid = UUID.randomUUID(); @@ -83,7 +87,7 @@ public class UsernamesManagerTest { when(jedis.get(eq("UsernameByUsername::n00bkiller"))).thenReturn(null); when(usernames.get(eq("n00bkiller"))).thenReturn(Optional.of(uuid)); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); Optional retrieved = usernamesManager.get("n00bkiller"); assertTrue(retrieved.isPresent()); @@ -102,10 +106,11 @@ public class UsernamesManagerTest { @Test public void testGetByUuidNotInCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Usernames usernames = mock(Usernames.class ); - ReservedUsernames reserved = mock(ReservedUsernames.class); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Usernames usernames = mock(Usernames.class); + ReservedUsernames reserved = mock(ReservedUsernames.class); UUID uuid = UUID.randomUUID(); @@ -114,7 +119,7 @@ public class UsernamesManagerTest { when(jedis.get(eq("UsernameByUuid::" + uuid.toString()))).thenReturn(null); when(usernames.get(eq(uuid))).thenReturn(Optional.of("n00bkiller")); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); Optional retrieved = usernamesManager.get(uuid); assertTrue(retrieved.isPresent()); @@ -132,10 +137,11 @@ public class UsernamesManagerTest { @Test public void testGetByUsernameBrokenCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Usernames usernames = mock(Usernames.class ); - ReservedUsernames reserved = mock(ReservedUsernames.class); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Usernames usernames = mock(Usernames.class); + ReservedUsernames reserved = mock(ReservedUsernames.class); UUID uuid = UUID.randomUUID(); @@ -144,7 +150,7 @@ public class UsernamesManagerTest { when(jedis.get(eq("UsernameByUsername::n00bkiller"))).thenThrow(new JedisException("Connection lost!")); when(usernames.get(eq("n00bkiller"))).thenReturn(Optional.of(uuid)); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); Optional retrieved = usernamesManager.get("n00bkiller"); assertTrue(retrieved.isPresent()); @@ -163,10 +169,11 @@ public class UsernamesManagerTest { @Test public void testGetAccountByUuidBrokenCache() { - ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); - Jedis jedis = mock(Jedis.class ); - Usernames usernames = mock(Usernames.class ); - ReservedUsernames reserved = mock(ReservedUsernames.class); + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class); + FaultTolerantRedisCluster cacheCluster = mock(FaultTolerantRedisCluster.class); + Usernames usernames = mock(Usernames.class); + ReservedUsernames reserved = mock(ReservedUsernames.class); UUID uuid = UUID.randomUUID(); @@ -175,7 +182,7 @@ public class UsernamesManagerTest { when(jedis.get(eq("UsernameByUuid::" + uuid))).thenThrow(new JedisException("Connection lost!")); when(usernames.get(eq(uuid))).thenReturn(Optional.of("n00bkiller")); - UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient); + UsernamesManager usernamesManager = new UsernamesManager(usernames, reserved, cacheClient, cacheCluster); Optional retrieved = usernamesManager.get(uuid); assertTrue(retrieved.isPresent());