From ec858b2d4cb2594e122d1f20f98d499c974aff3f Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Sun, 7 Jun 2020 18:27:57 -0400 Subject: [PATCH] Set a timeout for Redis cluster operations and shut down the cluster as part of service shutdown --- .../textsecuregcm/WhisperServerService.java | 4 ++- .../RedisClusterConfiguration.java | 9 +++++ .../redis/FaultTolerantRedisCluster.java | 35 +++++++++++++++---- .../workers/DeleteUserCommand.java | 3 +- .../redis/FaultTolerantRedisClusterTest.java | 4 ++- 5 files changed, 45 insertions(+), 10 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index aa7d04133..c78803c2d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -246,8 +246,9 @@ public class WhisperServerService extends Application urls; + @JsonProperty + @NotNull + private Duration timeout = Duration.ofSeconds(2); + @JsonProperty @NotNull @Valid @@ -22,6 +27,10 @@ public class RedisClusterConfiguration { return urls; } + public Duration getTimeout() { + return timeout; + } + public CircuitBreakerConfiguration getCircuitBreakerConfiguration() { return circuitBreaker; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java index ffd852d68..857be7765 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisCluster.java @@ -2,29 +2,43 @@ package org.whispersystems.textsecuregcm.redis; import com.codahale.metrics.SharedMetricRegistries; import com.google.common.annotations.VisibleForTesting; +import io.dropwizard.lifecycle.Managed; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; +import java.time.Duration; +import java.util.List; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; /** * A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster has separate circuit breakers for * read and write operations because the leader in a Redis cluster shard may fail while its read-only replicas can still * serve traffic. */ -public class FaultTolerantRedisCluster { +public class FaultTolerantRedisCluster implements Managed { + private final RedisClusterClient clusterClient; private final StatefulRedisClusterConnection clusterConnection; private final CircuitBreaker readCircuitBreaker; private final CircuitBreaker writeCircuitBreaker; - public FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final CircuitBreakerConfiguration circuitBreakerConfiguration) { + public FaultTolerantRedisCluster(final String name, final List urls, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { + this(name, RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())), timeout, circuitBreakerConfiguration); + } + + @VisibleForTesting + FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration timeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) { + this.clusterClient = clusterClient; + this.clusterClient.setDefaultTimeout(timeout); + this.clusterConnection = clusterClient.connect(); this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig()); this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig()); @@ -38,19 +52,28 @@ public class FaultTolerantRedisCluster { FaultTolerantRedisCluster.class); } - public void useReadCluster(Consumer> consumer) { + public void useReadCluster(final Consumer> consumer) { this.readCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection)); } - public T withReadCluster(Function, T> consumer) { + public T withReadCluster(final Function, T> consumer) { return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection)); } - public void useWriteCluster(Consumer> consumer) { + public void useWriteCluster(final Consumer> consumer) { this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection)); } - public T withWriteCluster(Function, T> consumer) { + public T withWriteCluster(final Function, T> consumer) { return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection)); } + + @Override + public void start() { + } + + @Override + public void stop() { + clusterClient.shutdown(); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index 4ab7e1d55..4d842a73b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -71,8 +71,7 @@ public class DeleteUserCommand extends EnvironmentCommand