lettuce: set `publishOnScheduler` to `true`

This commit is contained in:
Chris Eager 2022-11-02 10:31:37 -05:00 committed by Chris Eager
parent afda5ca98f
commit b13cb098ce
1 changed files with 25 additions and 21 deletions

View File

@ -53,35 +53,39 @@ public class FaultTolerantRedisCluster {
private final Retry retry; private final Retry retry;
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) { public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) {
this(name, this(name,
RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()), RedisClusterClient.create(clientResources, clusterConfiguration.getConfigurationUri()),
clusterConfiguration.getTimeout(), clusterConfiguration.getTimeout(),
clusterConfiguration.getCircuitBreakerConfiguration(), clusterConfiguration.getCircuitBreakerConfiguration(),
clusterConfiguration.getRetryConfiguration()); clusterConfiguration.getRetryConfiguration());
} }
@VisibleForTesting @VisibleForTesting
FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) { FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) {
this.name = name; this.name = name;
this.clusterClient = clusterClient; this.clusterClient = clusterClient;
this.clusterClient.setDefaultTimeout(commandTimeout); this.clusterClient.setDefaultTimeout(commandTimeout);
this.clusterClient.setOptions(ClusterClientOptions.builder() this.clusterClient.setOptions(ClusterClientOptions.builder()
.disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS) .disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS)
.validateClusterNodeMembership(false) .validateClusterNodeMembership(false)
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers() .enableAllAdaptiveRefreshTriggers()
.build()) .build())
.build()); .publishOnScheduler(true)
.build());
this.stringConnection = clusterClient.connect(); this.stringConnection = clusterClient.connect();
this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);
this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder().retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder()
.retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), circuitBreaker, FaultTolerantRedisCluster.class); CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), circuitBreaker,
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), retry, FaultTolerantRedisCluster.class); FaultTolerantRedisCluster.class);
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), retry,
FaultTolerantRedisCluster.class);
} }
void shutdown() { void shutdown() {