Add test for Redis timeouts
This commit is contained in:
parent
463dd9d7d8
commit
457ecf145f
|
@ -7,6 +7,7 @@ package org.whispersystems.textsecuregcm.redis;
|
|||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -22,81 +23,122 @@ import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
|||
import io.lettuce.core.event.EventBus;
|
||||
import io.lettuce.core.resource.ClientResources;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
class FaultTolerantRedisClusterTest {
|
||||
|
||||
private RedisAdvancedClusterCommands<String, String> clusterCommands;
|
||||
private FaultTolerantRedisCluster faultTolerantCluster;
|
||||
private RedisAdvancedClusterCommands<String, String> clusterCommands;
|
||||
private FaultTolerantRedisCluster faultTolerantCluster;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
final RedisClusterClient clusterClient = mock(RedisClusterClient.class);
|
||||
final StatefulRedisClusterConnection<String, String> clusterConnection = mock(StatefulRedisClusterConnection.class);
|
||||
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = mock(StatefulRedisClusterPubSubConnection.class);
|
||||
final ClientResources clientResources = mock(ClientResources.class);
|
||||
final EventBus eventBus = mock(EventBus.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
final RedisClusterClient clusterClient = mock(RedisClusterClient.class);
|
||||
final StatefulRedisClusterConnection<String, String> clusterConnection = mock(StatefulRedisClusterConnection.class);
|
||||
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = mock(
|
||||
StatefulRedisClusterPubSubConnection.class);
|
||||
final ClientResources clientResources = mock(ClientResources.class);
|
||||
final EventBus eventBus = mock(EventBus.class);
|
||||
|
||||
clusterCommands = mock(RedisAdvancedClusterCommands.class);
|
||||
clusterCommands = mock(RedisAdvancedClusterCommands.class);
|
||||
|
||||
when(clusterClient.connect()).thenReturn(clusterConnection);
|
||||
when(clusterClient.connectPubSub()).thenReturn(pubSubConnection);
|
||||
when(clusterClient.getResources()).thenReturn(clientResources);
|
||||
when(clusterConnection.sync()).thenReturn(clusterCommands);
|
||||
when(clientResources.eventBus()).thenReturn(eventBus);
|
||||
when(eventBus.get()).thenReturn(mock(Flux.class));
|
||||
when(clusterClient.connect()).thenReturn(clusterConnection);
|
||||
when(clusterClient.connectPubSub()).thenReturn(pubSubConnection);
|
||||
when(clusterClient.getResources()).thenReturn(clientResources);
|
||||
when(clusterConnection.sync()).thenReturn(clusterCommands);
|
||||
when(clientResources.eventBus()).thenReturn(eventBus);
|
||||
when(eventBus.get()).thenReturn(mock(Flux.class));
|
||||
|
||||
final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration();
|
||||
breakerConfiguration.setFailureRateThreshold(100);
|
||||
breakerConfiguration.setSlidingWindowSize(1);
|
||||
breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1);
|
||||
breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE);
|
||||
final CircuitBreakerConfiguration breakerConfiguration = new CircuitBreakerConfiguration();
|
||||
breakerConfiguration.setFailureRateThreshold(100);
|
||||
breakerConfiguration.setSlidingWindowSize(1);
|
||||
breakerConfiguration.setSlidingWindowMinimumNumberOfCalls(1);
|
||||
breakerConfiguration.setWaitDurationInOpenStateInSeconds(Integer.MAX_VALUE);
|
||||
|
||||
final RetryConfiguration retryConfiguration = new RetryConfiguration();
|
||||
retryConfiguration.setMaxAttempts(3);
|
||||
retryConfiguration.setWaitDuration(0);
|
||||
final RetryConfiguration retryConfiguration = new RetryConfiguration();
|
||||
retryConfiguration.setMaxAttempts(3);
|
||||
retryConfiguration.setWaitDuration(0);
|
||||
|
||||
faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2),
|
||||
breakerConfiguration, retryConfiguration);
|
||||
faultTolerantCluster = new FaultTolerantRedisCluster("test", clusterClient, Duration.ofSeconds(2),
|
||||
breakerConfiguration, retryConfiguration);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBreaker() {
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenReturn("value")
|
||||
.thenThrow(new RuntimeException("Badness has ensued."));
|
||||
|
||||
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
|
||||
assertThrows(RedisException.class,
|
||||
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
|
||||
|
||||
final RedisException redisException = assertThrows(RedisException.class,
|
||||
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
|
||||
|
||||
assertTrue(redisException.getCause() instanceof CallNotPermittedException);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRetry() {
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenReturn("value");
|
||||
|
||||
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenReturn("value");
|
||||
|
||||
assertThrows(RedisCommandTimeoutException.class,
|
||||
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
|
||||
}
|
||||
|
||||
@Nested
|
||||
class WithRealCluster {
|
||||
|
||||
private static final Duration TIMEOUT = Duration.ofMillis(50);
|
||||
|
||||
private static final RetryConfiguration retryConfiguration = new RetryConfiguration();
|
||||
|
||||
static {
|
||||
retryConfiguration.setMaxAttempts(1);
|
||||
retryConfiguration.setWaitDuration(50);
|
||||
}
|
||||
|
||||
@RegisterExtension
|
||||
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder()
|
||||
.retryConfiguration(retryConfiguration)
|
||||
.timeout(TIMEOUT)
|
||||
.build();
|
||||
|
||||
@Test
|
||||
void testBreaker() {
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenReturn("value")
|
||||
.thenThrow(new RuntimeException("Badness has ensued."));
|
||||
void testTimeout() {
|
||||
final FaultTolerantRedisCluster cluster = REDIS_CLUSTER_EXTENSION.getRedisCluster();
|
||||
|
||||
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
assertTimeoutPreemptively(Duration.ofSeconds(1), () -> {
|
||||
final ExecutionException asyncException = assertThrows(ExecutionException.class,
|
||||
() -> cluster.withCluster(connection -> connection.async().blpop(TIMEOUT.toMillis() * 2, "key")).get());
|
||||
assertTrue(asyncException.getCause() instanceof RedisCommandTimeoutException);
|
||||
|
||||
assertThrows(RedisException.class,
|
||||
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
|
||||
assertThrows(RedisCommandTimeoutException.class,
|
||||
() -> cluster.withCluster(connection -> connection.sync().blpop(TIMEOUT.toMillis() * 2, "key")));
|
||||
});
|
||||
|
||||
final RedisException redisException = assertThrows(RedisException.class,
|
||||
() -> faultTolerantCluster.withCluster(connection -> connection.sync().get("OH NO")));
|
||||
|
||||
assertTrue(redisException.getCause() instanceof CallNotPermittedException);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRetry() {
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenReturn("value");
|
||||
|
||||
assertEquals("value", faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
|
||||
when(clusterCommands.get(anyString()))
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenThrow(new RedisCommandTimeoutException())
|
||||
.thenReturn("value");
|
||||
|
||||
assertThrows(RedisCommandTimeoutException.class, () -> faultTolerantCluster.withCluster(connection -> connection.sync().get("key")));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,14 +32,23 @@ import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
|||
import redis.embedded.RedisServer;
|
||||
import redis.embedded.exceptions.EmbeddedRedisException;
|
||||
|
||||
public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback {
|
||||
public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback,
|
||||
AfterEachCallback {
|
||||
|
||||
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(2);
|
||||
private static final int NODE_COUNT = 2;
|
||||
|
||||
private static final RedisServer[] CLUSTER_NODES = new RedisServer[NODE_COUNT];
|
||||
|
||||
private final Duration timeout;
|
||||
private final RetryConfiguration retryConfiguration;
|
||||
private FaultTolerantRedisCluster redisCluster;
|
||||
|
||||
public RedisClusterExtension(final Duration timeout, final RetryConfiguration retryConfiguration) {
|
||||
this.timeout = timeout;
|
||||
this.retryConfiguration = retryConfiguration;
|
||||
}
|
||||
|
||||
|
||||
public static RedisClusterExtensionBuilder builder() {
|
||||
return new RedisClusterExtensionBuilder();
|
||||
|
@ -78,9 +87,9 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
|
|||
|
||||
redisCluster = new FaultTolerantRedisCluster("test-cluster",
|
||||
RedisClusterClient.create(urls.stream().map(RedisURI::create).collect(Collectors.toList())),
|
||||
Duration.ofSeconds(2),
|
||||
timeout,
|
||||
new CircuitBreakerConfiguration(),
|
||||
new RetryConfiguration());
|
||||
retryConfiguration);
|
||||
|
||||
redisCluster.useCluster(connection -> {
|
||||
boolean setAll = false;
|
||||
|
@ -208,11 +217,24 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
|
|||
|
||||
public static class RedisClusterExtensionBuilder {
|
||||
|
||||
private Duration timeout = DEFAULT_TIMEOUT;
|
||||
private RetryConfiguration retryConfiguration = new RetryConfiguration();
|
||||
|
||||
private RedisClusterExtensionBuilder() {
|
||||
}
|
||||
|
||||
RedisClusterExtensionBuilder timeout(Duration timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
RedisClusterExtensionBuilder retryConfiguration(RetryConfiguration retryConfiguration) {
|
||||
this.retryConfiguration = retryConfiguration;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisClusterExtension build() {
|
||||
return new RedisClusterExtension();
|
||||
return new RedisClusterExtension(timeout, retryConfiguration);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue