Migrate deprecated Lettuce method and enum usages

This commit is contained in:
Chris Eager 2021-03-29 11:25:59 -05:00 committed by Chris Eager
parent 83aa59f4dd
commit 91bd061110
10 changed files with 74 additions and 85 deletions

View File

@ -8,8 +8,6 @@ package org.whispersystems.textsecuregcm.providers;
import com.codahale.metrics.health.HealthCheck; import com.codahale.metrics.health.HealthCheck;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import java.util.concurrent.CompletableFuture;
public class RedisClusterHealthCheck extends HealthCheck { public class RedisClusterHealthCheck extends HealthCheck {
private final FaultTolerantRedisCluster redisCluster; private final FaultTolerantRedisCluster redisCluster;
@ -20,7 +18,7 @@ public class RedisClusterHealthCheck extends HealthCheck {
@Override @Override
protected Result check() { protected Result check() {
redisCluster.withCluster(connection -> connection.sync().masters().commands().ping()); redisCluster.withCluster(connection -> connection.sync().upstream().commands().ping());
return Result.healthy(); return Result.healthy();
} }
} }

View File

@ -5,6 +5,8 @@
package org.whispersystems.textsecuregcm.push; package org.whispersystems.textsecuregcm.push;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
@ -17,13 +19,6 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.Constants;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
@ -36,8 +31,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import static com.codahale.metrics.MetricRegistry.name; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.Constants;
/** /**
* The client presence manager keeps track of which clients are actively connected and "present" to receive messages. * The client presence manager keeps track of which clients are actively connected and "present" to receive messages.
@ -112,7 +111,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
final String presenceChannel = getManagerPresenceChannel(managerId); final String presenceChannel = getManagerPresenceChannel(managerId);
final int slot = SlotHash.getSlot(presenceChannel); final int slot = SlotHash.getSlot(presenceChannel);
connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)).commands().subscribe(presenceChannel); connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)).commands().subscribe(presenceChannel);
}); });
presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId));
@ -143,7 +142,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
connection.sync().del(getConnectedClientSetKey(managerId)); connection.sync().del(getConnectedClientSetKey(managerId));
}); });
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId))); pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId)));
} }
public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) { public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) {
@ -204,7 +203,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private void subscribeForRemotePresenceChanges(final String presenceKey) { private void subscribeForRemotePresenceChanges(final String presenceKey) {
final int slot = SlotHash.getSlot(presenceKey); final int slot = SlotHash.getSlot(presenceKey);
pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot))
.commands() .commands()
.subscribe(getKeyspaceNotificationChannel(presenceKey))); .subscribe(getKeyspaceNotificationChannel(presenceKey)));
} }
@ -216,7 +215,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
} }
private void unsubscribeFromRemotePresenceChanges(final String presenceKey) { private void unsubscribeFromRemotePresenceChanges(final String presenceKey) {
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey)));
} }
void pruneMissingPeers() { void pruneMissingPeers() {

View File

@ -5,6 +5,8 @@
package org.whispersystems.textsecuregcm.redis; package org.whispersystems.textsecuregcm.redis;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
@ -20,6 +22,13 @@ import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.ClientResources;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
@ -29,16 +38,6 @@ import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.ThreadDumpUtil; import org.whispersystems.textsecuregcm.util.ThreadDumpUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
/** /**
* A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed, * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed,
* circuit-breaker-protected access to connections. * circuit-breaker-protected access to connections.

View File

@ -5,6 +5,8 @@
package org.whispersystems.textsecuregcm.storage; package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import io.dropwizard.lifecycle.Managed; import io.dropwizard.lifecycle.Managed;
@ -18,15 +20,6 @@ import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
@ -43,8 +36,14 @@ import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.slf4j.Logger;
import static com.codahale.metrics.MetricRegistry.name; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed { public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
@ -123,7 +122,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
@Override @Override
public void stop() { public void stop() {
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe()); pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe());
} }
private void resubscribeAll() { private void resubscribeAll() {
@ -372,13 +371,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private void subscribeForKeyspaceNotifications(final String queueName) { private void subscribeForKeyspaceNotifications(final String queueName) {
final int slot = SlotHash.getSlot(queueName); final int slot = SlotHash.getSlot(queueName);
pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot))
.commands() .commands()
.subscribe(getKeyspaceChannels(queueName))); .subscribe(getKeyspaceChannels(queueName)));
} }
private void unsubscribeFromKeyspaceNotifications(final String queueName) { private void unsubscribeFromKeyspaceNotifications(final String queueName) {
pubSubConnection.usePubSubConnection(connection -> connection.sync().masters() pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream()
.commands() .commands()
.unsubscribe(getKeyspaceChannels(queueName))); .unsubscribe(getKeyspaceChannels(queueName)));
} }

View File

@ -30,7 +30,7 @@ public class GetRedisCommandStatsCommand extends ConfiguredCommand<WhisperServer
for (final FaultTolerantRedisCluster cluster : List.of(cacheCluster, messagesCacheCluster, metricsCluster)) { for (final FaultTolerantRedisCluster cluster : List.of(cacheCluster, messagesCacheCluster, metricsCluster)) {
cluster.useCluster(connection -> connection.sync() cluster.useCluster(connection -> connection.sync()
.masters() .upstream()
.commands() .commands()
.info("commandstats") .info("commandstats")
.asMap() .asMap()

View File

@ -50,7 +50,7 @@ public class GetRedisSlowlogCommand extends ConfiguredCommand<WhisperServerConfi
for (final FaultTolerantRedisCluster cluster : List.of(cacheCluster, messagesCacheCluster, metricsCluster)) { for (final FaultTolerantRedisCluster cluster : List.of(cacheCluster, messagesCacheCluster, metricsCluster)) {
cluster.useCluster(connection -> connection.sync() cluster.useCluster(connection -> connection.sync()
.masters() .upstream()
.commands() .commands()
.slowlogGet(entries) .slowlogGet(entries)
.asMap() .asMap()

View File

@ -5,12 +5,11 @@
package org.whispersystems.textsecuregcm.push; package org.whispersystems.textsecuregcm.push;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import static org.junit.Assert.assertEquals;
import org.junit.After; import static org.junit.Assert.assertFalse;
import org.junit.Before; import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -18,10 +17,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import static org.junit.Assert.assertEquals; import org.junit.Before;
import static org.junit.Assert.assertFalse; import org.junit.Test;
import static org.junit.Assert.assertTrue; import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
public class ClientPresenceManagerTest extends AbstractRedisClusterTest { public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
@ -37,7 +36,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
getRedisCluster().useCluster(connection -> { getRedisCluster().useCluster(connection -> {
connection.sync().flushall(); connection.sync().flushall();
connection.sync().masters().commands().configSet("notify-keyspace-events", "K$z"); connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$z");
}); });
presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor(); presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor();
@ -188,7 +187,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
addClientPresence(missingPeerId); addClientPresence(missingPeerId);
} }
clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().upstream().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId)));
clientPresenceManager.pruneMissingPeers(); clientPresenceManager.pruneMissingPeers();
assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId))));

View File

@ -5,25 +5,22 @@
package org.whispersystems.textsecuregcm.redis; package org.whispersystems.textsecuregcm.redis;
import io.lettuce.core.RedisNoScriptException; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.lettuce.core.ScriptOutputType; import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import org.junit.Test;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ClusterLuaScriptTest extends AbstractRedisClusterTest { public class ClusterLuaScriptTest extends AbstractRedisClusterTest {
@ -41,9 +38,9 @@ public class ClusterLuaScriptTest extends AbstractRedisClusterTest {
final int slot = SlotHash.getSlot(key); final int slot = SlotHash.getSlot(key);
final int sourcePort = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).node(0).getUri().getPort()); final int sourcePort = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).node(0).getUri().getPort());
final RedisCommands<String, String> sourceCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0)); final RedisCommands<String, String> sourceCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).commands(0));
final RedisCommands<String, String> destinationCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0)); final RedisCommands<String, String> destinationCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).commands(0));
destinationCommands.clusterSetSlotImporting(slot, sourceCommands.clusterMyId()); destinationCommands.clusterSetSlotImporting(slot, sourceCommands.clusterMyId());

View File

@ -59,7 +59,7 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
getRedisCluster().useCluster(connection -> { getRedisCluster().useCluster(connection -> {
connection.sync().flushall(); connection.sync().flushall();
connection.sync().masters().commands().configSet("notify-keyspace-events", "K$glz"); connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz");
}); });
final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7)); final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));

View File

@ -5,19 +5,12 @@
package org.whispersystems.textsecuregcm.storage; package org.whispersystems.textsecuregcm.storage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.SlotHash;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
@ -31,10 +24,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import junitparams.JUnitParamsRunner;
import static org.junit.Assert.assertEquals; import junitparams.Parameters;
import static org.junit.Assert.assertFalse; import org.apache.commons.lang3.RandomStringUtils;
import static org.junit.Assert.assertTrue; import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
@RunWith(JUnitParamsRunner.class) @RunWith(JUnitParamsRunner.class)
public class MessagesCacheTest extends AbstractRedisClusterTest { public class MessagesCacheTest extends AbstractRedisClusterTest {
@ -53,7 +51,7 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "Klgz")); getRedisCluster().useCluster(connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "Klgz"));
notificationExecutorService = Executors.newSingleThreadExecutor(); notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService); messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);