Add byte-array-based methods to FaultTolerantRedisCluster.

This commit is contained in:
Jon Chambers 2020-07-09 14:24:58 -04:00 committed by Jon Chambers
parent 229caea5fd
commit 69c8968cb0
2 changed files with 61 additions and 16 deletions

View File

@ -7,6 +7,7 @@ import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Constants;
@ -25,7 +26,9 @@ import java.util.stream.Collectors;
public class FaultTolerantRedisCluster implements Managed {
private final RedisClusterClient clusterClient;
private final StatefulRedisClusterConnection<String, String> clusterConnection;
private final StatefulRedisClusterConnection<String, String> stringClusterConnection;
private final StatefulRedisClusterConnection<byte[], byte[]> binaryClusterConnection;
private final CircuitBreaker readCircuitBreaker;
private final CircuitBreaker writeCircuitBreaker;
@ -39,9 +42,10 @@ public class FaultTolerantRedisCluster implements Managed {
this.clusterClient = clusterClient;
this.clusterClient.setDefaultTimeout(timeout);
this.clusterConnection = clusterClient.connect();
this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig());
this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig());
this.stringClusterConnection = clusterClient.connect();
this.binaryClusterConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);
this.readCircuitBreaker = CircuitBreaker.of(name + "-read", circuitBreakerConfiguration.toCircuitBreakerConfig());
this.writeCircuitBreaker = CircuitBreaker.of(name + "-write", circuitBreakerConfiguration.toCircuitBreakerConfig());
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
readCircuitBreaker,
@ -53,19 +57,35 @@ public class FaultTolerantRedisCluster implements Managed {
}
public void useReadCluster(final Consumer<StatefulRedisClusterConnection<String, String>> consumer) {
this.readCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection));
this.readCircuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection));
}
public <T> T withReadCluster(final Function<StatefulRedisClusterConnection<String, String>, T> consumer) {
return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection));
return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection));
}
public void useWriteCluster(final Consumer<StatefulRedisClusterConnection<String, String>> consumer) {
this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(clusterConnection));
this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(stringClusterConnection));
}
public <T> T withWriteCluster(final Function<StatefulRedisClusterConnection<String, String>, T> consumer) {
return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(clusterConnection));
return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(stringClusterConnection));
}
public void useBinaryReadCluster(final Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer) {
this.readCircuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection));
}
public <T> T withBinaryReadCluster(final Function<StatefulRedisClusterConnection<byte[], byte[]>, T> consumer) {
return this.readCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection));
}
public void useBinaryWriteCluster(final Consumer<StatefulRedisClusterConnection<byte[], byte[]>> consumer) {
this.writeCircuitBreaker.executeRunnable(() -> consumer.accept(binaryClusterConnection));
}
public <T> T withBinaryWriteCluster(final Function<StatefulRedisClusterConnection<byte[], byte[]>, T> consumer) {
return this.writeCircuitBreaker.executeSupplier(() -> consumer.apply(binaryClusterConnection));
}
@Override

View File

@ -15,29 +15,54 @@ import static org.mockito.Mockito.when;
public class RedisClusterHelper {
@SuppressWarnings("unchecked")
public static FaultTolerantRedisCluster buildMockRedisCluster(final RedisAdvancedClusterCommands<String, String> commands) {
final FaultTolerantRedisCluster cluster = mock(FaultTolerantRedisCluster.class);
final StatefulRedisClusterConnection<String, String> connection = mock(StatefulRedisClusterConnection.class);
public static FaultTolerantRedisCluster buildMockRedisCluster(final RedisAdvancedClusterCommands<String, String> stringCommands) {
return buildMockRedisCluster(stringCommands, mock(RedisAdvancedClusterCommands.class));
}
when(connection.sync()).thenReturn(commands);
@SuppressWarnings("unchecked")
public static FaultTolerantRedisCluster buildMockRedisCluster(final RedisAdvancedClusterCommands<String, String> stringCommands, final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands) {
final FaultTolerantRedisCluster cluster = mock(FaultTolerantRedisCluster.class);
final StatefulRedisClusterConnection<String, String> stringConnection = mock(StatefulRedisClusterConnection.class);
final StatefulRedisClusterConnection<byte[], byte[]> binaryConnection = mock(StatefulRedisClusterConnection.class);
when(stringConnection.sync()).thenReturn(stringCommands);
when(binaryConnection.sync()).thenReturn(binaryCommands);
when(cluster.withReadCluster(any(Function.class))).thenAnswer(invocation -> {
return invocation.getArgument(0, Function.class).apply(connection);
return invocation.getArgument(0, Function.class).apply(stringConnection);
});
doAnswer(invocation -> {
invocation.getArgument(0, Consumer.class).accept(connection);
invocation.getArgument(0, Consumer.class).accept(stringConnection);
return null;
}).when(cluster).useReadCluster(any(Consumer.class));
when(cluster.withWriteCluster(any(Function.class))).thenAnswer(invocation -> {
return invocation.getArgument(0, Function.class).apply(connection);
return invocation.getArgument(0, Function.class).apply(stringConnection);
});
doAnswer(invocation -> {
invocation.getArgument(0, Consumer.class).accept(connection);
invocation.getArgument(0, Consumer.class).accept(stringConnection);
return null;
}).when(cluster).useWriteCluster(any(Consumer.class));
when(cluster.withBinaryReadCluster(any(Function.class))).thenAnswer(invocation -> {
return invocation.getArgument(0, Function.class).apply(binaryConnection);
});
doAnswer(invocation -> {
invocation.getArgument(0, Consumer.class).accept(binaryConnection);
return null;
}).when(cluster).useBinaryReadCluster(any(Consumer.class));
when(cluster.withBinaryWriteCluster(any(Function.class))).thenAnswer(invocation -> {
return invocation.getArgument(0, Function.class).apply(binaryConnection);
});
doAnswer(invocation -> {
invocation.getArgument(0, Consumer.class).accept(binaryConnection);
return null;
}).when(cluster).useBinaryWriteCluster(any(Consumer.class));
return cluster;
}