From 69c8968cb07ceaf2f6656f557cb530c40589431f Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 9 Jul 2020 14:24:58 -0400 Subject: [PATCH] Add byte-array-based methods to FaultTolerantRedisCluster. --- .../redis/FaultTolerantRedisCluster.java | 36 ++++++++++++---- .../tests/util/RedisClusterHelper.java | 41 +++++++++++++++---- 2 files changed, 61 insertions(+), 16 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 857be7765..7650d05ad 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -7,6 +7,7 @@ import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.codec.ByteArrayCodec; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; @@ -25,7 +26,9 @@ import java.util.stream.Collectors; public class FaultTolerantRedisCluster implements Managed { private final RedisClusterClient clusterClient; - private final StatefulRedisClusterConnection clusterConnection; + + private final StatefulRedisClusterConnection stringClusterConnection; + private final StatefulRedisClusterConnection binaryClusterConnection; private final CircuitBreaker readCircuitBreaker; private final CircuitBreaker writeCircuitBreaker; @@ -39,9 +42,10 @@ public class FaultTolerantRedisCluster implements Managed { this.clusterClient = clusterClient; this.clusterClient.setDefaultTimeout(timeout); - this.clusterConnection = clusterClient.connect(); - this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.stringClusterConnection = clusterClient.connect(); + this.binaryClusterConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); + this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig()); CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), readCircuitBreaker, @@ -53,19 +57,35 @@ public class FaultTolerantRedisCluster implements Managed { } public void useReadCluster(final Consumer> consumer) { - this.readCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection)); + this.readCircuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection)); } public T withReadCluster(final Function, T> consumer) { - return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection)); + return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection)); } public void useWriteCluster(final Consumer> consumer) { - this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection)); + this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection)); } public T withWriteCluster(final Function, T> consumer) { - return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection)); + return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection)); + } + + public void useBinaryReadCluster(final Consumer> consumer) { + this.readCircuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection)); + } + + public T withBinaryReadCluster(final Function, T> consumer) { + return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection)); + } + + public void useBinaryWriteCluster(final Consumer> consumer) { + this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection)); + } + + public T withBinaryWriteCluster(final Function, T> consumer) { + return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection)); } @Override diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java index 3b26b61aa..418c711d2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/RedisClusterHelper.java @@ -15,29 +15,54 @@ import static org.mockito.Mockito.when; public class RedisClusterHelper { @SuppressWarnings("unchecked") - public static FaultTolerantRedisCluster buildMockRedisCluster(final RedisAdvancedClusterCommands commands) { - final FaultTolerantRedisCluster cluster = mock(FaultTolerantRedisCluster.class); - final StatefulRedisClusterConnection connection = mock(StatefulRedisClusterConnection.class); + public static FaultTolerantRedisCluster buildMockRedisCluster(final RedisAdvancedClusterCommands stringCommands) { + return buildMockRedisCluster(stringCommands, mock(RedisAdvancedClusterCommands.class)); + } - when(connection.sync()).thenReturn(commands); + @SuppressWarnings("unchecked") + public static FaultTolerantRedisCluster buildMockRedisCluster(final RedisAdvancedClusterCommands stringCommands, final RedisAdvancedClusterCommands binaryCommands) { + final FaultTolerantRedisCluster cluster = mock(FaultTolerantRedisCluster.class); + final StatefulRedisClusterConnection stringConnection = mock(StatefulRedisClusterConnection.class); + final StatefulRedisClusterConnection binaryConnection = mock(StatefulRedisClusterConnection.class); + + when(stringConnection.sync()).thenReturn(stringCommands); + when(binaryConnection.sync()).thenReturn(binaryCommands); when(cluster.withReadCluster(any(Function.class))).thenAnswer(invocation -> { - return invocation.getArgument(0, Function.class).apply(connection); + return invocation.getArgument(0, Function.class).apply(stringConnection); }); doAnswer(invocation -> { - invocation.getArgument(0, Consumer.class).accept(connection); + invocation.getArgument(0, Consumer.class).accept(stringConnection); return null; }).when(cluster).useReadCluster(any(Consumer.class)); when(cluster.withWriteCluster(any(Function.class))).thenAnswer(invocation -> { - return invocation.getArgument(0, Function.class).apply(connection); + return invocation.getArgument(0, Function.class).apply(stringConnection); }); doAnswer(invocation -> { - invocation.getArgument(0, Consumer.class).accept(connection); + invocation.getArgument(0, Consumer.class).accept(stringConnection); return null; }).when(cluster).useWriteCluster(any(Consumer.class)); + + when(cluster.withBinaryReadCluster(any(Function.class))).thenAnswer(invocation -> { + return invocation.getArgument(0, Function.class).apply(binaryConnection); + }); + + doAnswer(invocation -> { + invocation.getArgument(0, Consumer.class).accept(binaryConnection); + return null; + }).when(cluster).useBinaryReadCluster(any(Consumer.class)); + + when(cluster.withBinaryWriteCluster(any(Function.class))).thenAnswer(invocation -> { + return invocation.getArgument(0, Function.class).apply(binaryConnection); + }); + + doAnswer(invocation -> { + invocation.getArgument(0, Consumer.class).accept(binaryConnection); + return null; + }).when(cluster).useBinaryWriteCluster(any(Consumer.class)); return cluster; }