From 1d11683ce8ec307406aaf6a1885562c4fce7326a Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Fri, 29 Mar 2019 21:52:37 -0700 Subject: [PATCH] Support for circuit breaker on redis pools --- pom.xml | 8 ++ .../textsecuregcm/WhisperServerService.java | 8 +- .../CircuitBreakerConfiguration.java | 69 ++++++++++++ .../configuration/RedisConfiguration.java | 10 ++ .../providers/RedisClientFactory.java | 9 +- .../redis/ReplicatedJedisPool.java | 73 +++++++++--- .../storage/DirectoryManager.java | 2 +- .../workers/DeleteUserCommand.java | 4 +- .../workers/DirectoryCommand.java | 4 +- .../tests/redis/ReplicatedJedisPoolTest.java | 105 +++++++++++++++++- 10 files changed, 258 insertions(+), 34 deletions(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java diff --git a/pom.xml b/pom.xml index 74d38992a..fad3d0a55 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,7 @@ 1.3.9 2.9.8 + 0.13.2 UTF-8 @@ -65,6 +66,13 @@ + + io.github.resilience4j + resilience4j-circuitbreaker + ${resilience4j.version} + + + com.amazonaws aws-java-sdk-s3 diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 150194acd..e4ee28319 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -160,10 +160,10 @@ public class WhisperServerService extends Application replicaUrls; + @JsonProperty + @NotNull + @Valid + private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); + public String getUrl() { return url; } @@ -41,4 +47,8 @@ public class RedisConfiguration { public List getReplicaUrls() { return replicaUrls; } + + public CircuitBreakerConfiguration getCircuitBreakerConfiguration() { + return circuitBreaker; + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java index 7333f73f5..7991fa621 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java +++ b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (C) 2013 Open WhisperSystems * * This program is free software: you can redistribute it and/or modify @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory; import org.whispersystems.dispatch.redis.PubSubConnection; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Util; @@ -42,7 +43,9 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory { private final int port; private final ReplicatedJedisPool jedisPool; - public RedisClientFactory(String url, List replicaUrls) throws URISyntaxException { + public RedisClientFactory(String name, String url, List replicaUrls, CircuitBreakerConfiguration circuitBreakerConfiguration) + throws URISyntaxException + { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setTestOnBorrow(true); @@ -63,7 +66,7 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory { null, null)); } - this.jedisPool = new ReplicatedJedisPool(masterPool, replicaPools); + this.jedisPool = new ReplicatedJedisPool(name, masterPool, replicaPools, circuitBreakerConfiguration); } public ReplicatedJedisPool getRedisClientPool() { diff --git a/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java index d112d6e4a..70596a0f6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java +++ b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java @@ -1,49 +1,76 @@ package org.whispersystems.textsecuregcm.redis; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.util.Constants; +import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import static com.codahale.metrics.MetricRegistry.name; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.exceptions.JedisException; public class ReplicatedJedisPool { - private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class); - private final AtomicInteger replicaIndex = new AtomicInteger(0); + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class); + private final AtomicInteger replicaIndex = new AtomicInteger(0); - private final JedisPool master; - private final JedisPool[] replicas; + private final Supplier master; + private final ArrayList> replicas; - public ReplicatedJedisPool(JedisPool master, List replicas) { + public ReplicatedJedisPool(String name, + JedisPool master, + List replicas, + CircuitBreakerConfiguration circuitBreakerConfiguration) + { if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica"); - this.master = master; - this.replicas = new JedisPool[replicas.size()]; - for (int i=0;i(replicas.size()); + + for (int i=0;i ()-> circuitBreaker.getState().getOrder()); + + circuitBreaker.getEventPublisher().onSuccess(event -> successMeter.mark()); + circuitBreaker.getEventPublisher().onError(event -> failureMeter.mark()); + circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermittedMeter.mark()); + } + } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java index 36ae2b83c..5eefcfb99 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java @@ -156,7 +156,7 @@ public class DirectoryManager { Jedis jedis = handle.jedis; pipeline.sync(); - redisPool.returnWriteResource(jedis); + jedis.close(); } public static class BatchOperationHandle { diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index 49f683d10..0d41326e8 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -75,8 +75,8 @@ public class DeleteUserCommand extends EnvironmentCommand()); + new ReplicatedJedisPool("testWriteCheckoutNoSlaves", master, new LinkedList<>(), new CircuitBreakerConfiguration()); throw new AssertionError(); } catch (Exception e) { // good @@ -35,7 +38,7 @@ public class ReplicatedJedisPoolTest { when(master.getResource()).thenReturn(instance); - ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Collections.singletonList(slave)); + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testWriteCheckoutWithSlaves", master, Collections.singletonList(slave), new CircuitBreakerConfiguration()); Jedis writeResource = replicatedJedisPool.getWriteResource(); assertThat(writeResource).isEqualTo(instance); @@ -53,7 +56,7 @@ public class ReplicatedJedisPoolTest { when(slaveOne.getResource()).thenReturn(instanceOne); when(slaveTwo.getResource()).thenReturn(instanceTwo); - ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo)); + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testReadCheckouts", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration()); assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne); assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo); @@ -74,7 +77,7 @@ public class ReplicatedJedisPoolTest { when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!")); when(slaveTwo.getResource()).thenReturn(instanceTwo); - ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo)); + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testBrokenReadCheckout", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration()); assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo); assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo); @@ -92,7 +95,7 @@ public class ReplicatedJedisPoolTest { when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!")); when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!")); - ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo)); + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testAllBrokenReadCheckout", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration()); try { replicatedJedisPool.getReadResource(); @@ -104,4 +107,96 @@ public class ReplicatedJedisPoolTest { verifyNoMoreInteractions(master); } + @Test + public void testCircuitBreakerOpen() { + CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration(); + configuration.setFailureRateThreshold(50); + configuration.setRingBufferSizeInClosedState(2); + + JedisPool master = mock(JedisPool.class); + JedisPool slaveOne = mock(JedisPool.class); + JedisPool slaveTwo = mock(JedisPool.class); + + when(master.getResource()).thenReturn(null); + when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!")); + when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!")); + + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testCircuitBreakerOpen", master, Arrays.asList(slaveOne, slaveTwo), configuration); + replicatedJedisPool.getWriteResource(); + + when(master.getResource()).thenThrow(new JedisException("Master broken!")); + + try { + replicatedJedisPool.getWriteResource(); + throw new AssertionError(); + } catch (JedisException exception) { + // good + } + + try { + replicatedJedisPool.getWriteResource(); + throw new AssertionError(); + } catch (CircuitBreakerOpenException e) { + // good + } + } + + @Test + public void testCircuitBreakerHalfOpen() throws InterruptedException { + CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration(); + configuration.setFailureRateThreshold(50); + configuration.setRingBufferSizeInClosedState(2); + configuration.setRingBufferSizeInHalfOpenState(1); + configuration.setWaitDurationInOpenStateInSeconds(1); + + JedisPool master = mock(JedisPool.class); + JedisPool slaveOne = mock(JedisPool.class); + JedisPool slaveTwo = mock(JedisPool.class); + + when(master.getResource()).thenThrow(new JedisException("Master broken!")); + when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!")); + when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!")); + + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testCircuitBreakerHalfOpen", master, Arrays.asList(slaveOne, slaveTwo), configuration); + + try { + replicatedJedisPool.getWriteResource(); + throw new AssertionError(); + } catch (JedisException exception) { + // good + } + + try { + replicatedJedisPool.getWriteResource(); + throw new AssertionError(); + } catch (JedisException exception) { + // good + } + + try { + replicatedJedisPool.getWriteResource(); + throw new AssertionError(); + } catch (CircuitBreakerOpenException e) { + // good + } + + Thread.sleep(1100); + + try { + replicatedJedisPool.getWriteResource(); + throw new AssertionError(); + } catch (JedisException exception) { + // good + } + + try { + replicatedJedisPool.getWriteResource(); + throw new AssertionError(); + } catch (CircuitBreakerOpenException e) { + // good + } + + } + + }