Write synchronously to the cache cluster.
This commit is contained in:
parent
c745fe7778
commit
7454e55693
|
@ -47,7 +47,7 @@ public class LockingRateLimiter extends RateLimiter {
|
|||
final String lockName = getLockName(key);
|
||||
|
||||
jedis.del(lockName);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().del(lockName));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(lockName));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -59,7 +59,7 @@ public class LockingRateLimiter extends RateLimiter {
|
|||
|
||||
if (acquiredLock) {
|
||||
// TODO Restore the NX flag when the cluster becomes the primary source of truth
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().set(lockName, "L", SetArgs.Builder.ex(10)));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().set(lockName, "L", SetArgs.Builder.ex(10)));
|
||||
}
|
||||
|
||||
return acquiredLock;
|
||||
|
|
|
@ -96,7 +96,7 @@ public class RateLimiter {
|
|||
final String bucketName = getBucketName(key);
|
||||
|
||||
jedis.del(bucketName);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().del(bucketName));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(bucketName));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,7 +107,7 @@ public class RateLimiter {
|
|||
final int level = (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000);
|
||||
|
||||
jedis.setex(bucketName, level, serialized);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().setex(bucketName, level, serialized));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().setex(bucketName, level, serialized));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.codahale.metrics.Timer;
|
|||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
|
||||
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
|
||||
|
@ -162,10 +163,10 @@ public class AccountsManager {
|
|||
jedis.set(accountEntityKey, accountJson);
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> {
|
||||
RedisAdvancedClusterAsyncCommands<String, String> asyncCommands = connection.async();
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
asyncCommands.set(accountMapKey, account.getUuid().toString());
|
||||
asyncCommands.set(accountEntityKey, accountJson);
|
||||
commands.set(accountMapKey, account.getUuid().toString());
|
||||
commands.set(accountEntityKey, accountJson);
|
||||
});
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalStateException(e);
|
||||
|
|
|
@ -66,7 +66,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
|
|||
public void onCrawlStart() {
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
jedis.del(TALLY_KEY);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().del(TALLY_KEY));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(TALLY_KEY));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -186,7 +186,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
|
|||
final String tallyJson = mapper.writeValueAsString(activeUserTally);
|
||||
|
||||
jedis.set(TALLY_KEY, tallyJson);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().set(TALLY_KEY, tallyJson));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -78,7 +78,7 @@ public class PendingAccountsManager {
|
|||
final String verificationCodeJson = mapper.writeValueAsString(code);
|
||||
|
||||
jedis.set(key, verificationCodeJson);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().set(key, verificationCodeJson));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().set(key, verificationCodeJson));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ public class PendingAccountsManager {
|
|||
final String key = CACHE_PREFIX + number;
|
||||
|
||||
jedis.del(key);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().del(key));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class PendingDevicesManager {
|
|||
final String verificationCodeJson = mapper.writeValueAsString(code);
|
||||
|
||||
jedis.set(key, verificationCodeJson);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().set(key, verificationCodeJson));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().set(key, verificationCodeJson));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
@ -103,7 +103,7 @@ public class PendingDevicesManager {
|
|||
final String key = CACHE_PREFIX + number;
|
||||
|
||||
jedis.del(key);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().del(key));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(key));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ public class ProfilesManager {
|
|||
final String profileJson = mapper.writeValueAsString(profile);
|
||||
|
||||
jedis.hset(key, profile.getVersion(), profileJson);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().hset(key, profile.getVersion(), profileJson));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().hset(key, profile.getVersion(), profileJson));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ public class ProfilesManager {
|
|||
final String key = CACHE_PREFIX + uuid.toString();
|
||||
|
||||
jedis.del(key);
|
||||
cacheCluster.useWriteCluster(connection -> connection.async().del(key));
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
|
||||
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.experiment.Experiment;
|
||||
|
@ -129,11 +129,11 @@ public class UsernamesManager {
|
|||
jedis.set(usernameMapKey, uuid.toString());
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> {
|
||||
final RedisAdvancedClusterAsyncCommands<String, String> asyncCommands = connection.async();
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
maybeOldUsername.ifPresent(oldUsername -> asyncCommands.del(getUsernameMapKey(oldUsername)));
|
||||
asyncCommands.set(uuidMapKey, username);
|
||||
asyncCommands.set(usernameMapKey, uuid.toString());
|
||||
maybeOldUsername.ifPresent(oldUsername -> commands.del(getUsernameMapKey(oldUsername)));
|
||||
commands.set(uuidMapKey, username);
|
||||
commands.set(usernameMapKey, uuid.toString());
|
||||
});
|
||||
} catch (JedisException e) {
|
||||
if (required) throw e;
|
||||
|
@ -187,10 +187,10 @@ public class UsernamesManager {
|
|||
jedis.del(uuidMapKey);
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> {
|
||||
final RedisAdvancedClusterAsyncCommands<String, String> asyncCommands = connection.async();
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
asyncCommands.del(usernameMapKey);
|
||||
asyncCommands.del(uuidMapKey);
|
||||
commands.del(usernameMapKey);
|
||||
commands.del(uuidMapKey);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue