diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java index 12088d75a..11b7b5d10 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java @@ -23,10 +23,6 @@ public class RedisClusterConfiguration { @Valid private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); - @JsonProperty - @Valid - private RedisConnectionPoolConfiguration connectionPool = new RedisConnectionPoolConfiguration(); - public List getUrls() { return urls; } @@ -38,8 +34,4 @@ public class RedisClusterConfiguration { public CircuitBreakerConfiguration getCircuitBreakerConfiguration() { return circuitBreaker; } - - public RedisConnectionPoolConfiguration getConnectionPoolConfiguration() { - return connectionPool; - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConnectionPoolConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConnectionPoolConfiguration.java deleted file mode 100644 index 3aa8ec680..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConnectionPoolConfiguration.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.whispersystems.textsecuregcm.configuration; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; -import java.time.Duration; - -public class RedisConnectionPoolConfiguration { - - @JsonProperty - @Min(1) - private int poolSize = 16; - - @JsonProperty - @NotNull - private Duration maxWait = Duration.ofSeconds(10); - - public int getPoolSize() { - return poolSize; - } - - public Duration getMaxWait() { - return maxWait; - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 6f97c5940..f577bf09e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -10,14 +10,10 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.ByteArrayCodec; -import io.lettuce.core.support.ConnectionPoolSupport; -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisConnectionPoolConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; @@ -32,7 +28,7 @@ import static com.codahale.metrics.MetricRegistry.name; /** * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed, - * circuit-breaker-protected access to a pool of connections. + * circuit-breaker-protected access to connections. */ public class FaultTolerantRedisCluster { @@ -40,15 +36,14 @@ public class FaultTolerantRedisCluster { private final RedisClusterClient clusterClient; - private final GenericObjectPool> stringConnectionPool; - private final GenericObjectPool> binaryConnectionPool; + private final StatefulRedisClusterConnection stringConnection; + private final StatefulRedisClusterConnection binaryConnection; private final List> pubSubConnections = new ArrayList<>(); private final CircuitBreakerConfiguration circuitBreakerConfiguration; private final CircuitBreaker circuitBreaker; - private final Timer acquireAndExecuteTimer; private final Timer executeTimer; private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class); @@ -57,27 +52,18 @@ public class FaultTolerantRedisCluster { this(name, RedisClusterClient.create(clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())), clusterConfiguration.getTimeout(), - clusterConfiguration.getCircuitBreakerConfiguration(), - clusterConfiguration.getConnectionPoolConfiguration()); + clusterConfiguration.getCircuitBreakerConfiguration()); } @VisibleForTesting - FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RedisConnectionPoolConfiguration connectionPoolConfiguration) { + FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { this.name = name; this.clusterClient = clusterClient; this.clusterClient.setDefaultTimeout(timeout); - @SuppressWarnings("rawtypes") final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); - poolConfig.setMaxIdle(connectionPoolConfiguration.getPoolSize()); - poolConfig.setMaxTotal(connectionPoolConfiguration.getPoolSize()); - poolConfig.setMaxWaitMillis(connectionPoolConfiguration.getMaxWait().toMillis()); - - //noinspection unchecked - this.stringConnectionPool = ConnectionPoolSupport.createGenericObjectPool(clusterClient::connect, poolConfig); - - //noinspection unchecked - this.binaryConnectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> clusterClient.connect(ByteArrayCodec.INSTANCE), poolConfig); + this.stringConnection = clusterClient.connect(); + this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); this.circuitBreakerConfiguration = circuitBreakerConfiguration; this.circuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); @@ -88,13 +74,12 @@ public class FaultTolerantRedisCluster { final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - this.acquireAndExecuteTimer = metricRegistry.timer(name(getClass(), name, "acquireConnectionAndExecute")); - this.executeTimer = metricRegistry.timer(name(getClass(), name, "execute")); + this.executeTimer = metricRegistry.timer(name(getClass(), name, "execute")); } void shutdown() { - stringConnectionPool.close(); - binaryConnectionPool.close(); + stringConnection.close(); + binaryConnection.close(); for (final StatefulRedisClusterPubSubConnection pubSubConnection : pubSubConnections) { pubSubConnection.close(); @@ -104,30 +89,26 @@ public class FaultTolerantRedisCluster { } public void useCluster(final Consumer> consumer) { - acceptPooledConnection(stringConnectionPool, consumer); + useConnection(stringConnection, consumer); } public T withCluster(final Function, T> function) { - return applyToPooledConnection(stringConnectionPool, function); + return withConnection(stringConnection, function); } public void useBinaryCluster(final Consumer> consumer) { - acceptPooledConnection(binaryConnectionPool, consumer); + useConnection(binaryConnection, consumer); } public T withBinaryCluster(final Function, T> function) { - return applyToPooledConnection(binaryConnectionPool, function); + return withConnection(binaryConnection, function); } - private void acceptPooledConnection(final GenericObjectPool> pool, final Consumer> consumer) { + private void useConnection(final StatefulRedisClusterConnection connection, final Consumer> consumer) { try { circuitBreaker.executeCheckedRunnable(() -> { - try (final Timer.Context ignored = acquireAndExecuteTimer.time(); - final StatefulRedisClusterConnection connection = pool.borrowObject()) { - - try (final Timer.Context ignored2 = executeTimer.time()) { - consumer.accept(connection); - } + try (final Timer.Context ignored = executeTimer.time()) { + consumer.accept(connection); } }); } catch (final Throwable t) { @@ -141,15 +122,11 @@ public class FaultTolerantRedisCluster { } } - private T applyToPooledConnection(final GenericObjectPool> pool, final Function, T> function) { + private T withConnection(final StatefulRedisClusterConnection connection, final Function, T> function) { try { return circuitBreaker.executeCheckedSupplier(() -> { - try (final Timer.Context ignored = acquireAndExecuteTimer.time(); - final StatefulRedisClusterConnection connection = pool.borrowObject()) { - - try (final Timer.Context ignored2 = executeTimer.time()) { - return function.apply(connection); - } + try (final Timer.Context ignored = executeTimer.time()) { + return function.apply(connection); } }); } catch (final Throwable t) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java index b4cda3f1f..02ab7df80 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java @@ -12,8 +12,6 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisConnectionPoolConfiguration; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; import redis.embedded.RedisServer; @@ -62,8 +60,7 @@ public abstract class AbstractRedisClusterTest { redisCluster = new FaultTolerantRedisCluster("test-cluster", RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), Duration.ofSeconds(2), - new CircuitBreakerConfiguration(), - new RedisConnectionPoolConfiguration()); + new CircuitBreakerConfiguration()); redisCluster.useCluster(connection -> { boolean setAll = false; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java index 5f03e760f..488080b06 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterTest.java @@ -9,7 +9,6 @@ import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import org.junit.Before; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisConnectionPoolConfiguration; import java.time.Duration; @@ -41,7 +40,7 @@ public class FaultTolerantRedisClusterTest { breakerConfiguration.setRingBufferSizeInClosedState(1); breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE); - faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration, new RedisConnectionPoolConfiguration()); + faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration); } @Test