From fe1054d58acf6fc0e5206a52170a0e117fd6dc10 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Sat, 6 Jun 2020 11:24:11 -0400 Subject: [PATCH] Introduce a Lettuce-based fault-tolerant Redis cluster accessor. --- service/config/sample.yml | 4 + service/pom.xml | 6 ++ .../WhisperServerConfiguration.java | 9 ++ .../RedisClusterConfiguration.java | 28 ++++++ .../redis/FaultTolerantRedisCluster.java | 56 ++++++++++++ .../redis/FaultTolerantRedisClusterTest.java | 86 +++++++++++++++++++ 6 files changed, 189 insertions(+) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java diff --git a/service/config/sample.yml b/service/config/sample.yml index 7fe9a03c5..2e55985df 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -28,6 +28,10 @@ cache: # Redis server configuration for cache cluster url: replicaUrls: +cacheCluster: # Redis server configuration for cache cluster + urls: + - redis://redis.example.com:6379/ + directory: redis: # Redis server configuration for directory cluster url: diff --git a/service/pom.xml b/service/pom.xml index 2799d82b0..7af2b1700 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -99,6 +99,12 @@ compile + + io.lettuce + lettuce-core + 5.3.0.RELEASE + + org.postgresql postgresql diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index a627dc9b1..46a031546 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -68,6 +68,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private RedisConfiguration cache; + @NotNull + @Valid + @JsonProperty + private RedisClusterConfiguration cacheCluster; + @NotNull @Valid @JsonProperty @@ -221,6 +226,10 @@ public class WhisperServerConfiguration extends Configuration { return cache; } + public RedisClusterConfiguration getCacheClusterConfiguration() { + return cacheCluster; + } + public RedisConfiguration getPubsubCacheConfiguration() { return pubsub; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java new file mode 100644 index 000000000..6f97e59cf --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java @@ -0,0 +1,28 @@ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.Valid; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import java.util.List; + +public class RedisClusterConfiguration { + + @JsonProperty + @NotEmpty + private List urls; + + @JsonProperty + @NotNull + @Valid + private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); + + public List getUrls() { + return urls; + } + + public CircuitBreakerConfiguration getCircuitBreakerConfiguration() { + return circuitBreaker; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java new file mode 100644 index 000000000..ffd852d68 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -0,0 +1,56 @@ +package org.whispersystems.textsecuregcm.redis; + +import com.codahale.metrics.SharedMetricRegistries; +import com.google.common.annotations.VisibleForTesting; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; +import org.whispersystems.textsecuregcm.util.Constants; + +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster has separate circuit breakers for + * read and write operations because the leader in a Redis cluster shard may fail while its read-only replicas can still + * serve traffic. + */ +public class FaultTolerantRedisCluster { + + private final StatefulRedisClusterConnection clusterConnection; + + private final CircuitBreaker readCircuitBreaker; + private final CircuitBreaker writeCircuitBreaker; + + public FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final CircuitBreakerConfiguration circuitBreakerConfiguration) { + this.clusterConnection = clusterClient.connect(); + this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); + this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig()); + + CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), + readCircuitBreaker, + FaultTolerantRedisCluster.class); + + CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), + writeCircuitBreaker, + FaultTolerantRedisCluster.class); + } + + public void useReadCluster(Consumer> consumer) { + this.readCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection)); + } + + public T withReadCluster(Function, T> consumer) { + return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection)); + } + + public void useWriteCluster(Consumer> consumer) { + this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection)); + } + + public T withWriteCluster(Function, T> consumer) { + return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection)); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java new file mode 100644 index 000000000..b4e8742e8 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -0,0 +1,86 @@ +package org.whispersystems.textsecuregcm.redis; + +import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; +import io.lettuce.core.RedisException; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; +import org.junit.Before; +import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class FaultTolerantRedisClusterTest { + + private RedisAdvancedClusterCommands clusterCommands; + + private FaultTolerantRedisCluster faultTolerantCluster; + + @SuppressWarnings("unchecked") + @Before + public void setUp() { + final RedisClusterClient clusterClient = mock(RedisClusterClient.class); + final StatefulRedisClusterConnection clusterConnection = mock(StatefulRedisClusterConnection.class); + + clusterCommands = mock(RedisAdvancedClusterCommands.class); + + when(clusterClient.connect()).thenReturn(clusterConnection); + when(clusterConnection.sync()).thenReturn(clusterCommands); + + final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); + breakerConfiguration.setFailureRateThreshold(100); + breakerConfiguration.setRingBufferSizeInClosedState(1); + breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); + + faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, breakerConfiguration); + } + + @Test + public void testReadBreaker() { + when(clusterCommands.get(anyString())) + .thenReturn("value") + .thenThrow(new RedisException("Badness has ensued.")); + + assertEquals("value", faultTolerantCluster.withReadCluster(connection -> connection.sync().get("key"))); + + assertThrows(RedisException.class, + () -> faultTolerantCluster.withReadCluster(connection -> connection.sync().get("OH NO"))); + + assertThrows(CircuitBreakerOpenException.class, + () -> faultTolerantCluster.withReadCluster(connection -> connection.sync().get("OH NO"))); + } + + @Test + public void testReadsContinueWhileWriteBreakerOpen() { + when(clusterCommands.set(anyString(), anyString())).thenThrow(new RedisException("Badness has ensued.")); + + assertThrows(RedisException.class, + () -> faultTolerantCluster.useWriteCluster(connection -> connection.sync().set("OH", "NO"))); + + assertThrows(CircuitBreakerOpenException.class, + () -> faultTolerantCluster.useWriteCluster(connection -> connection.sync().set("OH", "NO"))); + + when(clusterCommands.get("key")).thenReturn("value"); + + assertEquals("value", faultTolerantCluster.withReadCluster(connection -> connection.sync().get("key"))); + } + + @Test + public void testWriteBreaker() { + when(clusterCommands.get(anyString())) + .thenReturn("value") + .thenThrow(new RedisException("Badness has ensued.")); + + assertEquals("value", faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("key"))); + + assertThrows(RedisException.class, + () -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO"))); + + assertThrows(CircuitBreakerOpenException.class, + () -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO"))); + } +}