diff --git a/service/config/sample.yml b/service/config/sample.yml
index 7fe9a03c5..2e55985df 100644
--- a/service/config/sample.yml
+++ b/service/config/sample.yml
@@ -28,6 +28,10 @@ cache: # Redis server configuration for cache cluster
url:
replicaUrls:
+cacheCluster: # Redis server configuration for cache cluster
+ urls:
+ - redis://redis.example.com:6379/
+
directory:
redis: # Redis server configuration for directory cluster
url:
diff --git a/service/pom.xml b/service/pom.xml
index 2799d82b0..7af2b1700 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -99,6 +99,12 @@
compile
+
+ io.lettuce
+ lettuce-core
+ 5.3.0.RELEASE
+
+
org.postgresql
postgresql
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
index a627dc9b1..46a031546 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
@@ -68,6 +68,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private RedisConfiguration cache;
+ @NotNull
+ @Valid
+ @JsonProperty
+ private RedisClusterConfiguration cacheCluster;
+
@NotNull
@Valid
@JsonProperty
@@ -221,6 +226,10 @@ public class WhisperServerConfiguration extends Configuration {
return cache;
}
+ public RedisClusterConfiguration getCacheClusterConfiguration() {
+ return cacheCluster;
+ }
+
public RedisConfiguration getPubsubCacheConfiguration() {
return pubsub;
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java
new file mode 100644
index 000000000..6f97e59cf
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java
@@ -0,0 +1,28 @@
+package org.whispersystems.textsecuregcm.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+
+public class RedisClusterConfiguration {
+
+ @JsonProperty
+ @NotEmpty
+ private List urls;
+
+ @JsonProperty
+ @NotNull
+ @Valid
+ private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
+
+ public List getUrls() {
+ return urls;
+ }
+
+ public CircuitBreakerConfiguration getCircuitBreakerConfiguration() {
+ return circuitBreaker;
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java
new file mode 100644
index 000000000..ffd852d68
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java
@@ -0,0 +1,56 @@
+package org.whispersystems.textsecuregcm.redis;
+
+import com.codahale.metrics.SharedMetricRegistries;
+import com.google.common.annotations.VisibleForTesting;
+import io.github.resilience4j.circuitbreaker.CircuitBreaker;
+import io.lettuce.core.cluster.RedisClusterClient;
+import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
+import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
+import org.whispersystems.textsecuregcm.util.Constants;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster has separate circuit breakers for
+ * read and write operations because the leader in a Redis cluster shard may fail while its read-only replicas can still
+ * serve traffic.
+ */
+public class FaultTolerantRedisCluster {
+
+ private final StatefulRedisClusterConnection clusterConnection;
+
+ private final CircuitBreaker readCircuitBreaker;
+ private final CircuitBreaker writeCircuitBreaker;
+
+ public FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final CircuitBreakerConfiguration circuitBreakerConfiguration) {
+ this.clusterConnection = clusterClient.connect();
+ this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig());
+ this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig());
+
+ CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
+ readCircuitBreaker,
+ FaultTolerantRedisCluster.class);
+
+ CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
+ writeCircuitBreaker,
+ FaultTolerantRedisCluster.class);
+ }
+
+ public void useReadCluster(Consumer> consumer) {
+ this.readCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection));
+ }
+
+ public T withReadCluster(Function, T> consumer) {
+ return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection));
+ }
+
+ public void useWriteCluster(Consumer> consumer) {
+ this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection));
+ }
+
+ public T withWriteCluster(Function, T> consumer) {
+ return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection));
+ }
+}
diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java
new file mode 100644
index 000000000..b4e8742e8
--- /dev/null
+++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java
@@ -0,0 +1,86 @@
+package org.whispersystems.textsecuregcm.redis;
+
+import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.cluster.RedisClusterClient;
+import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
+import org.junit.Before;
+import org.junit.Test;
+import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FaultTolerantRedisClusterTest {
+
+ private RedisAdvancedClusterCommands clusterCommands;
+
+ private FaultTolerantRedisCluster faultTolerantCluster;
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() {
+ final RedisClusterClient clusterClient = mock(RedisClusterClient.class);
+ final StatefulRedisClusterConnection clusterConnection = mock(StatefulRedisClusterConnection.class);
+
+ clusterCommands = mock(RedisAdvancedClusterCommands.class);
+
+ when(clusterClient.connect()).thenReturn(clusterConnection);
+ when(clusterConnection.sync()).thenReturn(clusterCommands);
+
+ final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration();
+ breakerConfiguration.setFailureRateThreshold(100);
+ breakerConfiguration.setRingBufferSizeInClosedState(1);
+ breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE);
+
+ faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, breakerConfiguration);
+ }
+
+ @Test
+ public void testReadBreaker() {
+ when(clusterCommands.get(anyString()))
+ .thenReturn("value")
+ .thenThrow(new RedisException("Badness has ensued."));
+
+ assertEquals("value", faultTolerantCluster.withReadCluster(connection -> connection.sync().get("key")));
+
+ assertThrows(RedisException.class,
+ () -> faultTolerantCluster.withReadCluster(connection -> connection.sync().get("OH NO")));
+
+ assertThrows(CircuitBreakerOpenException.class,
+ () -> faultTolerantCluster.withReadCluster(connection -> connection.sync().get("OH NO")));
+ }
+
+ @Test
+ public void testReadsContinueWhileWriteBreakerOpen() {
+ when(clusterCommands.set(anyString(), anyString())).thenThrow(new RedisException("Badness has ensued."));
+
+ assertThrows(RedisException.class,
+ () -> faultTolerantCluster.useWriteCluster(connection -> connection.sync().set("OH", "NO")));
+
+ assertThrows(CircuitBreakerOpenException.class,
+ () -> faultTolerantCluster.useWriteCluster(connection -> connection.sync().set("OH", "NO")));
+
+ when(clusterCommands.get("key")).thenReturn("value");
+
+ assertEquals("value", faultTolerantCluster.withReadCluster(connection -> connection.sync().get("key")));
+ }
+
+ @Test
+ public void testWriteBreaker() {
+ when(clusterCommands.get(anyString()))
+ .thenReturn("value")
+ .thenThrow(new RedisException("Badness has ensued."));
+
+ assertEquals("value", faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("key")));
+
+ assertThrows(RedisException.class,
+ () -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO")));
+
+ assertThrows(CircuitBreakerOpenException.class,
+ () -> faultTolerantCluster.withWriteCluster(connection -> connection.sync().get("OH NO")));
+ }
+}