From 7454e55693dbf7e91ada82bde1728d744d0b5ebc Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 17 Jun 2020 11:51:47 -0400 Subject: [PATCH] Write synchronously to the cache cluster. --- .../textsecuregcm/limits/LockingRateLimiter.java | 4 ++-- .../textsecuregcm/limits/RateLimiter.java | 4 ++-- .../textsecuregcm/storage/AccountsManager.java | 7 ++++--- .../textsecuregcm/storage/ActiveUserCounter.java | 4 ++-- .../storage/PendingAccountsManager.java | 4 ++-- .../storage/PendingDevicesManager.java | 4 ++-- .../textsecuregcm/storage/ProfilesManager.java | 4 ++-- .../textsecuregcm/storage/UsernamesManager.java | 16 ++++++++-------- 8 files changed, 24 insertions(+), 23 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java index 30c0396b9..c16079790 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java @@ -47,7 +47,7 @@ public class LockingRateLimiter extends RateLimiter { final String lockName = getLockName(key); jedis.del(lockName); - cacheCluster.useWriteCluster(connection -> connection.async().del(lockName)); + cacheCluster.useWriteCluster(connection -> connection.sync().del(lockName)); } } @@ -59,7 +59,7 @@ public class LockingRateLimiter extends RateLimiter { if (acquiredLock) { // TODO Restore the NX flag when the cluster becomes the primary source of truth - cacheCluster.useWriteCluster(connection -> connection.async().set(lockName, "L", SetArgs.Builder.ex(10))); + cacheCluster.useWriteCluster(connection -> connection.sync().set(lockName, "L", SetArgs.Builder.ex(10))); } return acquiredLock; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index cd6d74064..81780df7a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -96,7 +96,7 @@ public class RateLimiter { final String bucketName = getBucketName(key); jedis.del(bucketName); - cacheCluster.useWriteCluster(connection -> connection.async().del(bucketName)); + cacheCluster.useWriteCluster(connection -> connection.sync().del(bucketName)); } } @@ -107,7 +107,7 @@ public class RateLimiter { final int level = (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000); jedis.setex(bucketName, level, serialized); - cacheCluster.useWriteCluster(connection -> connection.async().setex(bucketName, level, serialized)); + cacheCluster.useWriteCluster(connection -> connection.sync().setex(bucketName, level, serialized)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index fae8f15af..37814fdef 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -23,6 +23,7 @@ import com.codahale.metrics.Timer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier; @@ -162,10 +163,10 @@ public class AccountsManager { jedis.set(accountEntityKey, accountJson); cacheCluster.useWriteCluster(connection -> { - RedisAdvancedClusterAsyncCommands asyncCommands = connection.async(); + final RedisAdvancedClusterCommands commands = connection.sync(); - asyncCommands.set(accountMapKey, account.getUuid().toString()); - asyncCommands.set(accountEntityKey, accountJson); + commands.set(accountMapKey, account.getUuid().toString()); + commands.set(accountEntityKey, accountJson); }); } catch (JsonProcessingException e) { throw new IllegalStateException(e); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java index 648912b2d..c2528dc8f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java @@ -66,7 +66,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { public void onCrawlStart() { try (Jedis jedis = jedisPool.getWriteResource()) { jedis.del(TALLY_KEY); - cacheCluster.useWriteCluster(connection -> connection.async().del(TALLY_KEY)); + cacheCluster.useWriteCluster(connection -> connection.sync().del(TALLY_KEY)); } } @@ -186,7 +186,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener { final String tallyJson = mapper.writeValueAsString(activeUserTally); jedis.set(TALLY_KEY, tallyJson); - cacheCluster.useWriteCluster(connection -> connection.async().set(TALLY_KEY, tallyJson)); + cacheCluster.useWriteCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } catch (IOException e) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java index 7e357e31b..447d4056f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java @@ -78,7 +78,7 @@ public class PendingAccountsManager { final String verificationCodeJson = mapper.writeValueAsString(code); jedis.set(key, verificationCodeJson); - cacheCluster.useWriteCluster(connection -> connection.async().set(key, verificationCodeJson)); + cacheCluster.useWriteCluster(connection -> connection.sync().set(key, verificationCodeJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -104,7 +104,7 @@ public class PendingAccountsManager { final String key = CACHE_PREFIX + number; jedis.del(key); - cacheCluster.useWriteCluster(connection -> connection.async().del(key)); + cacheCluster.useWriteCluster(connection -> connection.sync().del(key)); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java index 24c050b05..7346fe99b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java @@ -77,7 +77,7 @@ public class PendingDevicesManager { final String verificationCodeJson = mapper.writeValueAsString(code); jedis.set(key, verificationCodeJson); - cacheCluster.useWriteCluster(connection -> connection.async().set(key, verificationCodeJson)); + cacheCluster.useWriteCluster(connection -> connection.sync().set(key, verificationCodeJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -103,7 +103,7 @@ public class PendingDevicesManager { final String key = CACHE_PREFIX + number; jedis.del(key); - cacheCluster.useWriteCluster(connection -> connection.async().del(key)); + cacheCluster.useWriteCluster(connection -> connection.sync().del(key)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java index 38551351a..2ca278dc2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java @@ -63,7 +63,7 @@ public class ProfilesManager { final String profileJson = mapper.writeValueAsString(profile); jedis.hset(key, profile.getVersion(), profileJson); - cacheCluster.useWriteCluster(connection -> connection.async().hset(key, profile.getVersion(), profileJson)); + cacheCluster.useWriteCluster(connection -> connection.sync().hset(key, profile.getVersion(), profileJson)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } @@ -92,7 +92,7 @@ public class ProfilesManager { final String key = CACHE_PREFIX + uuid.toString(); jedis.del(key); - cacheCluster.useWriteCluster(connection -> connection.async().del(key)); + cacheCluster.useWriteCluster(connection -> connection.sync().del(key)); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java index 16490c3a7..b9a1c7d62 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/UsernamesManager.java @@ -3,7 +3,7 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; -import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.experiment.Experiment; @@ -129,11 +129,11 @@ public class UsernamesManager { jedis.set(usernameMapKey, uuid.toString()); cacheCluster.useWriteCluster(connection -> { - final RedisAdvancedClusterAsyncCommands asyncCommands = connection.async(); + final RedisAdvancedClusterCommands commands = connection.sync(); - maybeOldUsername.ifPresent(oldUsername -> asyncCommands.del(getUsernameMapKey(oldUsername))); - asyncCommands.set(uuidMapKey, username); - asyncCommands.set(usernameMapKey, uuid.toString()); + maybeOldUsername.ifPresent(oldUsername -> commands.del(getUsernameMapKey(oldUsername))); + commands.set(uuidMapKey, username); + commands.set(usernameMapKey, uuid.toString()); }); } catch (JedisException e) { if (required) throw e; @@ -187,10 +187,10 @@ public class UsernamesManager { jedis.del(uuidMapKey); cacheCluster.useWriteCluster(connection -> { - final RedisAdvancedClusterAsyncCommands asyncCommands = connection.async(); + final RedisAdvancedClusterCommands commands = connection.sync(); - asyncCommands.del(usernameMapKey); - asyncCommands.del(uuidMapKey); + commands.del(usernameMapKey); + commands.del(uuidMapKey); }); }); }