From d243b73678f37ddb0142876de601d09fdfa6b006 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 18 Aug 2020 11:33:52 -0400 Subject: [PATCH] Make Lettuce connection pools configurable. Double the default size. --- .../textsecuregcm/WhisperServerService.java | 6 ++--- .../RedisClusterConfiguration.java | 8 ++++++ .../RedisConnectionPoolConfiguration.java | 26 +++++++++++++++++++ .../redis/FaultTolerantRedisCluster.java | 25 +++++++++++++----- .../ClearMessagesCacheClusterCommand.java | 2 +- .../workers/DeleteUserCommand.java | 2 +- .../redis/AbstractRedisClusterTest.java | 9 ++++++- .../redis/FaultTolerantRedisClusterTest.java | 3 ++- 8 files changed, 67 insertions(+), 14 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConnectionPoolConfiguration.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 224b99e57..bb6521c57 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -333,9 +333,9 @@ public class WhisperServerService extends Application(1_000)).build(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java index 11b7b5d10..12088d75a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java @@ -23,6 +23,10 @@ public class RedisClusterConfiguration { @Valid private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); + @JsonProperty + @Valid + private RedisConnectionPoolConfiguration connectionPool = new RedisConnectionPoolConfiguration(); + public List getUrls() { return urls; } @@ -34,4 +38,8 @@ public class RedisClusterConfiguration { public CircuitBreakerConfiguration getCircuitBreakerConfiguration() { return circuitBreaker; } + + public RedisConnectionPoolConfiguration getConnectionPoolConfiguration() { + return connectionPool; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConnectionPoolConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConnectionPoolConfiguration.java new file mode 100644 index 000000000..3aa8ec680 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConnectionPoolConfiguration.java @@ -0,0 +1,26 @@ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import java.time.Duration; + +public class RedisConnectionPoolConfiguration { + + @JsonProperty + @Min(1) + private int poolSize = 16; + + @JsonProperty + @NotNull + private Duration maxWait = Duration.ofSeconds(10); + + public int getPoolSize() { + return poolSize; + } + + public Duration getMaxWait() { + return maxWait; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 80cfedd8c..66c7f9652 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -14,6 +14,8 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; +import org.whispersystems.textsecuregcm.configuration.RedisConnectionPoolConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; @@ -44,22 +46,31 @@ public class FaultTolerantRedisCluster { private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class); - public FaultTolerantRedisCluster(final String name, final List urls, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { - this(name, RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), timeout, circuitBreakerConfiguration); + public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration) { + this(name, + RedisClusterClient.create(clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())), + clusterConfiguration.getTimeout(), + clusterConfiguration.getCircuitBreakerConfiguration(), + clusterConfiguration.getConnectionPoolConfiguration()); } @VisibleForTesting - FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { + FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RedisConnectionPoolConfiguration connectionPoolConfiguration) { this.name = name; this.clusterClient = clusterClient; this.clusterClient.setDefaultTimeout(timeout); - //noinspection unchecked,rawtypes,rawtypes - this.stringConnectionPool = ConnectionPoolSupport.createGenericObjectPool(clusterClient::connect, new GenericObjectPoolConfig()); + @SuppressWarnings("rawtypes") final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMaxIdle(connectionPoolConfiguration.getPoolSize()); + poolConfig.setMaxTotal(connectionPoolConfiguration.getPoolSize()); + poolConfig.setMaxWaitMillis(connectionPoolConfiguration.getMaxWait().toMillis()); - //noinspection unchecked,rawtypes,rawtypes - this.binaryConnectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> clusterClient.connect(ByteArrayCodec.INSTANCE), new GenericObjectPoolConfig()); + //noinspection unchecked + this.stringConnectionPool = ConnectionPoolSupport.createGenericObjectPool(clusterClient::connect, poolConfig); + + //noinspection unchecked + this.binaryConnectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> clusterClient.connect(ByteArrayCodec.INSTANCE), poolConfig); this.circuitBreakerConfiguration = circuitBreakerConfiguration; this.circuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java index ad7a994db..b57b0edaa 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearMessagesCacheClusterCommand.java @@ -14,7 +14,7 @@ public class ClearMessagesCacheClusterCommand extends ConfiguredCommand bootstrap, final Namespace namespace, final WhisperServerConfiguration config) { - final FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration()); + final FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration()); messagesCacheCluster.useCluster(connection -> connection.sync().masters().commands().flushallAsync()); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index c9d77d187..0138b5cdc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -66,7 +66,7 @@ public class DeleteUserCommand extends EnvironmentCommand String.format("redis://127.0.0.1:%d", node.ports().get(0))) .collect(Collectors.toList()); - redisCluster = new FaultTolerantRedisCluster("test-cluster", urls, Duration.ofSeconds(2), new CircuitBreakerConfiguration()); + redisCluster = new FaultTolerantRedisCluster("test-cluster", + RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), + Duration.ofSeconds(2), + new CircuitBreakerConfiguration(), + new RedisConnectionPoolConfiguration()); redisCluster.useCluster(connection -> { boolean setAll = false; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 488080b06..5f03e760f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -9,6 +9,7 @@ import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import org.junit.Before; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RedisConnectionPoolConfiguration; import java.time.Duration; @@ -40,7 +41,7 @@ public class FaultTolerantRedisClusterTest { breakerConfiguration.setRingBufferSizeInClosedState(1); breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); - faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration); + faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration, new RedisConnectionPoolConfiguration()); } @Test