diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ProvisioningManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ProvisioningManager.java index 1c56cfa82..c3cad2427 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ProvisioningManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ProvisioningManager.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.redis.RedisOperation; +import org.whispersystems.textsecuregcm.redis.RedisUriUtil; import org.whispersystems.textsecuregcm.storage.PubSubProtos; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException; @@ -60,7 +61,8 @@ public class ProvisioningManager extends RedisPubSubAdapter impl final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { - this(RedisClient.create(clientResources, redisUri), timeout, circuitBreakerConfiguration); + this(RedisClient.create(clientResources, RedisUriUtil.createRedisUriWithTimeout(redisUri, timeout)), timeout, + circuitBreakerConfiguration); } @VisibleForTesting @@ -69,7 +71,6 @@ public class ProvisioningManager extends RedisPubSubAdapter impl final CircuitBreakerConfiguration circuitBreakerConfiguration) { this.redisClient = redisClient; - this.redisClient.setDefaultTimeout(timeout); this.subscriptionConnection = redisClient.connectPubSub(new ByteArrayCodec()); this.publicationConnection = redisClient.connect(new ByteArrayCodec()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index 9d6955ac9..ec4621e4e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -15,6 +15,7 @@ import io.github.resilience4j.retry.RetryConfig; import io.lettuce.core.ClientOptions.DisconnectedBehavior; import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; +import io.lettuce.core.TimeoutOptions; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.RedisClusterClient; @@ -57,7 +58,9 @@ public class FaultTolerantRedisCluster { public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) { this(name, - RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()), + RedisClusterClient.create(clientResources, + RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(), + clusterConfiguration.getTimeout())), clusterConfiguration.getTimeout(), clusterConfiguration.getCircuitBreakerConfiguration(), clusterConfiguration.getRetryConfiguration()); @@ -69,13 +72,16 @@ public class FaultTolerantRedisCluster { this.name = name; this.clusterClient = clusterClient; - this.clusterClient.setDefaultTimeout(commandTimeout); this.clusterClient.setOptions(ClusterClientOptions.builder() .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS) .validateClusterNodeMembership(false) .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() .enableAllAdaptiveRefreshTriggers() .build()) + // for asynchronous commands + .timeoutOptions(TimeoutOptions.builder() + .fixedTimeout(commandTimeout) + .build()) .publishOnScheduler(true) .build()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisUriUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisUriUtil.java new file mode 100644 index 000000000..79d39e370 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/RedisUriUtil.java @@ -0,0 +1,20 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.redis; + +import io.lettuce.core.RedisURI; +import java.time.Duration; + +public class RedisUriUtil { + + public static RedisURI createRedisUriWithTimeout(final String uri, final Duration timeout) { + final RedisURI redisUri = RedisURI.create(uri); + // for synchronous commands and the initial connection + redisUri.setTimeout(timeout); + return redisUri; + } + +}