Combine the read/write breakers for Redis clusters.
This commit is contained in:
parent
ae0f8df11b
commit
5717dc294e
|
@ -41,11 +41,11 @@ public class LockingRateLimiter extends RateLimiter {
|
|||
}
|
||||
|
||||
private void releaseLock(String key) {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(getLockName(key)));
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(getLockName(key)));
|
||||
}
|
||||
|
||||
private boolean acquireLock(String key) {
|
||||
return cacheCluster.withWriteCluster(connection -> connection.sync().set(getLockName(key), "L", SetArgs.Builder.nx().ex(10))) != null;
|
||||
return cacheCluster.withCluster(connection -> connection.sync().set(getLockName(key), "L", SetArgs.Builder.nx().ex(10))) != null;
|
||||
}
|
||||
|
||||
private String getLockName(String key) {
|
||||
|
|
|
@ -85,14 +85,14 @@ public class RateLimiter {
|
|||
}
|
||||
|
||||
public void clear(String key) {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(getBucketName(key)));
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(getBucketName(key)));
|
||||
}
|
||||
|
||||
private void setBucket(String key, LeakyBucket bucket) {
|
||||
try {
|
||||
final String serialized = bucket.serialize(mapper);
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized));
|
||||
cacheCluster.useCluster(connection -> connection.sync().setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ public class RateLimiter {
|
|||
|
||||
private LeakyBucket getBucket(String key) {
|
||||
try {
|
||||
final String serialized = cacheCluster.withReadCluster(connection -> connection.sync().get(getBucketName(key)));
|
||||
final String serialized = cacheCluster.withCluster(connection -> connection.sync().get(getBucketName(key)));
|
||||
|
||||
if (serialized != null) {
|
||||
return LeakyBucket.fromSerialized(mapper, serialized);
|
||||
|
|
|
@ -37,7 +37,7 @@ public class PushLatencyManager {
|
|||
|
||||
@VisibleForTesting
|
||||
void recordPushSent(final UUID accountUuid, final long deviceId, final long currentTime) {
|
||||
redisCluster.useWriteCluster(connection ->
|
||||
redisCluster.useCluster(connection ->
|
||||
connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId), String.valueOf(currentTime), SetArgs.Builder.nx().ex(TTL)));
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ public class PushLatencyManager {
|
|||
CompletableFuture<Long> getLatencyAndClearTimestamp(final UUID accountUuid, final long deviceId, final long currentTimeMillis) {
|
||||
final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId);
|
||||
|
||||
return redisCluster.withWriteCluster(connection -> {
|
||||
return redisCluster.withCluster(connection -> {
|
||||
final RedisAdvancedClusterAsyncCommands<String, String> commands = connection.async();
|
||||
|
||||
final CompletableFuture<String> getFuture = commands.get(key).toCompletableFuture();
|
||||
|
|
|
@ -15,7 +15,7 @@ public class RedisClusterHealthCheck extends HealthCheck {
|
|||
|
||||
@Override
|
||||
protected Result check() throws Exception {
|
||||
return CompletableFuture.allOf(redisCluster.withReadCluster(connection -> connection.async().masters().commands().ping()).futures())
|
||||
return CompletableFuture.allOf(redisCluster.withCluster(connection -> connection.async().masters().commands().ping()).futures())
|
||||
.thenApply(v -> Result.healthy())
|
||||
.exceptionally(Result::unhealthy)
|
||||
.get();
|
||||
|
|
|
@ -105,7 +105,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
|||
connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)).commands().subscribe(presenceChannel);
|
||||
});
|
||||
|
||||
presenceCluster.useWriteCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId));
|
||||
presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId));
|
||||
|
||||
pruneMissingPeersFuture = scheduledExecutorService.scheduleAtFixedRate(this::pruneMissingPeers, new Random().nextInt(PRUNE_PEERS_INTERVAL_SECONDS), PRUNE_PEERS_INTERVAL_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
|||
clearPresence(presenceKey);
|
||||
}
|
||||
|
||||
presenceCluster.useWriteCluster(connection -> {
|
||||
presenceCluster.useCluster(connection -> {
|
||||
connection.sync().srem(MANAGER_SET_KEY, managerId);
|
||||
connection.sync().del(getConnectedClientSetKey(managerId));
|
||||
});
|
||||
|
@ -138,7 +138,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
|||
|
||||
displacementListenersByPresenceKey.put(presenceKey, displacementListener);
|
||||
|
||||
presenceCluster.useWriteCluster(connection -> {
|
||||
presenceCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
commands.set(presenceKey, managerId);
|
||||
|
@ -161,7 +161,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
|||
|
||||
public boolean isPresent(final UUID accountUuid, final long deviceId) {
|
||||
try (final Timer.Context ignored = checkPresenceTimer.time()) {
|
||||
return presenceCluster.withReadCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1;
|
||||
return presenceCluster.withCluster(connection -> connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -175,7 +175,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
|||
unsubscribeFromRemotePresenceChanges(presenceKey);
|
||||
|
||||
final boolean removed = clearPresenceScript.execute(List.of(presenceKey), List.of(managerId)) != null;
|
||||
presenceCluster.useWriteCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey));
|
||||
presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey));
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
@ -201,18 +201,18 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
|
|||
|
||||
void pruneMissingPeers() {
|
||||
try (final Timer.Context ignored = prunePeersTimer.time()) {
|
||||
final Set<String> peerIds = presenceCluster.withReadCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY));
|
||||
final Set<String> peerIds = presenceCluster.withCluster(connection -> connection.sync().smembers(MANAGER_SET_KEY));
|
||||
peerIds.remove(managerId);
|
||||
|
||||
for (final String peerId : peerIds) {
|
||||
final boolean peerMissing = presenceCluster.withWriteCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0);
|
||||
final boolean peerMissing = presenceCluster.withCluster(connection -> connection.sync().publish(getManagerPresenceChannel(peerId), "ping") == 0);
|
||||
|
||||
if (peerMissing) {
|
||||
log.debug("Presence manager {} did not respond to ping", peerId);
|
||||
|
||||
final String connectedClientsKey = getConnectedClientSetKey(peerId);
|
||||
|
||||
presenceCluster.useWriteCluster(connection -> {
|
||||
presenceCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
String presenceKey;
|
||||
|
|
|
@ -48,11 +48,11 @@ public class ClusterLuaScript {
|
|||
this.redisCluster = redisCluster;
|
||||
this.scriptOutputType = scriptOutputType;
|
||||
this.script = script;
|
||||
this.sha = redisCluster.withWriteCluster(connection -> connection.sync().scriptLoad(script));
|
||||
this.sha = redisCluster.withCluster(connection -> connection.sync().scriptLoad(script));
|
||||
}
|
||||
|
||||
public Object execute(final List<String> keys, final List<String> args) {
|
||||
return redisCluster.withWriteCluster(connection -> {
|
||||
return redisCluster.withCluster(connection -> {
|
||||
try {
|
||||
final RedisAdvancedClusterCommands<String, String> clusterCommands = connection.sync();
|
||||
|
||||
|
@ -70,7 +70,7 @@ public class ClusterLuaScript {
|
|||
}
|
||||
|
||||
public Object executeBinary(final List<byte[]> keys, final List<byte[]> args) {
|
||||
return redisCluster.withBinaryWriteCluster(connection -> {
|
||||
return redisCluster.withBinaryCluster(connection -> {
|
||||
try {
|
||||
final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands = connection.sync();
|
||||
|
||||
|
|
|
@ -36,8 +36,7 @@ public class FaultTolerantRedisCluster {
|
|||
private final List<StatefulRedisClusterPubSubConnection<?, ?>> pubSubConnections = new ArrayList<>();
|
||||
|
||||
private final CircuitBreakerConfiguration circuitBreakerConfiguration;
|
||||
private final CircuitBreaker readCircuitBreaker;
|
||||
private final CircuitBreaker writeCircuitBreaker;
|
||||
private final CircuitBreaker circuitBreaker;
|
||||
|
||||
public FaultTolerantRedisCluster(final String name, final List<String> urls, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
||||
this(name, RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), timeout, circuitBreakerConfiguration);
|
||||
|
@ -54,15 +53,10 @@ public class FaultTolerantRedisCluster {
|
|||
this.binaryClusterConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);
|
||||
|
||||
this.circuitBreakerConfiguration = circuitBreakerConfiguration;
|
||||
this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig());
|
||||
this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig());
|
||||
this.circuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig());
|
||||
|
||||
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
|
||||
readCircuitBreaker,
|
||||
FaultTolerantRedisCluster.class);
|
||||
|
||||
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
|
||||
writeCircuitBreaker,
|
||||
circuitBreaker,
|
||||
FaultTolerantRedisCluster.class);
|
||||
}
|
||||
|
||||
|
@ -77,36 +71,20 @@ public class FaultTolerantRedisCluster {
|
|||
clusterClient.shutdown();
|
||||
}
|
||||
|
||||
public void useReadCluster(final Consumer<StatefulRedisClusterConnection<String, String>> consumer) {
|
||||
this.readCircuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection));
|
||||
public void useCluster(final Consumer<StatefulRedisClusterConnection<String, String>> consumer) {
|
||||
this.circuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection));
|
||||
}
|
||||
|
||||
public <T> T withReadCluster(final Function<StatefulRedisClusterConnection<String, String>, T> consumer) {
|
||||
return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection));
|
||||
public <T> T withCluster(final Function<StatefulRedisClusterConnection<String, String>, T> consumer) {
|
||||
return this.circuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection));
|
||||
}
|
||||
|
||||
public void useWriteCluster(final Consumer<StatefulRedisClusterConnection<String, String>> consumer) {
|
||||
this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection));
|
||||
public void useBinaryCluster(final Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer) {
|
||||
this.circuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection));
|
||||
}
|
||||
|
||||
public <T> T withWriteCluster(final Function<StatefulRedisClusterConnection<String, String>, T> consumer) {
|
||||
return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection));
|
||||
}
|
||||
|
||||
public void useBinaryReadCluster(final Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer) {
|
||||
this.readCircuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection));
|
||||
}
|
||||
|
||||
public <T> T withBinaryReadCluster(final Function<StatefulRedisClusterConnection<byte[], byte[]>, T> consumer) {
|
||||
return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection));
|
||||
}
|
||||
|
||||
public void useBinaryWriteCluster(final Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer) {
|
||||
this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection));
|
||||
}
|
||||
|
||||
public <T> T withBinaryWriteCluster(final Function<StatefulRedisClusterConnection<byte[], byte[]>, T> consumer) {
|
||||
return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection));
|
||||
public <T> T withBinaryCluster(final Function<StatefulRedisClusterConnection<byte[], byte[]>, T> consumer) {
|
||||
return this.circuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection));
|
||||
}
|
||||
|
||||
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
|
||||
|
|
|
@ -44,15 +44,15 @@ public class AccountDatabaseCrawlerCache {
|
|||
}
|
||||
|
||||
public void clearAccelerate() {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(ACCELERATE_KEY));
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY));
|
||||
}
|
||||
|
||||
public boolean isAccelerated() {
|
||||
return "1".equals(cacheCluster.withReadCluster(connection -> connection.sync().get(ACCELERATE_KEY)));
|
||||
return "1".equals(cacheCluster.withCluster(connection -> connection.sync().get(ACCELERATE_KEY)));
|
||||
}
|
||||
|
||||
public boolean claimActiveWork(String workerId, long ttlMs) {
|
||||
return "OK".equals(cacheCluster.withWriteCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs))));
|
||||
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs))));
|
||||
}
|
||||
|
||||
public void releaseActiveWork(String workerId) {
|
||||
|
@ -60,7 +60,7 @@ public class AccountDatabaseCrawlerCache {
|
|||
}
|
||||
|
||||
public Optional<UUID> getLastUuid() {
|
||||
final String lastUuidString = cacheCluster.withWriteCluster(connection -> connection.sync().get(LAST_UUID_KEY));
|
||||
final String lastUuidString = cacheCluster.withCluster(connection -> connection.sync().get(LAST_UUID_KEY));
|
||||
|
||||
if (lastUuidString == null) return Optional.empty();
|
||||
else return Optional.of(UUID.fromString(lastUuidString));
|
||||
|
@ -68,9 +68,9 @@ public class AccountDatabaseCrawlerCache {
|
|||
|
||||
public void setLastUuid(Optional<UUID> lastUuid) {
|
||||
if (lastUuid.isPresent()) {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
|
||||
cacheCluster.useCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
|
||||
} else {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(LAST_UUID_KEY));
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_KEY));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -147,7 +147,7 @@ public class AccountsManager {
|
|||
try (Timer.Context ignored = redisSetTimer.time()) {
|
||||
final String accountJson = mapper.writeValueAsString(account);
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> {
|
||||
cacheCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
commands.set(getAccountMapKey(account.getNumber()), account.getUuid().toString());
|
||||
|
@ -160,7 +160,7 @@ public class AccountsManager {
|
|||
|
||||
private Optional<Account> redisGet(String number) {
|
||||
try (Timer.Context ignored = redisNumberGetTimer.time()) {
|
||||
final String uuid = cacheCluster.withReadCluster(connection -> connection.sync().get(getAccountMapKey(number)));
|
||||
final String uuid = cacheCluster.withCluster(connection -> connection.sync().get(getAccountMapKey(number)));
|
||||
|
||||
if (uuid != null) return redisGet(UUID.fromString(uuid));
|
||||
else return Optional.empty();
|
||||
|
@ -175,7 +175,7 @@ public class AccountsManager {
|
|||
|
||||
private Optional<Account> redisGet(UUID uuid) {
|
||||
try (Timer.Context ignored = redisUuidGetTimer.time()) {
|
||||
final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(getAccountEntityKey(uuid)));
|
||||
final String json = cacheCluster.withCluster(connection -> connection.sync().get(getAccountEntityKey(uuid)));
|
||||
|
||||
if (json != null) {
|
||||
Account account = mapper.readValue(json, Account.class);
|
||||
|
|
|
@ -56,7 +56,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
|
|||
|
||||
@Override
|
||||
public void onCrawlStart() {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(TALLY_KEY));
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(TALLY_KEY));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -153,7 +153,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
|
|||
|
||||
private void incrementTallies(UUID fromUuid, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
|
||||
try {
|
||||
final String tallyValue = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY));
|
||||
final String tallyValue = cacheCluster.withCluster(connection -> connection.sync().get(TALLY_KEY));
|
||||
|
||||
ActiveUserTally activeUserTally;
|
||||
|
||||
|
@ -173,7 +173,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
|
|||
|
||||
final String tallyJson = mapper.writeValueAsString(activeUserTally);
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson));
|
||||
cacheCluster.useCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
@ -194,7 +194,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
|
|||
|
||||
private ActiveUserTally getFinalTallies() {
|
||||
try {
|
||||
final String tallyJson = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY));
|
||||
final String tallyJson = cacheCluster.withCluster(connection -> connection.sync().get(TALLY_KEY));
|
||||
|
||||
return mapper.readValue(tallyJson, ActiveUserTally.class);
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -69,7 +69,7 @@ public class PendingAccountsManager {
|
|||
try {
|
||||
final String verificationCodeJson = mapper.writeValueAsString(code);
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson));
|
||||
cacheCluster.useCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ public class PendingAccountsManager {
|
|||
|
||||
private Optional<StoredVerificationCode> memcacheGet(String number) {
|
||||
try {
|
||||
final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(CACHE_PREFIX + number));
|
||||
final String json = cacheCluster.withCluster(connection -> connection.sync().get(CACHE_PREFIX + number));
|
||||
|
||||
if (json == null) return Optional.empty();
|
||||
else return Optional.of(mapper.readValue(json, StoredVerificationCode.class));
|
||||
|
@ -88,6 +88,6 @@ public class PendingAccountsManager {
|
|||
}
|
||||
|
||||
private void memcacheDelete(String number) {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + number));
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(CACHE_PREFIX + number));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class PendingDevicesManager {
|
|||
try {
|
||||
final String verificationCodeJson = mapper.writeValueAsString(code);
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson));
|
||||
cacheCluster.useCluster(connection -> connection.sync().set(CACHE_PREFIX + number, verificationCodeJson));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ public class PendingDevicesManager {
|
|||
|
||||
private Optional<StoredVerificationCode> memcacheGet(String number) {
|
||||
try {
|
||||
final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(CACHE_PREFIX + number));
|
||||
final String json = cacheCluster.withCluster(connection -> connection.sync().get(CACHE_PREFIX + number));
|
||||
|
||||
if (json == null) return Optional.empty();
|
||||
else return Optional.of(mapper.readValue(json, StoredVerificationCode.class));
|
||||
|
@ -87,7 +87,7 @@ public class PendingDevicesManager {
|
|||
}
|
||||
|
||||
private void memcacheDelete(String number) {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + number));
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(CACHE_PREFIX + number));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public class ProfilesManager {
|
|||
try {
|
||||
final String profileJson = mapper.writeValueAsString(profile);
|
||||
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().hset(CACHE_PREFIX + uuid.toString(), profile.getVersion(), profileJson));
|
||||
cacheCluster.useCluster(connection -> connection.sync().hset(CACHE_PREFIX + uuid.toString(), profile.getVersion(), profileJson));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ public class ProfilesManager {
|
|||
|
||||
private Optional<VersionedProfile> memcacheGet(UUID uuid, String version) {
|
||||
try {
|
||||
final String json = cacheCluster.withReadCluster(connection -> connection.sync().hget(CACHE_PREFIX + uuid.toString(), version));
|
||||
final String json = cacheCluster.withCluster(connection -> connection.sync().hget(CACHE_PREFIX + uuid.toString(), version));
|
||||
|
||||
if (json == null) return Optional.empty();
|
||||
else return Optional.of(mapper.readValue(json, VersionedProfile.class));
|
||||
|
@ -75,6 +75,6 @@ public class ProfilesManager {
|
|||
}
|
||||
|
||||
private void memcacheDelete(UUID uuid) {
|
||||
cacheCluster.useWriteCluster(connection -> connection.sync().del(CACHE_PREFIX + uuid.toString()));
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(CACHE_PREFIX + uuid.toString()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -269,7 +269,7 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
|
|||
}
|
||||
|
||||
int getNextSlotToPersist() {
|
||||
return (int)(redisCluster.withWriteCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT);
|
||||
return (int)(redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT);
|
||||
}
|
||||
|
||||
List<String> getQueuesToPersist(final int slot, final Instant maxTime, final int limit) {
|
||||
|
@ -280,11 +280,11 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
|
|||
}
|
||||
|
||||
void lockQueueForPersistence(final String queue) {
|
||||
redisCluster.useBinaryWriteCluster(connection -> connection.sync().setex(getPersistInProgressKey(queue), 30, LOCK_VALUE));
|
||||
redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(queue), 30, LOCK_VALUE));
|
||||
}
|
||||
|
||||
void unlockQueueForPersistence(final String queue) {
|
||||
redisCluster.useBinaryWriteCluster(connection -> connection.sync().del(getPersistInProgressKey(queue)));
|
||||
redisCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(queue)));
|
||||
}
|
||||
|
||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||
|
|
|
@ -36,7 +36,7 @@ public class RegistrationLockVersionCounter extends AccountDatabaseCrawlerListen
|
|||
|
||||
@Override
|
||||
public void onCrawlStart() {
|
||||
redisCluster.useWriteCluster(connection -> connection.sync().hset(REGLOCK_COUNT_KEY, Map.of(PIN_KEY, "0", REGLOCK_KEY, "0")));
|
||||
redisCluster.useCluster(connection -> connection.sync().hset(REGLOCK_COUNT_KEY, Map.of(PIN_KEY, "0", REGLOCK_KEY, "0")));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -60,7 +60,7 @@ public class RegistrationLockVersionCounter extends AccountDatabaseCrawlerListen
|
|||
}
|
||||
|
||||
private void incrementReglockCounts(final int pinCount, final int reglockCount) {
|
||||
redisCluster.useWriteCluster(connection -> {
|
||||
redisCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
commands.hincrby(REGLOCK_COUNT_KEY, PIN_KEY, pinCount);
|
||||
|
@ -71,7 +71,7 @@ public class RegistrationLockVersionCounter extends AccountDatabaseCrawlerListen
|
|||
@Override
|
||||
public void onCrawlEnd(final Optional<UUID> fromUuid) {
|
||||
final Map<String, Integer> countsByReglockType =
|
||||
redisCluster.withReadCluster(connection -> connection.sync().hmget(REGLOCK_COUNT_KEY, PIN_KEY, REGLOCK_KEY))
|
||||
redisCluster.withCluster(connection -> connection.sync().hmget(REGLOCK_COUNT_KEY, PIN_KEY, REGLOCK_KEY))
|
||||
.stream()
|
||||
.collect(Collectors.toMap(KeyValue::getKey, keyValue -> keyValue.hasValue() ? keyValue.map(Integer::parseInt).getValue() : 0));
|
||||
|
||||
|
|
|
@ -113,7 +113,7 @@ public class UsernamesManager {
|
|||
final String usernameMapKey = getUsernameMapKey(username);
|
||||
|
||||
try (Timer.Context ignored = redisSetTimer.time()) {
|
||||
cacheCluster.useWriteCluster(connection -> {
|
||||
cacheCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
final Optional<String> maybeOldUsername = Optional.ofNullable(commands.get(uuidMapKey));
|
||||
|
@ -130,7 +130,7 @@ public class UsernamesManager {
|
|||
|
||||
private Optional<UUID> redisGet(String username) {
|
||||
try (Timer.Context ignored = redisUsernameGetTimer.time()) {
|
||||
final String result = cacheCluster.withReadCluster(connection -> connection.sync().get(getUsernameMapKey(username)));
|
||||
final String result = cacheCluster.withCluster(connection -> connection.sync().get(getUsernameMapKey(username)));
|
||||
|
||||
if (result == null) return Optional.empty();
|
||||
else return Optional.of(UUID.fromString(result));
|
||||
|
@ -142,7 +142,7 @@ public class UsernamesManager {
|
|||
|
||||
private Optional<String> redisGet(UUID uuid) {
|
||||
try (Timer.Context ignored = redisUuidGetTimer.time()) {
|
||||
final String result = cacheCluster.withReadCluster(connection -> connection.sync().get(getUuidMapKey(uuid)));
|
||||
final String result = cacheCluster.withCluster(connection -> connection.sync().get(getUuidMapKey(uuid)));
|
||||
|
||||
return Optional.ofNullable(result);
|
||||
} catch (RedisException e) {
|
||||
|
@ -153,7 +153,7 @@ public class UsernamesManager {
|
|||
|
||||
private void redisDelete(UUID uuid) {
|
||||
try (Timer.Context ignored = redisUuidGetTimer.time()) {
|
||||
cacheCluster.useWriteCluster(connection -> {
|
||||
cacheCluster.useCluster(connection -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
|
||||
|
||||
commands.del(getUuidMapKey(uuid));
|
||||
|
|
|
@ -15,6 +15,6 @@ public class ClearMessagesCacheClusterCommand extends ConfiguredCommand<WhisperS
|
|||
@Override
|
||||
protected void run(final Bootstrap<WhisperServerConfiguration> bootstrap, final Namespace namespace, final WhisperServerConfiguration config) {
|
||||
final FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration());
|
||||
messagesCacheCluster.useWriteCluster(connection -> connection.sync().masters().commands().flushallAsync());
|
||||
messagesCacheCluster.useCluster(connection -> connection.sync().masters().commands().flushallAsync());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
getRedisCluster().useWriteCluster(connection -> {
|
||||
getRedisCluster().useCluster(connection -> {
|
||||
connection.sync().flushall();
|
||||
connection.sync().masters().commands().configSet("notify-keyspace-events", "K$z");
|
||||
});
|
||||
|
@ -93,7 +93,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
}
|
||||
});
|
||||
|
||||
getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
UUID.randomUUID().toString()));
|
||||
|
||||
synchronized (displaced) {
|
||||
|
@ -125,7 +125,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
|
||||
clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.getResources().eventBus().publish(new ClusterTopologyChangedEvent(List.of(), List.of())));
|
||||
|
||||
getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
UUID.randomUUID().toString()));
|
||||
|
||||
synchronized (displaced) {
|
||||
|
@ -149,7 +149,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId));
|
||||
|
||||
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
|
||||
getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
|
||||
UUID.randomUUID().toString()));
|
||||
|
||||
assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId));
|
||||
|
@ -160,7 +160,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
final String presentPeerId = UUID.randomUUID().toString();
|
||||
final String missingPeerId = UUID.randomUUID().toString();
|
||||
|
||||
getRedisCluster().useWriteCluster(connection -> {
|
||||
getRedisCluster().useCluster(connection -> {
|
||||
connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId);
|
||||
connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId);
|
||||
});
|
||||
|
@ -173,17 +173,17 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId)));
|
||||
clientPresenceManager.pruneMissingPeers();
|
||||
|
||||
assertEquals(1, (long)getRedisCluster().withWriteCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId))));
|
||||
assertTrue(getRedisCluster().withReadCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId)));
|
||||
assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId))));
|
||||
assertTrue(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId)));
|
||||
|
||||
assertEquals(0, (long)getRedisCluster().withReadCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId))));
|
||||
assertFalse(getRedisCluster().withReadCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId)));
|
||||
assertEquals(0, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId))));
|
||||
assertFalse(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId)));
|
||||
}
|
||||
|
||||
private void addClientPresence(final String managerId) {
|
||||
final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7);
|
||||
|
||||
getRedisCluster().useWriteCluster(connection -> {
|
||||
getRedisCluster().useCluster(connection -> {
|
||||
connection.sync().set(clientPresenceKey, managerId);
|
||||
connection.sync().sadd(ClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey);
|
||||
});
|
||||
|
@ -206,7 +206,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
|
|||
final long displacedAccountDeviceId = 7;
|
||||
|
||||
clientPresenceManager.setPresent(displacedAccountUuid, displacedAccountDeviceId, NO_OP);
|
||||
getRedisCluster().useWriteCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId),
|
||||
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId),
|
||||
UUID.randomUUID().toString()));
|
||||
|
||||
clientPresenceManager.stop();
|
||||
|
|
|
@ -58,7 +58,7 @@ public abstract class AbstractRedisClusterTest {
|
|||
|
||||
redisCluster = new FaultTolerantRedisCluster("test-cluster", urls, Duration.ofSeconds(2), new CircuitBreakerConfiguration());
|
||||
|
||||
redisCluster.useWriteCluster(connection -> {
|
||||
redisCluster.useCluster(connection -> {
|
||||
boolean setAll = false;
|
||||
|
||||
final String[] keys = new String[NODE_COUNT];
|
||||
|
@ -84,7 +84,7 @@ public abstract class AbstractRedisClusterTest {
|
|||
}
|
||||
});
|
||||
|
||||
redisCluster.useWriteCluster(connection -> connection.sync().flushall());
|
||||
redisCluster.useCluster(connection -> connection.sync().flushall());
|
||||
}
|
||||
|
||||
protected FaultTolerantRedisCluster getRedisCluster() {
|
||||
|
|
|
@ -15,7 +15,6 @@ import java.util.List;
|
|||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -33,35 +32,35 @@ public class ClusterLuaScriptTest extends AbstractRedisClusterTest {
|
|||
final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])", ScriptOutputType.VALUE);
|
||||
|
||||
assertEquals("OK", script.execute(List.of(key), List.of(value)));
|
||||
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
|
||||
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
|
||||
|
||||
final int slot = SlotHash.getSlot(key);
|
||||
|
||||
final int sourcePort = redisCluster.withWriteCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).node(0).getUri().getPort());
|
||||
final RedisCommands<String, String> sourceCommands = redisCluster.withWriteCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0));
|
||||
final RedisCommands<String, String> destinationCommands = redisCluster.withWriteCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0));
|
||||
final int sourcePort = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).node(0).getUri().getPort());
|
||||
final RedisCommands<String, String> sourceCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0));
|
||||
final RedisCommands<String, String> destinationCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0));
|
||||
|
||||
destinationCommands.clusterSetSlotImporting(slot, sourceCommands.clusterMyId());
|
||||
|
||||
assertEquals("OK", script.execute(List.of(key), List.of(value)));
|
||||
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
|
||||
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
|
||||
|
||||
sourceCommands.clusterSetSlotMigrating(slot, destinationCommands.clusterMyId());
|
||||
|
||||
assertEquals("OK", script.execute(List.of(key), List.of(value)));
|
||||
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
|
||||
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
|
||||
|
||||
for (final String migrateKey : sourceCommands.clusterGetKeysInSlot(slot, Integer.MAX_VALUE)) {
|
||||
destinationCommands.migrate("127.0.0.1", sourcePort, migrateKey, 0, 1000);
|
||||
}
|
||||
|
||||
assertEquals("OK", script.execute(List.of(key), List.of(value)));
|
||||
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
|
||||
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
|
||||
|
||||
destinationCommands.clusterSetSlotNode(slot, destinationCommands.clusterMyId());
|
||||
|
||||
assertEquals("OK", script.execute(List.of(key), List.of(value)));
|
||||
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
|
||||
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -44,47 +44,17 @@ public class FaultTolerantRedisClusterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testReadBreaker() {
|
||||
public void testBreaker() {
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenReturn("value")
|
||||
.thenThrow(new RedisException("Badness has ensued."));
|
||||
|
||||
assertEquals("value", faultTolerantCluster.withReadCluster(connection -> connection.sync().get("key")));
|
||||
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
|
||||
assertThrows(RedisException.class,
|
||||
() -> faultTolerantCluster.withReadCluster(connection -> connection.sync().get("OH NO")));
|
||||
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
|
||||
|
||||
assertThrows(CircuitBreakerOpenException.class,
|
||||
() -> faultTolerantCluster.withReadCluster(connection -> connection.sync().get("OH NO")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadsContinueWhileWriteBreakerOpen() {
|
||||
when(clusterCommands.set(anyString(), anyString())).thenThrow(new RedisException("Badness has ensued."));
|
||||
|
||||
assertThrows(RedisException.class,
|
||||
() -> faultTolerantCluster.useWriteCluster(connection -> connection.sync().set("OH", "NO")));
|
||||
|
||||
assertThrows(CircuitBreakerOpenException.class,
|
||||
() -> faultTolerantCluster.useWriteCluster(connection -> connection.sync().set("OH", "NO")));
|
||||
|
||||
when(clusterCommands.get("key")).thenReturn("value");
|
||||
|
||||
assertEquals("value", faultTolerantCluster.withReadCluster(connection -> connection.sync().get("key")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteBreaker() {
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenReturn("value")
|
||||
.thenThrow(new RedisException("Badness has ensued."));
|
||||
|
||||
assertEquals("value", faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("key")));
|
||||
|
||||
assertThrows(RedisException.class,
|
||||
() -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO")));
|
||||
|
||||
assertThrows(CircuitBreakerOpenException.class,
|
||||
() -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO")));
|
||||
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ public class RedisClusterMessagesCacheTest extends AbstractMessagesCacheTest {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
getRedisCluster().useWriteCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "K$gz"));
|
||||
getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "K$gz"));
|
||||
|
||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.tests.storage;
|
|||
|
||||
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
|
||||
|
@ -30,7 +29,6 @@ import com.google.common.collect.ImmutableList;
|
|||
import io.dropwizard.metrics.MetricsFactory;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import redis.clients.jedis.Jedis;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.UUID;
|
||||
|
@ -105,7 +103,7 @@ public class ActiveUserCounterTest {
|
|||
public void testCrawlStart() {
|
||||
activeUserCounter.onCrawlStart();
|
||||
|
||||
verify(cacheCluster, times(1)).useWriteCluster(any());
|
||||
verify(cacheCluster, times(1)).useCluster(any());
|
||||
verify(commands, times(1)).del(any(String.class));
|
||||
|
||||
verifyZeroInteractions(iosDevice);
|
||||
|
@ -122,7 +120,7 @@ public class ActiveUserCounterTest {
|
|||
public void testCrawlEnd() {
|
||||
activeUserCounter.onCrawlEnd(Optional.empty());
|
||||
|
||||
verify(cacheCluster, times(1)).withReadCluster(any());
|
||||
verify(cacheCluster, times(1)).withCluster(any());
|
||||
verify(commands, times(1)).get(any(String.class));
|
||||
|
||||
verify(metricsFactory, times(1)).getReporters();
|
||||
|
@ -150,8 +148,8 @@ public class ActiveUserCounterTest {
|
|||
verify(iosDevice, times(1)).getApnId();
|
||||
verify(iosDevice, times(0)).getGcmId();
|
||||
|
||||
verify(cacheCluster, times(1)).withReadCluster(any());
|
||||
verify(cacheCluster, times(1)).useWriteCluster(any());
|
||||
verify(cacheCluster, times(1)).withCluster(any());
|
||||
verify(cacheCluster, times(1)).useCluster(any());
|
||||
verify(commands, times(1)).get(any(String.class));
|
||||
verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_IOS.toString()+"\",\"platforms\":{\"ios\":[1,1,1,1,1]},\"countries\":{\"1\":[1,1,1,1,1]}}"));
|
||||
|
||||
|
@ -174,8 +172,8 @@ public class ActiveUserCounterTest {
|
|||
|
||||
verify(noDeviceAccount, times(1)).getMasterDevice();
|
||||
|
||||
verify(cacheCluster, times(1)).withReadCluster(any());
|
||||
verify(cacheCluster, times(1)).useWriteCluster(any());
|
||||
verify(cacheCluster, times(1)).withCluster(any());
|
||||
verify(cacheCluster, times(1)).useCluster(any());
|
||||
verify(commands, times(1)).get(eq(TALLY_KEY));
|
||||
verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_NODEVICE+"\",\"platforms\":{},\"countries\":{}}"));
|
||||
|
||||
|
@ -210,8 +208,8 @@ public class ActiveUserCounterTest {
|
|||
verify(androidDevice, times(1)).getApnId();
|
||||
verify(androidDevice, times(1)).getGcmId();
|
||||
|
||||
verify(cacheCluster, times(1)).withReadCluster(any());
|
||||
verify(cacheCluster, times(1)).useWriteCluster(any());
|
||||
verify(cacheCluster, times(1)).withCluster(any());
|
||||
verify(cacheCluster, times(1)).useCluster(any());
|
||||
verify(commands, times(1)).get(eq(TALLY_KEY));
|
||||
verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_IOS+"\",\"platforms\":{\"android\":[0,0,0,1,1],\"ios\":[1,1,1,1,1]},\"countries\":{\"55\":[0,0,0,1,1],\"1\":[1,1,1,1,1]}}"));
|
||||
|
||||
|
|
|
@ -28,41 +28,41 @@ public class RedisClusterHelper {
|
|||
when(stringConnection.sync()).thenReturn(stringCommands);
|
||||
when(binaryConnection.sync()).thenReturn(binaryCommands);
|
||||
|
||||
when(cluster.withReadCluster(any(Function.class))).thenAnswer(invocation -> {
|
||||
when(cluster.withCluster(any(Function.class))).thenAnswer(invocation -> {
|
||||
return invocation.getArgument(0, Function.class).apply(stringConnection);
|
||||
});
|
||||
|
||||
doAnswer(invocation -> {
|
||||
invocation.getArgument(0, Consumer.class).accept(stringConnection);
|
||||
return null;
|
||||
}).when(cluster).useReadCluster(any(Consumer.class));
|
||||
}).when(cluster).useCluster(any(Consumer.class));
|
||||
|
||||
when(cluster.withWriteCluster(any(Function.class))).thenAnswer(invocation -> {
|
||||
when(cluster.withCluster(any(Function.class))).thenAnswer(invocation -> {
|
||||
return invocation.getArgument(0, Function.class).apply(stringConnection);
|
||||
});
|
||||
|
||||
doAnswer(invocation -> {
|
||||
invocation.getArgument(0, Consumer.class).accept(stringConnection);
|
||||
return null;
|
||||
}).when(cluster).useWriteCluster(any(Consumer.class));
|
||||
}).when(cluster).useCluster(any(Consumer.class));
|
||||
|
||||
when(cluster.withBinaryReadCluster(any(Function.class))).thenAnswer(invocation -> {
|
||||
when(cluster.withBinaryCluster(any(Function.class))).thenAnswer(invocation -> {
|
||||
return invocation.getArgument(0, Function.class).apply(binaryConnection);
|
||||
});
|
||||
|
||||
doAnswer(invocation -> {
|
||||
invocation.getArgument(0, Consumer.class).accept(binaryConnection);
|
||||
return null;
|
||||
}).when(cluster).useBinaryReadCluster(any(Consumer.class));
|
||||
}).when(cluster).useBinaryCluster(any(Consumer.class));
|
||||
|
||||
when(cluster.withBinaryWriteCluster(any(Function.class))).thenAnswer(invocation -> {
|
||||
when(cluster.withBinaryCluster(any(Function.class))).thenAnswer(invocation -> {
|
||||
return invocation.getArgument(0, Function.class).apply(binaryConnection);
|
||||
});
|
||||
|
||||
doAnswer(invocation -> {
|
||||
invocation.getArgument(0, Consumer.class).accept(binaryConnection);
|
||||
return null;
|
||||
}).when(cluster).useBinaryWriteCluster(any(Consumer.class));
|
||||
}).when(cluster).useBinaryCluster(any(Consumer.class));
|
||||
|
||||
return cluster;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue