Configure Redis timeouts using TimeoutOptions and RediURI
This commit is contained in:
parent
30ae2037e8
commit
bdcd055aaf
|
@ -29,6 +29,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.RedisUriUtil;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
||||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||||
import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException;
|
import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException;
|
||||||
|
@ -60,7 +61,8 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
|
||||||
final Duration timeout,
|
final Duration timeout,
|
||||||
final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
||||||
|
|
||||||
this(RedisClient.create(clientResources, redisUri), timeout, circuitBreakerConfiguration);
|
this(RedisClient.create(clientResources, RedisUriUtil.createRedisUriWithTimeout(redisUri, timeout)), timeout,
|
||||||
|
circuitBreakerConfiguration);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -69,7 +71,6 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
|
||||||
final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
final CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
||||||
|
|
||||||
this.redisClient = redisClient;
|
this.redisClient = redisClient;
|
||||||
this.redisClient.setDefaultTimeout(timeout);
|
|
||||||
|
|
||||||
this.subscriptionConnection = redisClient.connectPubSub(new ByteArrayCodec());
|
this.subscriptionConnection = redisClient.connectPubSub(new ByteArrayCodec());
|
||||||
this.publicationConnection = redisClient.connect(new ByteArrayCodec());
|
this.publicationConnection = redisClient.connect(new ByteArrayCodec());
|
||||||
|
|
|
@ -15,6 +15,7 @@ import io.github.resilience4j.retry.RetryConfig;
|
||||||
import io.lettuce.core.ClientOptions.DisconnectedBehavior;
|
import io.lettuce.core.ClientOptions.DisconnectedBehavior;
|
||||||
import io.lettuce.core.RedisCommandTimeoutException;
|
import io.lettuce.core.RedisCommandTimeoutException;
|
||||||
import io.lettuce.core.RedisException;
|
import io.lettuce.core.RedisException;
|
||||||
|
import io.lettuce.core.TimeoutOptions;
|
||||||
import io.lettuce.core.cluster.ClusterClientOptions;
|
import io.lettuce.core.cluster.ClusterClientOptions;
|
||||||
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
|
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
|
||||||
import io.lettuce.core.cluster.RedisClusterClient;
|
import io.lettuce.core.cluster.RedisClusterClient;
|
||||||
|
@ -57,7 +58,9 @@ public class FaultTolerantRedisCluster {
|
||||||
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration,
|
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration,
|
||||||
final ClientResources clientResources) {
|
final ClientResources clientResources) {
|
||||||
this(name,
|
this(name,
|
||||||
RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()),
|
RedisClusterClient.create(clientResources,
|
||||||
|
RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(),
|
||||||
|
clusterConfiguration.getTimeout())),
|
||||||
clusterConfiguration.getTimeout(),
|
clusterConfiguration.getTimeout(),
|
||||||
clusterConfiguration.getCircuitBreakerConfiguration(),
|
clusterConfiguration.getCircuitBreakerConfiguration(),
|
||||||
clusterConfiguration.getRetryConfiguration());
|
clusterConfiguration.getRetryConfiguration());
|
||||||
|
@ -69,13 +72,16 @@ public class FaultTolerantRedisCluster {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
|
||||||
this.clusterClient = clusterClient;
|
this.clusterClient = clusterClient;
|
||||||
this.clusterClient.setDefaultTimeout(commandTimeout);
|
|
||||||
this.clusterClient.setOptions(ClusterClientOptions.builder()
|
this.clusterClient.setOptions(ClusterClientOptions.builder()
|
||||||
.disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS)
|
.disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS)
|
||||||
.validateClusterNodeMembership(false)
|
.validateClusterNodeMembership(false)
|
||||||
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
|
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
|
||||||
.enableAllAdaptiveRefreshTriggers()
|
.enableAllAdaptiveRefreshTriggers()
|
||||||
.build())
|
.build())
|
||||||
|
// for asynchronous commands
|
||||||
|
.timeoutOptions(TimeoutOptions.builder()
|
||||||
|
.fixedTimeout(commandTimeout)
|
||||||
|
.build())
|
||||||
.publishOnScheduler(true)
|
.publishOnScheduler(true)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.redis;
|
||||||
|
|
||||||
|
import io.lettuce.core.RedisURI;
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
public class RedisUriUtil {
|
||||||
|
|
||||||
|
public static RedisURI createRedisUriWithTimeout(final String uri, final Duration timeout) {
|
||||||
|
final RedisURI redisUri = RedisURI.create(uri);
|
||||||
|
// for synchronous commands and the initial connection
|
||||||
|
redisUri.setTimeout(timeout);
|
||||||
|
return redisUri;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue