Make Lettuce connection pools configurable. Double the default size.

This commit is contained in:
Jon Chambers 2020-08-18 11:33:52 -04:00 committed by Jon Chambers
parent b9abd2f9a5
commit d243b73678
8 changed files with 67 additions and 14 deletions

View File

@ -333,9 +333,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ReplicatedJedisPool messagesClient = messagesClientFactory.getRedisClientPool();
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration().getUrls(), config.getCacheClusterConfiguration().getTimeout(), config.getCacheClusterConfiguration().getCircuitBreakerConfiguration());
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration());
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration().getUrls(), config.getMetricsClusterConfiguration().getTimeout(), config.getMetricsClusterConfiguration().getCircuitBreakerConfiguration());
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration());
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration());
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration());
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();

View File

@ -23,6 +23,10 @@ public class RedisClusterConfiguration {
@Valid
private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
@JsonProperty
@Valid
private RedisConnectionPoolConfiguration connectionPool = new RedisConnectionPoolConfiguration();
public List<String> getUrls() {
return urls;
}
@ -34,4 +38,8 @@ public class RedisClusterConfiguration {
public CircuitBreakerConfiguration getCircuitBreakerConfiguration() {
return circuitBreaker;
}
public RedisConnectionPoolConfiguration getConnectionPoolConfiguration() {
return connectionPool;
}
}

View File

@ -0,0 +1,26 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.time.Duration;
public class RedisConnectionPoolConfiguration {
@JsonProperty
@Min(1)
private int poolSize = 16;
@JsonProperty
@NotNull
private Duration maxWait = Duration.ofSeconds(10);
public int getPoolSize() {
return poolSize;
}
public Duration getMaxWait() {
return maxWait;
}
}

View File

@ -14,6 +14,8 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisConnectionPoolConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Constants;
@ -44,22 +46,31 @@ public class FaultTolerantRedisCluster {
private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class);
public FaultTolerantRedisCluster(final String name, final List<String> urls, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) {
this(name, RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), timeout, circuitBreakerConfiguration);
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration) {
this(name,
RedisClusterClient.create(clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())),
clusterConfiguration.getTimeout(),
clusterConfiguration.getCircuitBreakerConfiguration(),
clusterConfiguration.getConnectionPoolConfiguration());
}
@VisibleForTesting
FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) {
FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RedisConnectionPoolConfiguration connectionPoolConfiguration) {
this.name = name;
this.clusterClient = clusterClient;
this.clusterClient.setDefaultTimeout(timeout);
//noinspection unchecked,rawtypes,rawtypes
this.stringConnectionPool = ConnectionPoolSupport.createGenericObjectPool(clusterClient::connect, new GenericObjectPoolConfig());
@SuppressWarnings("rawtypes") final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(connectionPoolConfiguration.getPoolSize());
poolConfig.setMaxTotal(connectionPoolConfiguration.getPoolSize());
poolConfig.setMaxWaitMillis(connectionPoolConfiguration.getMaxWait().toMillis());
//noinspection unchecked,rawtypes,rawtypes
this.binaryConnectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> clusterClient.connect(ByteArrayCodec.INSTANCE), new GenericObjectPoolConfig());
//noinspection unchecked
this.stringConnectionPool = ConnectionPoolSupport.createGenericObjectPool(clusterClient::connect, poolConfig);
//noinspection unchecked
this.binaryConnectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> clusterClient.connect(ByteArrayCodec.INSTANCE), poolConfig);
this.circuitBreakerConfiguration = circuitBreakerConfiguration;
this.circuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig());

View File

@ -14,7 +14,7 @@ public class ClearMessagesCacheClusterCommand extends ConfiguredCommand<WhisperS
@Override
protected void run(final Bootstrap<WhisperServerConfiguration> bootstrap, final Namespace namespace, final WhisperServerConfiguration config) {
final FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration());
final FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration());
messagesCacheCluster.useCluster(connection -> connection.sync().masters().commands().flushallAsync());
}
}

View File

@ -66,7 +66,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
Jdbi accountJdbi = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb");
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration().getUrls(), configuration.getCacheClusterConfiguration().getTimeout(), configuration.getCacheClusterConfiguration().getCircuitBreakerConfiguration());
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration());
Accounts accounts = new Accounts(accountDatabase);
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();

View File

@ -5,12 +5,15 @@ import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.SlotHash;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisConnectionPoolConfiguration;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import redis.embedded.RedisServer;
@ -56,7 +59,11 @@ public abstract class AbstractRedisClusterTest {
.map(node -> String.format("redis://127.0.0.1:%d", node.ports().get(0)))
.collect(Collectors.toList());
redisCluster = new FaultTolerantRedisCluster("test-cluster", urls, Duration.ofSeconds(2), new CircuitBreakerConfiguration());
redisCluster = new FaultTolerantRedisCluster("test-cluster",
RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())),
Duration.ofSeconds(2),
new CircuitBreakerConfiguration(),
new RedisConnectionPoolConfiguration());
redisCluster.useCluster(connection -> {
boolean setAll = false;

View File

@ -9,6 +9,7 @@ import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import org.junit.Before;
import org.junit.Test;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisConnectionPoolConfiguration;
import java.time.Duration;
@ -40,7 +41,7 @@ public class FaultTolerantRedisClusterTest {
breakerConfiguration.setRingBufferSizeInClosedState(1);
breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE);
faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration);
faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2), breakerConfiguration, new RedisConnectionPoolConfiguration());
}
@Test