diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java index fa6cbdf40..2fd70bec1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java @@ -27,12 +27,17 @@ public class CircuitBreakerConfiguration { @JsonProperty @NotNull @Min(1) - private int ringBufferSizeInHalfOpenState = 10; + private int permittedNumberOfCallsInHalfOpenState = 10; @JsonProperty @NotNull @Min(1) - private int ringBufferSizeInClosedState = 100; + private int slidingWindowSize = 100; + + @JsonProperty + @NotNull + @Min(1) + private int slidingWindowMinimumNumberOfCalls = 100; @JsonProperty @NotNull @@ -47,28 +52,32 @@ public class CircuitBreakerConfiguration { return failureRateThreshold; } - public int getRingBufferSizeInHalfOpenState() { - return ringBufferSizeInHalfOpenState; + public int getPermittedNumberOfCallsInHalfOpenState() { + return permittedNumberOfCallsInHalfOpenState; } - public int getRingBufferSizeInClosedState() { - return ringBufferSizeInClosedState; + public int getSlidingWindowSize() { + return slidingWindowSize; + } + + public int getSlidingWindowMinimumNumberOfCalls() { + return slidingWindowMinimumNumberOfCalls; } public long getWaitDurationInOpenStateInSeconds() { return waitDurationInOpenStateInSeconds; } - public List getIgnoredExceptions() { - return ignoredExceptions.stream() - .map(name -> { - try { - return Class.forName(name); - } catch (final ClassNotFoundException e) { - throw new RuntimeException(e); - } - }) - .collect(Collectors.toList()); + public List> getIgnoredExceptions() { + return ignoredExceptions.stream() + .map(name -> { + try { + return Class.forName(name); + } catch (final ClassNotFoundException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); } @VisibleForTesting @@ -77,13 +86,18 @@ public class CircuitBreakerConfiguration { } @VisibleForTesting - public void setRingBufferSizeInClosedState(int size) { - this.ringBufferSizeInClosedState = size; + public void setSlidingWindowSize(int size) { + this.slidingWindowSize = size; } @VisibleForTesting - public void setRingBufferSizeInHalfOpenState(int size) { - this.ringBufferSizeInHalfOpenState = size; + public void setSlidingWindowMinimumNumberOfCalls(int size) { + this.slidingWindowMinimumNumberOfCalls = size; + } + + @VisibleForTesting + public void setPermittedNumberOfCallsInHalfOpenState(int size) { + this.permittedNumberOfCallsInHalfOpenState = size; } @VisibleForTesting @@ -98,11 +112,12 @@ public class CircuitBreakerConfiguration { public CircuitBreakerConfig toCircuitBreakerConfig() { return CircuitBreakerConfig.custom() - .failureRateThreshold(getFailureRateThreshold()) - .ignoreExceptions(getIgnoredExceptions().toArray(new Class[0])) - .ringBufferSizeInHalfOpenState(getRingBufferSizeInHalfOpenState()) - .waitDurationInOpenState(Duration.ofSeconds(getWaitDurationInOpenStateInSeconds())) - .ringBufferSizeInClosedState(getRingBufferSizeInClosedState()) - .build(); + .failureRateThreshold(getFailureRateThreshold()) + .ignoreExceptions(getIgnoredExceptions().toArray(new Class[0])) + .permittedNumberOfCallsInHalfOpenState(getPermittedNumberOfCallsInHalfOpenState()) + .waitDurationInOpenState(Duration.ofSeconds(getWaitDurationInOpenStateInSeconds())) + .slidingWindow(getSlidingWindowSize(), getSlidingWindowMinimumNumberOfCalls(), + CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) + .build(); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java index 5262ccccf..a2fb5b7ae 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantPubSubConnectionTest.java @@ -19,8 +19,8 @@ import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; @@ -32,25 +32,28 @@ class FaultTolerantPubSubConnectionTest { @SuppressWarnings("unchecked") @BeforeEach public void setUp() { - final StatefulRedisClusterPubSubConnection pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class); + final StatefulRedisClusterPubSubConnection pubSubConnection = mock( + StatefulRedisClusterPubSubConnection.class); - pubSubCommands = mock(RedisClusterPubSubCommands.class); + pubSubCommands = mock(RedisClusterPubSubCommands.class); - when(pubSubConnection.sync()).thenReturn(pubSubCommands); + when(pubSubConnection.sync()).thenReturn(pubSubCommands); - final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); - breakerConfiguration.setFailureRateThreshold(100); - breakerConfiguration.setRingBufferSizeInClosedState(1); - breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); + final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); + breakerConfiguration.setFailureRateThreshold(100); + breakerConfiguration.setSlidingWindowSize(1); + breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1); + breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); - final RetryConfiguration retryConfiguration = new RetryConfiguration(); - retryConfiguration.setMaxAttempts(3); - retryConfiguration.setWaitDuration(0); + final RetryConfiguration retryConfiguration = new RetryConfiguration(); + retryConfiguration.setMaxAttempts(3); + retryConfiguration.setWaitDuration(0); - final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig()); - final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); + final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", breakerConfiguration.toCircuitBreakerConfig()); + final Retry retry = Retry.of("test", retryConfiguration.toRetryConfig()); - faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker, retry); + faultTolerantPubSubConnection = new FaultTolerantPubSubConnection<>("test", pubSubConnection, circuitBreaker, + retry); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 57d09e50c..922b433f5 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -22,8 +22,8 @@ import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.event.EventBus; import io.lettuce.core.resource.ClientResources; import java.time.Duration; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import reactor.core.publisher.Flux; @@ -44,23 +44,25 @@ class FaultTolerantRedisClusterTest { clusterCommands = mock(RedisAdvancedClusterCommands.class); - when(clusterClient.connect()).thenReturn(clusterConnection); - when(clusterClient.connectPubSub()).thenReturn(pubSubConnection); - when(clusterClient.getResources()).thenReturn(clientResources); - when(clusterConnection.sync()).thenReturn(clusterCommands); - when(clientResources.eventBus()).thenReturn(eventBus); - when(eventBus.get()).thenReturn(mock(Flux.class)); + when(clusterClient.connect()).thenReturn(clusterConnection); + when(clusterClient.connectPubSub()).thenReturn(pubSubConnection); + when(clusterClient.getResources()).thenReturn(clientResources); + when(clusterConnection.sync()).thenReturn(clusterCommands); + when(clientResources.eventBus()).thenReturn(eventBus); + when(eventBus.get()).thenReturn(mock(Flux.class)); - final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); - breakerConfiguration.setFailureRateThreshold(100); - breakerConfiguration.setRingBufferSizeInClosedState(1); - breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); + final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration(); + breakerConfiguration.setFailureRateThreshold(100); + breakerConfiguration.setSlidingWindowSize(1); + breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1); + breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); - final RetryConfiguration retryConfiguration = new RetryConfiguration(); - retryConfiguration.setMaxAttempts(3); - retryConfiguration.setWaitDuration(0); + final RetryConfiguration retryConfiguration = new RetryConfiguration(); + retryConfiguration.setMaxAttempts(3); + retryConfiguration.setWaitDuration(0); - faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration, retryConfiguration); + faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), + breakerConfiguration, retryConfiguration); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/http/FaultTolerantHttpClientTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/http/FaultTolerantHttpClientTest.java index f5de71037..a3d963b9b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/http/FaultTolerantHttpClientTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/http/FaultTolerantHttpClientTest.java @@ -95,18 +95,19 @@ class FaultTolerantHttpClientTest { @Test void testNetworkFailureCircuitBreaker() throws InterruptedException { CircuitBreakerConfiguration circuitBreakerConfiguration = new CircuitBreakerConfiguration(); - circuitBreakerConfiguration.setRingBufferSizeInClosedState(2); - circuitBreakerConfiguration.setRingBufferSizeInHalfOpenState(1); + circuitBreakerConfiguration.setSlidingWindowSize(2); + circuitBreakerConfiguration.setSlidingWindowMinimumNumberOfCalls(2); + circuitBreakerConfiguration.setPermittedNumberOfCallsInHalfOpenState(1); circuitBreakerConfiguration.setFailureRateThreshold(50); circuitBreakerConfiguration.setWaitDurationInOpenStateInSeconds(1); FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder() - .withCircuitBreaker(circuitBreakerConfiguration) - .withRetry(new RetryConfiguration()) - .withExecutor(Executors.newSingleThreadExecutor()) - .withName("test") - .withVersion(HttpClient.Version.HTTP_2) - .build(); + .withCircuitBreaker(circuitBreakerConfiguration) + .withRetry(new RetryConfiguration()) + .withExecutor(Executors.newSingleThreadExecutor()) + .withName("test") + .withVersion(HttpClient.Version.HTTP_2) + .build(); HttpRequest request = HttpRequest.newBuilder() .uri(URI.create("http://localhost:" + 39873 + "/failure")) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/redis/ReplicatedJedisPoolTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/redis/ReplicatedJedisPoolTest.java index 5dd118f19..bba66d03f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/redis/ReplicatedJedisPoolTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/redis/ReplicatedJedisPoolTest.java @@ -118,17 +118,19 @@ class ReplicatedJedisPoolTest { void testCircuitBreakerOpen() { CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration(); configuration.setFailureRateThreshold(50); - configuration.setRingBufferSizeInClosedState(2); + configuration.setSlidingWindowSize(2); + configuration.setSlidingWindowMinimumNumberOfCalls(2); - JedisPool master = mock(JedisPool.class); - JedisPool slaveOne = mock(JedisPool.class); - JedisPool slaveTwo = mock(JedisPool.class); + JedisPool master = mock(JedisPool.class); + JedisPool slaveOne = mock(JedisPool.class); + JedisPool slaveTwo = mock(JedisPool.class); when(master.getResource()).thenReturn(null); when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!")); when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!")); - ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testCircuitBreakerOpen", master, Arrays.asList(slaveOne, slaveTwo), configuration); + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testCircuitBreakerOpen", master, + Arrays.asList(slaveOne, slaveTwo), configuration); replicatedJedisPool.getWriteResource(); when(master.getResource()).thenThrow(new JedisException("Master broken!")); @@ -152,13 +154,14 @@ class ReplicatedJedisPoolTest { void testCircuitBreakerHalfOpen() throws InterruptedException { CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration(); configuration.setFailureRateThreshold(50); - configuration.setRingBufferSizeInClosedState(2); - configuration.setRingBufferSizeInHalfOpenState(1); + configuration.setSlidingWindowSize(2); + configuration.setSlidingWindowMinimumNumberOfCalls(2); + configuration.setPermittedNumberOfCallsInHalfOpenState(1); configuration.setWaitDurationInOpenStateInSeconds(1); - JedisPool master = mock(JedisPool.class); - JedisPool slaveOne = mock(JedisPool.class); - JedisPool slaveTwo = mock(JedisPool.class); + JedisPool master = mock(JedisPool.class); + JedisPool slaveOne = mock(JedisPool.class); + JedisPool slaveTwo = mock(JedisPool.class); when(master.getResource()).thenThrow(new JedisException("Master broken!")); when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));