diff --git a/pom.xml b/pom.xml
index 74d38992a..fad3d0a55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,6 +14,7 @@
1.3.9
2.9.8
+ 0.13.2
UTF-8
@@ -65,6 +66,13 @@
+
+ io.github.resilience4j
+ resilience4j-circuitbreaker
+ ${resilience4j.version}
+
+
+
com.amazonaws
aws-java-sdk-s3
diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index 150194acd..e4ee28319 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -160,10 +160,10 @@ public class WhisperServerService extends Application replicaUrls;
+ @JsonProperty
+ @NotNull
+ @Valid
+ private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
+
public String getUrl() {
return url;
}
@@ -41,4 +47,8 @@ public class RedisConfiguration {
public List getReplicaUrls() {
return replicaUrls;
}
+
+ public CircuitBreakerConfiguration getCircuitBreakerConfiguration() {
+ return circuitBreaker;
+ }
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java
index 7333f73f5..7991fa621 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (C) 2013 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
@@ -20,6 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
import org.whispersystems.dispatch.redis.PubSubConnection;
+import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Util;
@@ -42,7 +43,9 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
private final int port;
private final ReplicatedJedisPool jedisPool;
- public RedisClientFactory(String url, List replicaUrls) throws URISyntaxException {
+ public RedisClientFactory(String name, String url, List replicaUrls, CircuitBreakerConfiguration circuitBreakerConfiguration)
+ throws URISyntaxException
+ {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setTestOnBorrow(true);
@@ -63,7 +66,7 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
null, null));
}
- this.jedisPool = new ReplicatedJedisPool(masterPool, replicaPools);
+ this.jedisPool = new ReplicatedJedisPool(name, masterPool, replicaPools, circuitBreakerConfiguration);
}
public ReplicatedJedisPool getRedisClientPool() {
diff --git a/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java
index d112d6e4a..70596a0f6 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java
@@ -1,49 +1,76 @@
package org.whispersystems.textsecuregcm.redis;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
+import org.whispersystems.textsecuregcm.util.Constants;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import static com.codahale.metrics.MetricRegistry.name;
+import io.github.resilience4j.circuitbreaker.CircuitBreaker;
+import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisException;
public class ReplicatedJedisPool {
- private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class);
- private final AtomicInteger replicaIndex = new AtomicInteger(0);
+ private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
+ private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class);
+ private final AtomicInteger replicaIndex = new AtomicInteger(0);
- private final JedisPool master;
- private final JedisPool[] replicas;
+ private final Supplier master;
+ private final ArrayList> replicas;
- public ReplicatedJedisPool(JedisPool master, List replicas) {
+ public ReplicatedJedisPool(String name,
+ JedisPool master,
+ List replicas,
+ CircuitBreakerConfiguration circuitBreakerConfiguration)
+ {
if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica");
- this.master = master;
- this.replicas = new JedisPool[replicas.size()];
- for (int i=0;i(replicas.size());
+
+ for (int i=0;i ()-> circuitBreaker.getState().getOrder());
+
+ circuitBreaker.getEventPublisher().onSuccess(event -> successMeter.mark());
+ circuitBreaker.getEventPublisher().onError(event -> failureMeter.mark());
+ circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermittedMeter.mark());
+ }
+
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java
index 36ae2b83c..5eefcfb99 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java
@@ -156,7 +156,7 @@ public class DirectoryManager {
Jedis jedis = handle.jedis;
pipeline.sync();
- redisPool.returnWriteResource(jedis);
+ jedis.close();
}
public static class BatchOperationHandle {
diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
index 49f683d10..0d41326e8 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
@@ -75,8 +75,8 @@ public class DeleteUserCommand extends EnvironmentCommand());
+ new ReplicatedJedisPool("testWriteCheckoutNoSlaves", master, new LinkedList<>(), new CircuitBreakerConfiguration());
throw new AssertionError();
} catch (Exception e) {
// good
@@ -35,7 +38,7 @@ public class ReplicatedJedisPoolTest {
when(master.getResource()).thenReturn(instance);
- ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Collections.singletonList(slave));
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testWriteCheckoutWithSlaves", master, Collections.singletonList(slave), new CircuitBreakerConfiguration());
Jedis writeResource = replicatedJedisPool.getWriteResource();
assertThat(writeResource).isEqualTo(instance);
@@ -53,7 +56,7 @@ public class ReplicatedJedisPoolTest {
when(slaveOne.getResource()).thenReturn(instanceOne);
when(slaveTwo.getResource()).thenReturn(instanceTwo);
- ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testReadCheckouts", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration());
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
@@ -74,7 +77,7 @@ public class ReplicatedJedisPoolTest {
when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
when(slaveTwo.getResource()).thenReturn(instanceTwo);
- ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testBrokenReadCheckout", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration());
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
@@ -92,7 +95,7 @@ public class ReplicatedJedisPoolTest {
when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!"));
- ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testAllBrokenReadCheckout", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration());
try {
replicatedJedisPool.getReadResource();
@@ -104,4 +107,96 @@ public class ReplicatedJedisPoolTest {
verifyNoMoreInteractions(master);
}
+ @Test
+ public void testCircuitBreakerOpen() {
+ CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration();
+ configuration.setFailureRateThreshold(50);
+ configuration.setRingBufferSizeInClosedState(2);
+
+ JedisPool master = mock(JedisPool.class);
+ JedisPool slaveOne = mock(JedisPool.class);
+ JedisPool slaveTwo = mock(JedisPool.class);
+
+ when(master.getResource()).thenReturn(null);
+ when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
+ when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!"));
+
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testCircuitBreakerOpen", master, Arrays.asList(slaveOne, slaveTwo), configuration);
+ replicatedJedisPool.getWriteResource();
+
+ when(master.getResource()).thenThrow(new JedisException("Master broken!"));
+
+ try {
+ replicatedJedisPool.getWriteResource();
+ throw new AssertionError();
+ } catch (JedisException exception) {
+ // good
+ }
+
+ try {
+ replicatedJedisPool.getWriteResource();
+ throw new AssertionError();
+ } catch (CircuitBreakerOpenException e) {
+ // good
+ }
+ }
+
+ @Test
+ public void testCircuitBreakerHalfOpen() throws InterruptedException {
+ CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration();
+ configuration.setFailureRateThreshold(50);
+ configuration.setRingBufferSizeInClosedState(2);
+ configuration.setRingBufferSizeInHalfOpenState(1);
+ configuration.setWaitDurationInOpenStateInSeconds(1);
+
+ JedisPool master = mock(JedisPool.class);
+ JedisPool slaveOne = mock(JedisPool.class);
+ JedisPool slaveTwo = mock(JedisPool.class);
+
+ when(master.getResource()).thenThrow(new JedisException("Master broken!"));
+ when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
+ when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!"));
+
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testCircuitBreakerHalfOpen", master, Arrays.asList(slaveOne, slaveTwo), configuration);
+
+ try {
+ replicatedJedisPool.getWriteResource();
+ throw new AssertionError();
+ } catch (JedisException exception) {
+ // good
+ }
+
+ try {
+ replicatedJedisPool.getWriteResource();
+ throw new AssertionError();
+ } catch (JedisException exception) {
+ // good
+ }
+
+ try {
+ replicatedJedisPool.getWriteResource();
+ throw new AssertionError();
+ } catch (CircuitBreakerOpenException e) {
+ // good
+ }
+
+ Thread.sleep(1100);
+
+ try {
+ replicatedJedisPool.getWriteResource();
+ throw new AssertionError();
+ } catch (JedisException exception) {
+ // good
+ }
+
+ try {
+ replicatedJedisPool.getWriteResource();
+ throw new AssertionError();
+ } catch (CircuitBreakerOpenException e) {
+ // good
+ }
+
+ }
+
+
}