From 91bd061110ba08edc9446394797c0edd6b326dcb Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Mon, 29 Mar 2021 11:25:59 -0500 Subject: [PATCH] Migrate deprecated Lettuce method and enum usages --- .../providers/RedisClusterHealthCheck.java | 4 +-- .../push/ClientPresenceManager.java | 25 ++++++++-------- .../redis/FaultTolerantRedisCluster.java | 19 ++++++------ .../textsecuregcm/storage/MessagesCache.java | 27 ++++++++--------- .../workers/GetRedisCommandStatsCommand.java | 2 +- .../workers/GetRedisSlowlogCommand.java | 2 +- .../push/ClientPresenceManagerTest.java | 21 +++++++------ .../redis/ClusterLuaScriptTest.java | 27 ++++++++--------- .../MessagePersisterIntegrationTest.java | 2 +- .../storage/MessagesCacheTest.java | 30 +++++++++---------- 10 files changed, 74 insertions(+), 85 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java b/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java index 64ff143fd..bfef14dee 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClusterHealthCheck.java @@ -8,8 +8,6 @@ package org.whispersystems.textsecuregcm.providers; import com.codahale.metrics.health.HealthCheck; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import java.util.concurrent.CompletableFuture; - public class RedisClusterHealthCheck extends HealthCheck { private final FaultTolerantRedisCluster redisCluster; @@ -20,7 +18,7 @@ public class RedisClusterHealthCheck extends HealthCheck { @Override protected Result check() { - redisCluster.withCluster(connection -> connection.sync().masters().commands().ping()); + redisCluster.withCluster(connection -> connection.sync().upstream().commands().ping()); return Result.healthy(); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 5a7b85c65..b197cf9d7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -5,6 +5,8 @@ package org.whispersystems.textsecuregcm.push; +import static com.codahale.metrics.MetricRegistry.name; + import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; @@ -17,13 +19,6 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.util.Constants; - import java.io.IOException; import java.time.Duration; import java.util.List; @@ -36,8 +31,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; - -import static com.codahale.metrics.MetricRegistry.name; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; +import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.util.Constants; /** * The client presence manager keeps track of which clients are actively connected and "present" to receive messages. @@ -112,7 +111,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)).commands().subscribe(presenceChannel); + connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)).commands().subscribe(presenceChannel); }); presenceCluster.useCluster(connection -> connection.sync().sadd(MANAGER_SET_KEY, managerId)); @@ -143,7 +142,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().masters().commands().unsubscribe(getManagerPresenceChannel(managerId))); + pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe(getManagerPresenceChannel(managerId))); } public void setPresent(final UUID accountUuid, final long deviceId, final DisplacedPresenceListener displacementListener) { @@ -204,7 +203,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) + pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) .commands() .subscribe(getKeyspaceNotificationChannel(presenceKey))); } @@ -216,7 +215,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().masters().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); + pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe(getKeyspaceNotificationChannel(presenceKey))); } void pruneMissingPeers() { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 811bf937c..c9ff5e792 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -5,6 +5,8 @@ package org.whispersystems.textsecuregcm.redis; +import static com.codahale.metrics.MetricRegistry.name; + import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; @@ -20,6 +22,13 @@ import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.resource.ClientResources; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; @@ -29,16 +38,6 @@ import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.ThreadDumpUtil; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static com.codahale.metrics.MetricRegistry.name; - /** * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed, * circuit-breaker-protected access to connections. diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index a60b4bd05..a5420f5b6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -5,6 +5,8 @@ package org.whispersystems.textsecuregcm.storage; +import static com.codahale.metrics.MetricRegistry.name; + import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; import io.dropwizard.lifecycle.Managed; @@ -18,15 +20,6 @@ import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; -import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.util.RedisClusterUtil; - import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -43,8 +36,14 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; - -import static com.codahale.metrics.MetricRegistry.name; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; +import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.util.RedisClusterUtil; public class MessagesCache extends RedisClusterPubSubAdapter implements Managed { @@ -123,7 +122,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp @Override public void stop() { - pubSubConnection.usePubSubConnection(connection -> connection.sync().masters().commands().unsubscribe()); + pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe()); } private void resubscribeAll() { @@ -372,13 +371,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private void subscribeForKeyspaceNotifications(final String queueName) { final int slot = SlotHash.getSlot(queueName); - pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.MASTER) && node.hasSlot(slot)) + pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) .commands() .subscribe(getKeyspaceChannels(queueName))); } private void unsubscribeFromKeyspaceNotifications(final String queueName) { - pubSubConnection.usePubSubConnection(connection -> connection.sync().masters() + pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream() .commands() .unsubscribe(getKeyspaceChannels(queueName))); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/GetRedisCommandStatsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/GetRedisCommandStatsCommand.java index 52ebb3f05..18479e41c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/GetRedisCommandStatsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/GetRedisCommandStatsCommand.java @@ -30,7 +30,7 @@ public class GetRedisCommandStatsCommand extends ConfiguredCommand connection.sync() - .masters() + .upstream() .commands() .info("commandstats") .asMap() diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/GetRedisSlowlogCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/GetRedisSlowlogCommand.java index cdf0fe821..d2b6aa817 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/GetRedisSlowlogCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/GetRedisSlowlogCommand.java @@ -50,7 +50,7 @@ public class GetRedisSlowlogCommand extends ConfiguredCommand connection.sync() - .masters() + .upstream() .commands() .slowlogGet(entries) .asMap() diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java index 4464798c9..c4d34c31b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java @@ -5,12 +5,11 @@ package org.whispersystems.textsecuregcm.push; -import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; import java.util.List; import java.util.UUID; import java.util.concurrent.Executors; @@ -18,10 +17,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; public class ClientPresenceManagerTest extends AbstractRedisClusterTest { @@ -37,7 +36,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { getRedisCluster().useCluster(connection -> { connection.sync().flushall(); - connection.sync().masters().commands().configSet("notify-keyspace-events", "K$z"); + connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$z"); }); presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor(); @@ -188,7 +187,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { addClientPresence(missingPeerId); } - clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().masters().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); + clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().upstream().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); clientPresenceManager.pruneMissingPeers(); assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java index b627c8226..c8c524f3f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/ClusterLuaScriptTest.java @@ -5,25 +5,22 @@ package org.whispersystems.textsecuregcm.redis; -import io.lettuce.core.RedisNoScriptException; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import io.lettuce.core.ScriptOutputType; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; -import org.junit.Test; -import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; - import java.nio.charset.StandardCharsets; import java.util.List; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import org.junit.Test; +import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; public class ClusterLuaScriptTest extends AbstractRedisClusterTest { @@ -41,9 +38,9 @@ public class ClusterLuaScriptTest extends AbstractRedisClusterTest { final int slot = SlotHash.getSlot(key); - final int sourcePort = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).node(0).getUri().getPort()); - final RedisCommands sourceCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0)); - final RedisCommands destinationCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.MASTER)).commands(0)); + final int sourcePort = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).node(0).getUri().getPort()); + final RedisCommands sourceCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).commands(0)); + final RedisCommands destinationCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).commands(0)); destinationCommands.clusterSetSlotImporting(slot, sourceCommands.clusterMyId()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index 11a229b0f..16e671380 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -59,7 +59,7 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest { getRedisCluster().useCluster(connection -> { connection.sync().flushall(); - connection.sync().masters().commands().configSet("notify-keyspace-events", "K$glz"); + connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"); }); final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index ca0ade698..702873d5c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -5,19 +5,12 @@ package org.whispersystems.textsecuregcm.storage; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import com.google.protobuf.ByteString; import io.lettuce.core.cluster.SlotHash; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; -import org.apache.commons.lang3.RandomStringUtils; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; -import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; - import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; @@ -31,10 +24,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; @RunWith(JUnitParamsRunner.class) public class MessagesCacheTest extends AbstractRedisClusterTest { @@ -53,7 +51,7 @@ public class MessagesCacheTest extends AbstractRedisClusterTest { public void setUp() throws Exception { super.setUp(); - getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "Klgz")); + getRedisCluster().useCluster(connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "Klgz")); notificationExecutorService = Executors.newSingleThreadExecutor(); messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);