From 5717dc294e0000668628d13bf0c7d50bea6d314b Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Fri, 14 Aug 2020 11:23:10 -0400 Subject: [PATCH] Combine the read/write breakers for Redis clusters. --- .../limits/LockingRateLimiter.java | 4 +- .../textsecuregcm/limits/RateLimiter.java | 6 +-- .../metrics/PushLatencyManager.java | 4 +- .../providers/RedisClusterHealthCheck.java | 2 +- .../push/ClientPresenceManager.java | 16 +++---- .../textsecuregcm/redis/ClusterLuaScript.java | 6 +-- .../redis/FaultTolerantRedisCluster.java | 44 +++++-------------- .../storage/AccountDatabaseCrawlerCache.java | 12 ++--- .../storage/AccountsManager.java | 6 +-- .../storage/ActiveUserCounter.java | 8 ++-- .../storage/PendingAccountsManager.java | 6 +-- .../storage/PendingDevicesManager.java | 6 +-- .../storage/ProfilesManager.java | 6 +-- .../storage/RedisClusterMessagesCache.java | 6 +-- .../RegistrationLockVersionCounter.java | 6 +-- .../storage/UsernamesManager.java | 8 ++-- .../ClearMessagesCacheClusterCommand.java | 2 +- .../push/ClientPresenceManagerTest.java | 22 +++++----- .../redis/AbstractRedisClusterTest.java | 4 +- .../redis/ClusterLuaScriptTest.java | 17 ++++--- .../redis/FaultTolerantRedisClusterTest.java | 38 ++-------------- .../RedisClusterMessagesCacheTest.java | 2 +- .../tests/storage/ActiveUserCounterTest.java | 18 ++++---- .../tests/util/RedisClusterHelper.java | 16 +++---- 24 files changed, 105 insertions(+), 160 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java index 49662357b..67775599d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java @@ -41,11 +41,11 @@ public class LockingRateLimiter extends RateLimiter { } private void releaseLock(String key) { - cacheCluster.useWriteCluster(connection -> connection.sync().del(getLockName(key))); + cacheCluster.useCluster(connection -> connection.sync().del(getLockName(key))); } private boolean acquireLock(String key) { - return cacheCluster.withWriteCluster(connection -> connection.sync().set(getLockName(key), "L", SetArgs.Builder.nx().ex(10))) != null; + return cacheCluster.withCluster(connection -> connection.sync().set(getLockName(key), "L", SetArgs.Builder.nx().ex(10))) != null; } private String getLockName(String key) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index 05a70b9bf..9512a0176 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -85,14 +85,14 @@ public class RateLimiter { } public void clear(String key) { - cacheCluster.useWriteCluster(connection -> connection.sync().del(getBucketName(key))); + cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key))); } private void setBucket(String key, LeakyBucket bucket) { try { final String serialized = bucket.serialize(mapper); - cacheCluster.useWriteCluster(connection -> connection.sync().setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized)); + cacheCluster.useCluster(connection -> connection.sync().setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -100,7 +100,7 @@ public class RateLimiter { private LeakyBucket getBucket(String key) { try { - final String serialized = cacheCluster.withReadCluster(connection -> connection.sync().get(getBucketName(key))); + final String serialized = cacheCluster.withCluster(connection -> connection.sync().get(getBucketName(key))); if (serialized != null) { return LeakyBucket.fromSerialized(mapper, serialized); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java index 43ad40069..1f6aae3dc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java @@ -37,7 +37,7 @@ public class PushLatencyManager { @VisibleForTesting void recordPushSent(final UUID accountUuid, final long deviceId, final long currentTime) { - redisCluster.useWriteCluster(connection -> + redisCluster.useCluster(connection -> connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId), String.valueOf(currentTime), SetArgs.Builder.nx().ex(TTL))); } @@ -53,7 +53,7 @@ public class PushLatencyManager { CompletableFuture getLatencyAndClearTimestamp(final UUID accountUuid, final long deviceId, final long currentTimeMillis) { final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId); - return redisCluster.withWriteCluster(connection -> { + return redisCluster.withCluster(connection -> { final RedisAdvancedClusterAsyncCommands commands = connection.async(); final CompletableFuture getFuture = commands.get(key).toCompletableFuture(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java b/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java index 00767452e..3be9ab90a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java @@ -15,7 +15,7 @@ public class RedisClusterHealthCheck extends HealthCheck { @Override protected Result check() throws Exception { - return CompletableFuture.allOf(redisCluster.withReadCluster(connection -> connection.async().masters().commands().ping()).futures()) + return CompletableFuture.allOf(redisCluster.withCluster(connection -> connection.async().masters().commands().ping()).futures()) .thenApply(v -> Result.healthy()) .exceptionally(Result::unhealthy) .get(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 38078af20..4332c6f18 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -105,7 +105,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)).commands().subscribe(presenceChannel); }); - presenceCluster.useWriteCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); + presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); pruneMissingPeersFuture = scheduledExecutorService.scheduleAtFixedRate(this::pruneMissingPeers, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS); } @@ -122,7 +122,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { + presenceCluster.useCluster(connection -> { connection.sync().srem(MANAGER_SET_KEY, managerId); connection.sync().del(getConnectedClientSetKey(managerId)); }); @@ -138,7 +138,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { + presenceCluster.useCluster(connection -> { final RedisAdvancedClusterCommands commands = connection.sync(); commands.set(presenceKey, managerId); @@ -161,7 +161,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1; + return presenceCluster.withCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1; } } @@ -175,7 +175,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().srem(connectedClientSetKey, presenceKey)); + presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey)); return removed; } @@ -201,18 +201,18 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter peerIds = presenceCluster.withReadCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY)); + final Set peerIds = presenceCluster.withCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY)); peerIds.remove(managerId); for (final String peerId : peerIds) { - final boolean peerMissing = presenceCluster.withWriteCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0); + final boolean peerMissing = presenceCluster.withCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0); if (peerMissing) { log.debug("Presence manager {} did not respond to ping", peerId); final String connectedClientsKey = getConnectedClientSetKey(peerId); - presenceCluster.useWriteCluster(connection -> { + presenceCluster.useCluster(connection -> { final RedisAdvancedClusterCommands commands = connection.sync(); String presenceKey; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScript.java index 0b6a24e29..fd62d039f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScript.java @@ -48,11 +48,11 @@ public class ClusterLuaScript { this.redisCluster = redisCluster; this.scriptOutputType = scriptOutputType; this.script = script; - this.sha = redisCluster.withWriteCluster(connection -> connection.sync().scriptLoad(script)); + this.sha = redisCluster.withCluster(connection -> connection.sync().scriptLoad(script)); } public Object execute(final List keys, final List args) { - return redisCluster.withWriteCluster(connection -> { + return redisCluster.withCluster(connection -> { try { final RedisAdvancedClusterCommands clusterCommands = connection.sync(); @@ -70,7 +70,7 @@ public class ClusterLuaScript { } public Object executeBinary(final List keys, final List args) { - return redisCluster.withBinaryWriteCluster(connection -> { + return redisCluster.withBinaryCluster(connection -> { try { final RedisAdvancedClusterCommands binaryCommands = connection.sync(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index ae6269d11..aab61e4ce 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -36,8 +36,7 @@ public class FaultTolerantRedisCluster { private final List> pubSubConnections = new ArrayList<>(); private final CircuitBreakerConfiguration circuitBreakerConfiguration; - private final CircuitBreaker readCircuitBreaker; - private final CircuitBreaker writeCircuitBreaker; + private final CircuitBreaker circuitBreaker; public FaultTolerantRedisCluster(final String name, final List urls, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { this(name, RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), timeout, circuitBreakerConfiguration); @@ -54,15 +53,10 @@ public class FaultTolerantRedisCluster { this.binaryClusterConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); this.circuitBreakerConfiguration = circuitBreakerConfiguration; - this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.circuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), - readCircuitBreaker, - FaultTolerantRedisCluster.class); - - CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), - writeCircuitBreaker, + circuitBreaker, FaultTolerantRedisCluster.class); } @@ -77,36 +71,20 @@ public class FaultTolerantRedisCluster { clusterClient.shutdown(); } - public void useReadCluster(final Consumer> consumer) { - this.readCircuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection)); + public void useCluster(final Consumer> consumer) { + this.circuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection)); } - public T withReadCluster(final Function, T> consumer) { - return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection)); + public T withCluster(final Function, T> consumer) { + return this.circuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection)); } - public void useWriteCluster(final Consumer> consumer) { - this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection)); + public void useBinaryCluster(final Consumer> consumer) { + this.circuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection)); } - public T withWriteCluster(final Function, T> consumer) { - return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection)); - } - - public void useBinaryReadCluster(final Consumer> consumer) { - this.readCircuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection)); - } - - public T withBinaryReadCluster(final Function, T> consumer) { - return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection)); - } - - public void useBinaryWriteCluster(final Consumer> consumer) { - this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection)); - } - - public T withBinaryWriteCluster(final Function, T> consumer) { - return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection)); + public T withBinaryCluster(final Function, T> consumer) { + return this.circuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection)); } public FaultTolerantPubSubConnection createPubSubConnection() { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java index c244f7ea6..ebc24fc5c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java @@ -44,15 +44,15 @@ public class AccountDatabaseCrawlerCache { } public void clearAccelerate() { - cacheCluster.useWriteCluster(connection -> connection.sync().del(ACCELERATE_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY)); } public boolean isAccelerated() { - return "1".equals(cacheCluster.withReadCluster(connection -> connection.sync().get(ACCELERATE_KEY))); + return "1".equals(cacheCluster.withCluster(connection -> connection.sync().get(ACCELERATE_KEY))); } public boolean claimActiveWork(String workerId, long ttlMs) { - return "OK".equals(cacheCluster.withWriteCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs)))); + return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs)))); } public void releaseActiveWork(String workerId) { @@ -60,7 +60,7 @@ public class AccountDatabaseCrawlerCache { } public Optional getLastUuid() { - final String lastUuidString = cacheCluster.withWriteCluster(connection -> connection.sync().get(LAST_UUID_KEY)); + final String lastUuidString = cacheCluster.withCluster(connection -> connection.sync().get(LAST_UUID_KEY)); if (lastUuidString == null) return Optional.empty(); else return Optional.of(UUID.fromString(lastUuidString)); @@ -68,9 +68,9 @@ public class AccountDatabaseCrawlerCache { public void setLastUuid(Optional lastUuid) { if (lastUuid.isPresent()) { - cacheCluster.useWriteCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); + cacheCluster.useCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); } else { - cacheCluster.useWriteCluster(connection -> connection.sync().del(LAST_UUID_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_KEY)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index f0896c60a..f23abfa81 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -147,7 +147,7 @@ public class AccountsManager { try (Timer.Context ignored = redisSetTimer.time()) { final String accountJson = mapper.writeValueAsString(account); - cacheCluster.useWriteCluster(connection -> { + cacheCluster.useCluster(connection -> { final RedisAdvancedClusterCommands commands = connection.sync(); commands.set(getAccountMapKey(account.getNumber()), account.getUuid().toString()); @@ -160,7 +160,7 @@ public class AccountsManager { private Optional redisGet(String number) { try (Timer.Context ignored = redisNumberGetTimer.time()) { - final String uuid = cacheCluster.withReadCluster(connection -> connection.sync().get(getAccountMapKey(number))); + final String uuid = cacheCluster.withCluster(connection -> connection.sync().get(getAccountMapKey(number))); if (uuid != null) return redisGet(UUID.fromString(uuid)); else return Optional.empty(); @@ -175,7 +175,7 @@ public class AccountsManager { private Optional redisGet(UUID uuid) { try (Timer.Context ignored = redisUuidGetTimer.time()) { - final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(getAccountEntityKey(uuid))); + final String json = cacheCluster.withCluster(connection -> connection.sync().get(getAccountEntityKey(uuid))); if (json != null) { Account account = mapper.readValue(json, Account.class); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java index 38e952004..83b958573 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java @@ -56,7 +56,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { @Override public void onCrawlStart() { - cacheCluster.useWriteCluster(connection -> connection.sync().del(TALLY_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(TALLY_KEY)); } @Override @@ -153,7 +153,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { private void incrementTallies(UUID fromUuid, Map platformIncrements, Map countryIncrements) { try { - final String tallyValue = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY)); + final String tallyValue = cacheCluster.withCluster(connection -> connection.sync().get(TALLY_KEY)); ActiveUserTally activeUserTally; @@ -173,7 +173,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { final String tallyJson = mapper.writeValueAsString(activeUserTally); - cacheCluster.useWriteCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson)); + cacheCluster.useCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -194,7 +194,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { private ActiveUserTally getFinalTallies() { try { - final String tallyJson = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY)); + final String tallyJson = cacheCluster.withCluster(connection -> connection.sync().get(TALLY_KEY)); return mapper.readValue(tallyJson, ActiveUserTally.class); } catch (IOException e) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java index 390a4dcd0..19c384978 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java @@ -69,7 +69,7 @@ public class PendingAccountsManager { try { final String verificationCodeJson = mapper.writeValueAsString(code); - cacheCluster.useWriteCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson)); + cacheCluster.useCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -77,7 +77,7 @@ public class PendingAccountsManager { private Optional memcacheGet(String number) { try { - final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(CACHE_PREFIX + number)); + final String json = cacheCluster.withCluster(connection -> connection.sync().get(CACHE_PREFIX + number)); if (json == null) return Optional.empty(); else return Optional.of(mapper.readValue(json, StoredVerificationCode.class)); @@ -88,6 +88,6 @@ public class PendingAccountsManager { } private void memcacheDelete(String number) { - cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + number)); + cacheCluster.useCluster(connection -> connection.sync().del(CACHE_PREFIX + number)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java index 6fd9ed909..f4bb5921e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java @@ -68,7 +68,7 @@ public class PendingDevicesManager { try { final String verificationCodeJson = mapper.writeValueAsString(code); - cacheCluster.useWriteCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson)); + cacheCluster.useCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -76,7 +76,7 @@ public class PendingDevicesManager { private Optional memcacheGet(String number) { try { - final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(CACHE_PREFIX + number)); + final String json = cacheCluster.withCluster(connection -> connection.sync().get(CACHE_PREFIX + number)); if (json == null) return Optional.empty(); else return Optional.of(mapper.readValue(json, StoredVerificationCode.class)); @@ -87,7 +87,7 @@ public class PendingDevicesManager { } private void memcacheDelete(String number) { - cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + number)); + cacheCluster.useCluster(connection -> connection.sync().del(CACHE_PREFIX + number)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java index e9a4614bf..7a365369c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java @@ -53,7 +53,7 @@ public class ProfilesManager { try { final String profileJson = mapper.writeValueAsString(profile); - cacheCluster.useWriteCluster(connection -> connection.sync().hset(CACHE_PREFIX + uuid.toString(), profile.getVersion(), profileJson)); + cacheCluster.useCluster(connection -> connection.sync().hset(CACHE_PREFIX + uuid.toString(), profile.getVersion(), profileJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -61,7 +61,7 @@ public class ProfilesManager { private Optional memcacheGet(UUID uuid, String version) { try { - final String json = cacheCluster.withReadCluster(connection -> connection.sync().hget(CACHE_PREFIX + uuid.toString(), version)); + final String json = cacheCluster.withCluster(connection -> connection.sync().hget(CACHE_PREFIX + uuid.toString(), version)); if (json == null) return Optional.empty(); else return Optional.of(mapper.readValue(json, VersionedProfile.class)); @@ -75,6 +75,6 @@ public class ProfilesManager { } private void memcacheDelete(UUID uuid) { - cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + uuid.toString())); + cacheCluster.useCluster(connection -> connection.sync().del(CACHE_PREFIX + uuid.toString())); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java index 9a8650667..bebda380e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java @@ -269,7 +269,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); + return (int)(redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); } List getQueuesToPersist(final int slot, final Instant maxTime, final int limit) { @@ -280,11 +280,11 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter connection.sync().setex(getPersistInProgressKey(queue), 30, LOCK_VALUE)); + redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(queue), 30, LOCK_VALUE)); } void unlockQueueForPersistence(final String queue) { - redisCluster.useBinaryWriteCluster(connection -> connection.sync().del(getPersistInProgressKey(queue))); + redisCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(queue))); } public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationLockVersionCounter.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationLockVersionCounter.java index ff68c3a8b..e47c5706c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationLockVersionCounter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RegistrationLockVersionCounter.java @@ -36,7 +36,7 @@ public class RegistrationLockVersionCounter extends AccountDatabaseCrawlerListen @Override public void onCrawlStart() { - redisCluster.useWriteCluster(connection -> connection.sync().hset(REGLOCK_COUNT_KEY, Map.of(PIN_KEY, "0", REGLOCK_KEY, "0"))); + redisCluster.useCluster(connection -> connection.sync().hset(REGLOCK_COUNT_KEY, Map.of(PIN_KEY, "0", REGLOCK_KEY, "0"))); } @Override @@ -60,7 +60,7 @@ public class RegistrationLockVersionCounter extends AccountDatabaseCrawlerListen } private void incrementReglockCounts(final int pinCount, final int reglockCount) { - redisCluster.useWriteCluster(connection -> { + redisCluster.useCluster(connection -> { final RedisAdvancedClusterCommands commands = connection.sync(); commands.hincrby(REGLOCK_COUNT_KEY, PIN_KEY, pinCount); @@ -71,7 +71,7 @@ public class RegistrationLockVersionCounter extends AccountDatabaseCrawlerListen @Override public void onCrawlEnd(final Optional fromUuid) { final Map countsByReglockType = - redisCluster.withReadCluster(connection -> connection.sync().hmget(REGLOCK_COUNT_KEY, PIN_KEY, REGLOCK_KEY)) + redisCluster.withCluster(connection -> connection.sync().hmget(REGLOCK_COUNT_KEY, PIN_KEY, REGLOCK_KEY)) .stream() .collect(Collectors.toMap(KeyValue::getKey, keyValue -> keyValue.hasValue() ? keyValue.map(Integer::parseInt).getValue() : 0)); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java index e78deeec7..f24450a45 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java @@ -113,7 +113,7 @@ public class UsernamesManager { final String usernameMapKey = getUsernameMapKey(username); try (Timer.Context ignored = redisSetTimer.time()) { - cacheCluster.useWriteCluster(connection -> { + cacheCluster.useCluster(connection -> { final RedisAdvancedClusterCommands commands = connection.sync(); final Optional maybeOldUsername = Optional.ofNullable(commands.get(uuidMapKey)); @@ -130,7 +130,7 @@ public class UsernamesManager { private Optional redisGet(String username) { try (Timer.Context ignored = redisUsernameGetTimer.time()) { - final String result = cacheCluster.withReadCluster(connection -> connection.sync().get(getUsernameMapKey(username))); + final String result = cacheCluster.withCluster(connection -> connection.sync().get(getUsernameMapKey(username))); if (result == null) return Optional.empty(); else return Optional.of(UUID.fromString(result)); @@ -142,7 +142,7 @@ public class UsernamesManager { private Optional redisGet(UUID uuid) { try (Timer.Context ignored = redisUuidGetTimer.time()) { - final String result = cacheCluster.withReadCluster(connection -> connection.sync().get(getUuidMapKey(uuid))); + final String result = cacheCluster.withCluster(connection -> connection.sync().get(getUuidMapKey(uuid))); return Optional.ofNullable(result); } catch (RedisException e) { @@ -153,7 +153,7 @@ public class UsernamesManager { private void redisDelete(UUID uuid) { try (Timer.Context ignored = redisUuidGetTimer.time()) { - cacheCluster.useWriteCluster(connection -> { + cacheCluster.useCluster(connection -> { final RedisAdvancedClusterCommands commands = connection.sync(); commands.del(getUuidMapKey(uuid)); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java index 895949d04..ad7a994db 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java @@ -15,6 +15,6 @@ public class ClearMessagesCacheClusterCommand extends ConfiguredCommand bootstrap, final Namespace namespace, final WhisperServerConfiguration config) { final FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration()); - messagesCacheCluster.useWriteCluster(connection -> connection.sync().masters().commands().flushallAsync()); + messagesCacheCluster.useCluster(connection -> connection.sync().masters().commands().flushallAsync()); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java index c0c224fbb..6e9f06175 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java @@ -30,7 +30,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { public void setUp() throws Exception { super.setUp(); - getRedisCluster().useWriteCluster(connection -> { + getRedisCluster().useCluster(connection -> { connection.sync().flushall(); connection.sync().masters().commands().configSet("notify-keyspace-events", "K$z"); }); @@ -93,7 +93,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { } }); - getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), + getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), UUID.randomUUID().toString())); synchronized (displaced) { @@ -125,7 +125,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.getResources().eventBus().publish(new ClusterTopologyChangedEvent(List.of(), List.of()))); - getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), + getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), UUID.randomUUID().toString())); synchronized (displaced) { @@ -149,7 +149,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId)); clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); - getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), + getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), UUID.randomUUID().toString())); assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId)); @@ -160,7 +160,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { final String presentPeerId = UUID.randomUUID().toString(); final String missingPeerId = UUID.randomUUID().toString(); - getRedisCluster().useWriteCluster(connection -> { + getRedisCluster().useCluster(connection -> { connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId); connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId); }); @@ -173,17 +173,17 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); clientPresenceManager.pruneMissingPeers(); - assertEquals(1, (long)getRedisCluster().withWriteCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); - assertTrue(getRedisCluster().withReadCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId))); + assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); + assertTrue(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId))); - assertEquals(0, (long)getRedisCluster().withReadCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId)))); - assertFalse(getRedisCluster().withReadCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId))); + assertEquals(0, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId)))); + assertFalse(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId))); } private void addClientPresence(final String managerId) { final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7); - getRedisCluster().useWriteCluster(connection -> { + getRedisCluster().useCluster(connection -> { connection.sync().set(clientPresenceKey, managerId); connection.sync().sadd(ClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey); }); @@ -206,7 +206,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { final long displacedAccountDeviceId = 7; clientPresenceManager.setPresent(displacedAccountUuid, displacedAccountDeviceId, NO_OP); - getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId), + getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId), UUID.randomUUID().toString())); clientPresenceManager.stop(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java index 3f1a9fd59..2ec914989 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java @@ -58,7 +58,7 @@ public abstract class AbstractRedisClusterTest { redisCluster = new FaultTolerantRedisCluster("test-cluster", urls, Duration.ofSeconds(2), new CircuitBreakerConfiguration()); - redisCluster.useWriteCluster(connection -> { + redisCluster.useCluster(connection -> { boolean setAll = false; final String[] keys = new String[NODE_COUNT]; @@ -84,7 +84,7 @@ public abstract class AbstractRedisClusterTest { } }); - redisCluster.useWriteCluster(connection -> connection.sync().flushall()); + redisCluster.useCluster(connection -> connection.sync().flushall()); } protected FaultTolerantRedisCluster getRedisCluster() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java index 20f84c958..fee225c49 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java @@ -15,7 +15,6 @@ import java.util.List; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -33,35 +32,35 @@ public class ClusterLuaScriptTest extends AbstractRedisClusterTest { final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])", ScriptOutputType.VALUE); assertEquals("OK", script.execute(List.of(key), List.of(value))); - assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key))); + assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key))); final int slot = SlotHash.getSlot(key); - final int sourcePort = redisCluster.withWriteCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).node(0).getUri().getPort()); - final RedisCommands sourceCommands = redisCluster.withWriteCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0)); - final RedisCommands destinationCommands = redisCluster.withWriteCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0)); + final int sourcePort = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).node(0).getUri().getPort()); + final RedisCommands sourceCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0)); + final RedisCommands destinationCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0)); destinationCommands.clusterSetSlotImporting(slot, sourceCommands.clusterMyId()); assertEquals("OK", script.execute(List.of(key), List.of(value))); - assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key))); + assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key))); sourceCommands.clusterSetSlotMigrating(slot, destinationCommands.clusterMyId()); assertEquals("OK", script.execute(List.of(key), List.of(value))); - assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key))); + assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key))); for (final String migrateKey : sourceCommands.clusterGetKeysInSlot(slot, Integer.MAX_VALUE)) { destinationCommands.migrate("127.0.0.1", sourcePort, migrateKey, 0, 1000); } assertEquals("OK", script.execute(List.of(key), List.of(value))); - assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key))); + assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key))); destinationCommands.clusterSetSlotNode(slot, destinationCommands.clusterMyId()); assertEquals("OK", script.execute(List.of(key), List.of(value))); - assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key))); + assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key))); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 6905dd07f..6af06fcb3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -44,47 +44,17 @@ public class FaultTolerantRedisClusterTest { } @Test - public void testReadBreaker() { + public void testBreaker() { when(clusterCommands.get(anyString())) .thenReturn("value") .thenThrow(new RedisException("Badness has ensued.")); - assertEquals("value", faultTolerantCluster.withReadCluster(connection -> connection.sync().get("key"))); + assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key"))); assertThrows(RedisException.class, - () -> faultTolerantCluster.withReadCluster(connection -> connection.sync().get("OH NO"))); + () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); assertThrows(CircuitBreakerOpenException.class, - () -> faultTolerantCluster.withReadCluster(connection -> connection.sync().get("OH NO"))); - } - - @Test - public void testReadsContinueWhileWriteBreakerOpen() { - when(clusterCommands.set(anyString(), anyString())).thenThrow(new RedisException("Badness has ensued.")); - - assertThrows(RedisException.class, - () -> faultTolerantCluster.useWriteCluster(connection -> connection.sync().set("OH", "NO"))); - - assertThrows(CircuitBreakerOpenException.class, - () -> faultTolerantCluster.useWriteCluster(connection -> connection.sync().set("OH", "NO"))); - - when(clusterCommands.get("key")).thenReturn("value"); - - assertEquals("value", faultTolerantCluster.withReadCluster(connection -> connection.sync().get("key"))); - } - - @Test - public void testWriteBreaker() { - when(clusterCommands.get(anyString())) - .thenReturn("value") - .thenThrow(new RedisException("Badness has ensued.")); - - assertEquals("value", faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("key"))); - - assertThrows(RedisException.class, - () -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO"))); - - assertThrows(CircuitBreakerOpenException.class, - () -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO"))); + () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO"))); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java index 9bc452d8f..4202fe521 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java @@ -31,7 +31,7 @@ public class RedisClusterMessagesCacheTest extends AbstractMessagesCacheTest { public void setUp() throws Exception { super.setUp(); - getRedisCluster().useWriteCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "K$gz")); + getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "K$gz")); notificationExecutorService = Executors.newSingleThreadExecutor(); messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java index d1fbf5c33..f1ea4a813 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java @@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.tests.storage; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; @@ -30,7 +29,6 @@ import com.google.common.collect.ImmutableList; import io.dropwizard.metrics.MetricsFactory; import org.junit.Before; import org.junit.Test; -import redis.clients.jedis.Jedis; import java.util.Arrays; import java.util.UUID; @@ -105,7 +103,7 @@ public class ActiveUserCounterTest { public void testCrawlStart() { activeUserCounter.onCrawlStart(); - verify(cacheCluster, times(1)).useWriteCluster(any()); + verify(cacheCluster, times(1)).useCluster(any()); verify(commands, times(1)).del(any(String.class)); verifyZeroInteractions(iosDevice); @@ -122,7 +120,7 @@ public class ActiveUserCounterTest { public void testCrawlEnd() { activeUserCounter.onCrawlEnd(Optional.empty()); - verify(cacheCluster, times(1)).withReadCluster(any()); + verify(cacheCluster, times(1)).withCluster(any()); verify(commands, times(1)).get(any(String.class)); verify(metricsFactory, times(1)).getReporters(); @@ -150,8 +148,8 @@ public class ActiveUserCounterTest { verify(iosDevice, times(1)).getApnId(); verify(iosDevice, times(0)).getGcmId(); - verify(cacheCluster, times(1)).withReadCluster(any()); - verify(cacheCluster, times(1)).useWriteCluster(any()); + verify(cacheCluster, times(1)).withCluster(any()); + verify(cacheCluster, times(1)).useCluster(any()); verify(commands, times(1)).get(any(String.class)); verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_IOS.toString()+"\",\"platforms\":{\"ios\":[1,1,1,1,1]},\"countries\":{\"1\":[1,1,1,1,1]}}")); @@ -174,8 +172,8 @@ public class ActiveUserCounterTest { verify(noDeviceAccount, times(1)).getMasterDevice(); - verify(cacheCluster, times(1)).withReadCluster(any()); - verify(cacheCluster, times(1)).useWriteCluster(any()); + verify(cacheCluster, times(1)).withCluster(any()); + verify(cacheCluster, times(1)).useCluster(any()); verify(commands, times(1)).get(eq(TALLY_KEY)); verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_NODEVICE+"\",\"platforms\":{},\"countries\":{}}")); @@ -210,8 +208,8 @@ public class ActiveUserCounterTest { verify(androidDevice, times(1)).getApnId(); verify(androidDevice, times(1)).getGcmId(); - verify(cacheCluster, times(1)).withReadCluster(any()); - verify(cacheCluster, times(1)).useWriteCluster(any()); + verify(cacheCluster, times(1)).withCluster(any()); + verify(cacheCluster, times(1)).useCluster(any()); verify(commands, times(1)).get(eq(TALLY_KEY)); verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_IOS+"\",\"platforms\":{\"android\":[0,0,0,1,1],\"ios\":[1,1,1,1,1]},\"countries\":{\"55\":[0,0,0,1,1],\"1\":[1,1,1,1,1]}}")); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java index 418c711d2..e4a1a4d92 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java @@ -28,41 +28,41 @@ public class RedisClusterHelper { when(stringConnection.sync()).thenReturn(stringCommands); when(binaryConnection.sync()).thenReturn(binaryCommands); - when(cluster.withReadCluster(any(Function.class))).thenAnswer(invocation -> { + when(cluster.withCluster(any(Function.class))).thenAnswer(invocation -> { return invocation.getArgument(0, Function.class).apply(stringConnection); }); doAnswer(invocation -> { invocation.getArgument(0, Consumer.class).accept(stringConnection); return null; - }).when(cluster).useReadCluster(any(Consumer.class)); + }).when(cluster).useCluster(any(Consumer.class)); - when(cluster.withWriteCluster(any(Function.class))).thenAnswer(invocation -> { + when(cluster.withCluster(any(Function.class))).thenAnswer(invocation -> { return invocation.getArgument(0, Function.class).apply(stringConnection); }); doAnswer(invocation -> { invocation.getArgument(0, Consumer.class).accept(stringConnection); return null; - }).when(cluster).useWriteCluster(any(Consumer.class)); + }).when(cluster).useCluster(any(Consumer.class)); - when(cluster.withBinaryReadCluster(any(Function.class))).thenAnswer(invocation -> { + when(cluster.withBinaryCluster(any(Function.class))).thenAnswer(invocation -> { return invocation.getArgument(0, Function.class).apply(binaryConnection); }); doAnswer(invocation -> { invocation.getArgument(0, Consumer.class).accept(binaryConnection); return null; - }).when(cluster).useBinaryReadCluster(any(Consumer.class)); + }).when(cluster).useBinaryCluster(any(Consumer.class)); - when(cluster.withBinaryWriteCluster(any(Function.class))).thenAnswer(invocation -> { + when(cluster.withBinaryCluster(any(Function.class))).thenAnswer(invocation -> { return invocation.getArgument(0, Function.class).apply(binaryConnection); }); doAnswer(invocation -> { invocation.getArgument(0, Consumer.class).accept(binaryConnection); return null; - }).when(cluster).useBinaryWriteCluster(any(Consumer.class)); + }).when(cluster).useBinaryCluster(any(Consumer.class)); return cluster; }