Support for circuit breaker on redis pools

This commit is contained in:
Moxie Marlinspike 2019-03-29 21:52:37 -07:00
parent cca4258887
commit 1d11683ce8
10 changed files with 258 additions and 34 deletions

View File

@ -14,6 +14,7 @@
<properties>
<dropwizard.version>1.3.9</dropwizard.version>
<jackson.api.version>2.9.8</jackson.api.version>
<resilience4j.version>0.13.2</resilience4j.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
@ -65,6 +66,13 @@
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>

View File

@ -160,10 +160,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Messages messages = messagedb.onDemand(Messages.class);
AbusiveHostRules abusiveHostRules = abusedb.onDemand(AbusiveHostRules.class);
RedisClientFactory cacheClientFactory = new RedisClientFactory(config.getCacheConfiguration().getUrl(), config.getCacheConfiguration().getReplicaUrls() );
RedisClientFactory directoryClientFactory = new RedisClientFactory(config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls() );
RedisClientFactory messagesClientFactory = new RedisClientFactory(config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls());
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory(config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls() );
RedisClientFactory cacheClientFactory = new RedisClientFactory("main_cache", config.getCacheConfiguration().getUrl(), config.getCacheConfiguration().getReplicaUrls(), config.getCacheConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory messagesClientFactory = new RedisClientFactory("message_cache", config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls(), config.getMessageCacheConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
ReplicatedJedisPool cacheClient = cacheClientFactory.getRedisClientPool();
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();

View File

@ -0,0 +1,69 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
/**
 * Configuration values for a circuit breaker, bound from the service
 * configuration via Jackson ({@code @JsonProperty}) and checked with Bean
 * Validation constraints.
 *
 * <p>Every property has a default, so the whole section may be omitted from
 * the configuration. The setters exist for tests only.
 */
public class CircuitBreakerConfiguration {

  /** Failure rate threshold; validated to lie in 1..100, defaults to 50. */
  @JsonProperty
  @NotNull
  @Min(1)
  @Max(100)
  private int failureRateThreshold = 50;

  /** Ring buffer size used while the breaker is half-open; at least 1, defaults to 10. */
  @JsonProperty
  @NotNull
  @Min(1)
  private int ringBufferSizeInHalfOpenState = 10;

  /** Ring buffer size used while the breaker is closed; at least 1, defaults to 100. */
  @JsonProperty
  @NotNull
  @Min(1)
  private int ringBufferSizeInClosedState = 100;

  /** Time, in seconds, the breaker waits in the open state; at least 1, defaults to 10. */
  @JsonProperty
  @NotNull
  @Min(1)
  private long waitDurationInOpenStateInSeconds = 10;

  /**
   * @return the configured failure rate threshold (1..100)
   */
  public int getFailureRateThreshold() {
    return failureRateThreshold;
  }

  /**
   * @return the ring buffer size for the half-open state
   */
  public int getRingBufferSizeInHalfOpenState() {
    return ringBufferSizeInHalfOpenState;
  }

  /**
   * @return the ring buffer size for the closed state
   */
  public int getRingBufferSizeInClosedState() {
    return ringBufferSizeInClosedState;
  }

  /**
   * @return the wait duration in the open state, in seconds
   */
  public long getWaitDurationInOpenStateInSeconds() {
    return waitDurationInOpenStateInSeconds;
  }

  /**
   * Overrides the failure rate threshold. Not validated here; constraints
   * are only declared as annotations on the field.
   *
   * @param failureRateThreshold the new threshold
   */
  @VisibleForTesting
  public void setFailureRateThreshold(int failureRateThreshold) {
    this.failureRateThreshold = failureRateThreshold;
  }

  /**
   * Overrides the ring buffer size for the closed state.
   *
   * @param size the new ring buffer size
   */
  @VisibleForTesting
  public void setRingBufferSizeInClosedState(int size) {
    this.ringBufferSizeInClosedState = size;
  }

  /**
   * Overrides the ring buffer size for the half-open state.
   *
   * @param size the new ring buffer size
   */
  @VisibleForTesting
  public void setRingBufferSizeInHalfOpenState(int size) {
    this.ringBufferSizeInHalfOpenState = size;
  }

  /**
   * Overrides the wait duration in the open state.
   *
   * <p>Takes a {@code long} to match the backing field and
   * {@link #getWaitDurationInOpenStateInSeconds()}; existing callers passing
   * an {@code int} still compile through widening conversion.
   *
   * @param seconds the new wait duration, in seconds
   */
  @VisibleForTesting
  public void setWaitDurationInOpenStateInSeconds(long seconds) {
    this.waitDurationInOpenStateInSeconds = seconds;
  }
}

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.hibernate.validator.constraints.NotEmpty;
import org.hibernate.validator.constraints.URL;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.util.List;
@ -34,6 +35,11 @@ public class RedisConfiguration {
@NotNull
private List<String> replicaUrls;
@JsonProperty
@NotNull
@Valid
private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
public String getUrl() {
return url;
}
@ -41,4 +47,8 @@ public class RedisConfiguration {
public List<String> getReplicaUrls() {
return replicaUrls;
}
/**
 * Returns the circuit breaker settings held in the {@code circuitBreaker}
 * field of this Redis configuration.
 *
 * @return the circuit breaker configuration
 */
public CircuitBreakerConfiguration getCircuitBreakerConfiguration() {
  return this.circuitBreaker;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Copyright (C) 2013 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
@ -20,6 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
import org.whispersystems.dispatch.redis.PubSubConnection;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Util;
@ -42,7 +43,9 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
private final int port;
private final ReplicatedJedisPool jedisPool;
public RedisClientFactory(String url, List<String> replicaUrls) throws URISyntaxException {
public RedisClientFactory(String name, String url, List<String> replicaUrls, CircuitBreakerConfiguration circuitBreakerConfiguration)
throws URISyntaxException
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setTestOnBorrow(true);
@ -63,7 +66,7 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
null, null));
}
this.jedisPool = new ReplicatedJedisPool(masterPool, replicaPools);
this.jedisPool = new ReplicatedJedisPool(name, masterPool, replicaPools, circuitBreakerConfiguration);
}
public ReplicatedJedisPool getRedisClientPool() {

View File

@ -1,49 +1,76 @@
package org.whispersystems.textsecuregcm.redis;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.util.Constants;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static com.codahale.metrics.MetricRegistry.name;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisException;
public class ReplicatedJedisPool {
private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class);
private final AtomicInteger replicaIndex = new AtomicInteger(0);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class);
private final AtomicInteger replicaIndex = new AtomicInteger(0);
private final JedisPool master;
private final JedisPool[] replicas;
private final Supplier<Jedis> master;
private final ArrayList<Supplier<Jedis>> replicas;
public ReplicatedJedisPool(JedisPool master, List<JedisPool> replicas) {
public ReplicatedJedisPool(String name,
JedisPool master,
List<JedisPool> replicas,
CircuitBreakerConfiguration circuitBreakerConfiguration)
{
if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica");
this.master = master;
this.replicas = new JedisPool[replicas.size()];
for (int i=0;i<this.replicas.length;i++) {
this.replicas[i] = replicas.get(i);
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(circuitBreakerConfiguration.getFailureRateThreshold())
.ringBufferSizeInHalfOpenState(circuitBreakerConfiguration.getRingBufferSizeInHalfOpenState())
.waitDurationInOpenState(Duration.ofSeconds(circuitBreakerConfiguration.getWaitDurationInOpenStateInSeconds()))
.ringBufferSizeInClosedState(circuitBreakerConfiguration.getRingBufferSizeInClosedState())
.build();
CircuitBreaker masterBreaker = CircuitBreaker.of(String.format("%s-master", name), config);
registerMetrics(masterBreaker);
this.master = CircuitBreaker.decorateSupplier(masterBreaker, master::getResource);
this.replicas = new ArrayList<>(replicas.size());
for (int i=0;i<replicas.size();i++) {
JedisPool replica = replicas.get(i);
CircuitBreaker slaveBreaker = CircuitBreaker.of(String.format("%s-slave-%d", name, i), config);
registerMetrics(slaveBreaker);
this.replicas.add(CircuitBreaker.decorateSupplier(slaveBreaker, replica::getResource));
}
}
public Jedis getWriteResource() {
return master.getResource();
}
public void returnWriteResource(Jedis jedis) {
master.returnResource(jedis);
return master.get();
}
public Jedis getReadResource() {
int failureCount = 0;
while (failureCount < replicas.length) {
while (failureCount < replicas.size()) {
try {
return replicas[replicaIndex.getAndIncrement() % replicas.length].getResource();
} catch (JedisException e) {
return replicas.get(replicaIndex.getAndIncrement() % replicas.size()).get();
} catch (RuntimeException e) {
logger.error("Failure obtaining read replica pool", e);
}
@ -53,4 +80,16 @@ public class ReplicatedJedisPool {
throw new JedisException("All read replica pools failed!");
}
/**
 * Publishes metrics for one circuit breaker under names derived from this
 * class and the breaker's name: "success", "failure" and "unpermitted"
 * meters, plus a "state" gauge reporting the order value of the breaker's
 * current state.
 *
 * @param circuitBreaker the breaker whose events and state are exported
 */
private void registerMetrics(CircuitBreaker circuitBreaker) {
  final String breakerName = circuitBreaker.getName();

  final Meter successes   = metricRegistry.meter(name(ReplicatedJedisPool.class, breakerName, "success"));
  final Meter failures    = metricRegistry.meter(name(ReplicatedJedisPool.class, breakerName, "failure"));
  final Meter unpermitted = metricRegistry.meter(name(ReplicatedJedisPool.class, breakerName, "unpermitted"));

  // The outer lambda supplies the gauge; the inner lambda is the gauge itself.
  metricRegistry.gauge(name(ReplicatedJedisPool.class, breakerName, "state"),
                       () -> () -> circuitBreaker.getState().getOrder());

  // Each breaker event marks the matching meter.
  circuitBreaker.getEventPublisher()
                .onSuccess(event -> successes.mark())
                .onError(event -> failures.mark())
                .onCallNotPermitted(event -> unpermitted.mark());
}
}

View File

@ -156,7 +156,7 @@ public class DirectoryManager {
Jedis jedis = handle.jedis;
pipeline.sync();
redisPool.returnWriteResource(jedis);
jedis.close();
}
public static class BatchOperationHandle {

View File

@ -75,8 +75,8 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
dbi.registerContainerFactory(new OptionalContainerFactory());
Accounts accounts = dbi.onDemand(Accounts.class);
ReplicatedJedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls()).getRedisClientPool();
ReplicatedJedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls()).getRedisClientPool();
ReplicatedJedisPool cacheClient = new RedisClientFactory("main_cache_delete_command", configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls(), configuration.getCacheConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
DirectoryQueue directoryQueue = new DirectoryQueue(configuration.getDirectoryConfiguration().getSqsConfiguration());
DirectoryManager directory = new DirectoryManager(redisClient);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);

View File

@ -69,8 +69,8 @@ public class DirectoryCommand extends EnvironmentCommand<WhisperServerConfigurat
dbi.registerContainerFactory(new OptionalContainerFactory());
Accounts accounts = dbi.onDemand(Accounts.class);
ReplicatedJedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls()).getRedisClientPool();
ReplicatedJedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls()).getRedisClientPool();
ReplicatedJedisPool cacheClient = new RedisClientFactory("main_cache_directory_command", configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls(), configuration.getCacheConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_directory_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
DirectoryManager directory = new DirectoryManager(redisClient);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);

View File

@ -1,12 +1,15 @@
package org.whispersystems.textsecuregcm.tests.redis;
import org.junit.Test;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
import redis.clients.jedis.Jedis;
@ -20,7 +23,7 @@ public class ReplicatedJedisPoolTest {
JedisPool master = mock(JedisPool.class);
try {
new ReplicatedJedisPool(master, new LinkedList<>());
new ReplicatedJedisPool("testWriteCheckoutNoSlaves", master, new LinkedList<>(), new CircuitBreakerConfiguration());
throw new AssertionError();
} catch (Exception e) {
// good
@ -35,7 +38,7 @@ public class ReplicatedJedisPoolTest {
when(master.getResource()).thenReturn(instance);
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Collections.singletonList(slave));
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testWriteCheckoutWithSlaves", master, Collections.singletonList(slave), new CircuitBreakerConfiguration());
Jedis writeResource = replicatedJedisPool.getWriteResource();
assertThat(writeResource).isEqualTo(instance);
@ -53,7 +56,7 @@ public class ReplicatedJedisPoolTest {
when(slaveOne.getResource()).thenReturn(instanceOne);
when(slaveTwo.getResource()).thenReturn(instanceTwo);
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testReadCheckouts", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration());
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
@ -74,7 +77,7 @@ public class ReplicatedJedisPoolTest {
when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
when(slaveTwo.getResource()).thenReturn(instanceTwo);
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testBrokenReadCheckout", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration());
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
@ -92,7 +95,7 @@ public class ReplicatedJedisPoolTest {
when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!"));
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool("testAllBrokenReadCheckout", master, Arrays.asList(slaveOne, slaveTwo), new CircuitBreakerConfiguration());
try {
replicatedJedisPool.getReadResource();
@ -104,4 +107,96 @@ public class ReplicatedJedisPoolTest {
verifyNoMoreInteractions(master);
}
/**
 * With a closed-state ring buffer of two and a 50% failure threshold, the
 * test expects: one successful write checkout, one checkout failing with the
 * pool's own JedisException, and then a checkout rejected with
 * CircuitBreakerOpenException.
 */
@Test
public void testCircuitBreakerOpen() {
  CircuitBreakerConfiguration breakerSettings = new CircuitBreakerConfiguration();
  breakerSettings.setFailureRateThreshold(50);
  breakerSettings.setRingBufferSizeInClosedState(2);

  JedisPool master   = mock(JedisPool.class);
  JedisPool slaveOne = mock(JedisPool.class);
  JedisPool slaveTwo = mock(JedisPool.class);

  when(master.getResource()).thenReturn(null);
  when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
  when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!"));

  ReplicatedJedisPool pool = new ReplicatedJedisPool("testCircuitBreakerOpen",
                                                     master,
                                                     Arrays.asList(slaveOne, slaveTwo),
                                                     breakerSettings);

  // First checkout succeeds (the stub hands back null).
  pool.getWriteResource();

  // From here on the master pool fails.
  when(master.getResource()).thenThrow(new JedisException("Master broken!"));

  boolean sawPoolFailure = false;
  try {
    pool.getWriteResource();
  } catch (JedisException expected) {
    sawPoolFailure = true;
  }
  if (!sawPoolFailure) {
    throw new AssertionError();
  }

  boolean sawOpenBreaker = false;
  try {
    pool.getWriteResource();
  } catch (CircuitBreakerOpenException expected) {
    sawOpenBreaker = true;
  }
  if (!sawOpenBreaker) {
    throw new AssertionError();
  }
}
/**
 * With a closed-state ring buffer of two, a half-open ring buffer of one and
 * a one-second open-state wait, the test expects: two checkouts failing with
 * JedisException, a third rejected with CircuitBreakerOpenException, and,
 * after sleeping past the wait duration, one more JedisException followed by
 * another CircuitBreakerOpenException.
 */
@Test
public void testCircuitBreakerHalfOpen() throws InterruptedException {
  CircuitBreakerConfiguration breakerSettings = new CircuitBreakerConfiguration();
  breakerSettings.setFailureRateThreshold(50);
  breakerSettings.setRingBufferSizeInClosedState(2);
  breakerSettings.setRingBufferSizeInHalfOpenState(1);
  breakerSettings.setWaitDurationInOpenStateInSeconds(1);

  JedisPool master   = mock(JedisPool.class);
  JedisPool slaveOne = mock(JedisPool.class);
  JedisPool slaveTwo = mock(JedisPool.class);

  when(master.getResource()).thenThrow(new JedisException("Master broken!"));
  when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
  when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!"));

  ReplicatedJedisPool pool = new ReplicatedJedisPool("testCircuitBreakerHalfOpen",
                                                     master,
                                                     Arrays.asList(slaveOne, slaveTwo),
                                                     breakerSettings);

  // Two consecutive checkouts must surface the pool's own failure.
  for (int attempt = 0; attempt < 2; attempt++) {
    boolean sawPoolFailure = false;
    try {
      pool.getWriteResource();
    } catch (JedisException expected) {
      sawPoolFailure = true;
    }
    if (!sawPoolFailure) {
      throw new AssertionError();
    }
  }

  // The next checkout must be rejected by the breaker.
  boolean rejectedBeforeWait = false;
  try {
    pool.getWriteResource();
  } catch (CircuitBreakerOpenException expected) {
    rejectedBeforeWait = true;
  }
  if (!rejectedBeforeWait) {
    throw new AssertionError();
  }

  // Sleep just past the configured one-second open-state wait.
  Thread.sleep(1100);

  // After the wait, one call reaches the (still broken) master pool again.
  boolean sawProbeFailure = false;
  try {
    pool.getWriteResource();
  } catch (JedisException expected) {
    sawProbeFailure = true;
  }
  if (!sawProbeFailure) {
    throw new AssertionError();
  }

  // The checkout after that must be rejected by the breaker again.
  boolean rejectedAfterProbe = false;
  try {
    pool.getWriteResource();
  } catch (CircuitBreakerOpenException expected) {
    rejectedAfterProbe = true;
  }
  if (!rejectedAfterProbe) {
    throw new AssertionError();
  }
}
}