Update ClusterLuaScript formatting

This commit is contained in:
Jon Chambers 2021-09-27 12:01:22 -04:00 committed by Jon Chambers
parent 715d1157ad
commit 2383aaaa3d
2 changed files with 162 additions and 144 deletions

View File

@ -19,76 +19,84 @@ import java.util.List;
public class ClusterLuaScript { public class ClusterLuaScript {
private final FaultTolerantRedisCluster redisCluster; private final FaultTolerantRedisCluster redisCluster;
private final ScriptOutputType scriptOutputType; private final ScriptOutputType scriptOutputType;
private final String script; private final String script;
private final String sha; private final String sha;
private static final String[] STRING_ARRAY = new String[0]; private static final String[] STRING_ARRAY = new String[0];
private static final byte[][] BYTE_ARRAY_ARRAY = new byte[0][]; private static final byte[][] BYTE_ARRAY_ARRAY = new byte[0][];
private static final Logger log = LoggerFactory.getLogger(ClusterLuaScript.class); private static final Logger log = LoggerFactory.getLogger(ClusterLuaScript.class);
public static ClusterLuaScript fromResource(final FaultTolerantRedisCluster redisCluster, final String resource, final ScriptOutputType scriptOutputType) throws IOException { public static ClusterLuaScript fromResource(final FaultTolerantRedisCluster redisCluster,
try (final InputStream inputStream = LuaScript.class.getClassLoader().getResourceAsStream(resource); final String resource,
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { final ScriptOutputType scriptOutputType) throws IOException {
byte[] buffer = new byte[4096]; try (final InputStream inputStream = LuaScript.class.getClassLoader().getResourceAsStream(resource);
int read; final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
while ((read = inputStream.read(buffer)) != -1) { byte[] buffer = new byte[4096];
baos.write(buffer, 0, read); int read;
}
return new ClusterLuaScript(redisCluster, new String(baos.toByteArray()), scriptOutputType); while ((read = inputStream.read(buffer)) != -1) {
baos.write(buffer, 0, read);
}
return new ClusterLuaScript(redisCluster, new String(baos.toByteArray()), scriptOutputType);
}
}
@VisibleForTesting
ClusterLuaScript(final FaultTolerantRedisCluster redisCluster,
final String script,
final ScriptOutputType scriptOutputType) {
this.redisCluster = redisCluster;
this.scriptOutputType = scriptOutputType;
this.script = script;
this.sha = redisCluster.withCluster(connection -> connection.sync().scriptLoad(script));
}
public Object execute(final List<String> keys, final List<String> args) {
return redisCluster.withCluster(connection -> {
try {
final RedisAdvancedClusterCommands<String, String> clusterCommands = connection.sync();
try {
return clusterCommands.evalsha(sha, scriptOutputType, keys.toArray(STRING_ARRAY), args.toArray(STRING_ARRAY));
} catch (final RedisNoScriptException e) {
reloadScript();
return clusterCommands.evalsha(sha, scriptOutputType, keys.toArray(STRING_ARRAY), args.toArray(STRING_ARRAY));
} }
} } catch (final Exception e) {
log.warn("Failed to execute script", e);
throw e;
}
});
}
@VisibleForTesting public Object executeBinary(final List<byte[]> keys, final List<byte[]> args) {
ClusterLuaScript(final FaultTolerantRedisCluster redisCluster, final String script, final ScriptOutputType scriptOutputType) { return redisCluster.withBinaryCluster(connection -> {
this.redisCluster = redisCluster; try {
this.scriptOutputType = scriptOutputType; final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands = connection.sync();
this.script = script;
this.sha = redisCluster.withCluster(connection -> connection.sync().scriptLoad(script));
}
public Object execute(final List<String> keys, final List<String> args) { try {
return redisCluster.withCluster(connection -> { return binaryCommands
try { .evalsha(sha, scriptOutputType, keys.toArray(BYTE_ARRAY_ARRAY), args.toArray(BYTE_ARRAY_ARRAY));
final RedisAdvancedClusterCommands<String, String> clusterCommands = connection.sync(); } catch (final RedisNoScriptException e) {
reloadScript();
return binaryCommands
.evalsha(sha, scriptOutputType, keys.toArray(BYTE_ARRAY_ARRAY), args.toArray(BYTE_ARRAY_ARRAY));
}
} catch (final Exception e) {
log.warn("Failed to execute script", e);
throw e;
}
});
}
try { private void reloadScript() {
return clusterCommands.evalsha(sha, scriptOutputType, keys.toArray(STRING_ARRAY), args.toArray(STRING_ARRAY)); redisCluster.useCluster(connection -> connection.sync().upstream().commands().scriptLoad(script));
} catch (final RedisNoScriptException e) { }
reloadScript();
return clusterCommands.evalsha(sha, scriptOutputType, keys.toArray(STRING_ARRAY), args.toArray(STRING_ARRAY));
}
} catch (final Exception e) {
log.warn("Failed to execute script", e);
throw e;
}
});
}
public Object executeBinary(final List<byte[]> keys, final List<byte[]> args) {
return redisCluster.withBinaryCluster(connection -> {
try {
final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands = connection.sync();
try {
return binaryCommands.evalsha(sha, scriptOutputType, keys.toArray(BYTE_ARRAY_ARRAY), args.toArray(BYTE_ARRAY_ARRAY));
} catch (final RedisNoScriptException e) {
reloadScript();
return binaryCommands.evalsha(sha, scriptOutputType, keys.toArray(BYTE_ARRAY_ARRAY), args.toArray(BYTE_ARRAY_ARRAY));
}
} catch (final Exception e) {
log.warn("Failed to execute script", e);
throw e;
}
});
}
private void reloadScript() {
redisCluster.useCluster(connection -> connection.sync().upstream().commands().scriptLoad(script));
}
} }

View File

@ -24,117 +24,127 @@ import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
public class ClusterLuaScriptTest extends AbstractRedisClusterTest { public class ClusterLuaScriptTest extends AbstractRedisClusterTest {
@Test @Test
public void testExecuteMovedKey() { public void testExecuteMovedKey() {
final String key = "key"; final String key = "key";
final String value = "value"; final String value = "value";
final FaultTolerantRedisCluster redisCluster = getRedisCluster(); final FaultTolerantRedisCluster redisCluster = getRedisCluster();
final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])", ScriptOutputType.VALUE); final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])",
ScriptOutputType.VALUE);
assertEquals("OK", script.execute(List.of(key), List.of(value))); assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key))); assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
final int slot = SlotHash.getSlot(key); final int slot = SlotHash.getSlot(key);
final int sourcePort = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).node(0).getUri().getPort()); final int sourcePort = redisCluster.withCluster(
final RedisCommands<String, String> sourceCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).commands(0)); connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM))
final RedisCommands<String, String> destinationCommands = redisCluster.withCluster(connection -> connection.sync().nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).commands(0)); .node(0).getUri().getPort());
final RedisCommands<String, String> sourceCommands = redisCluster.withCluster(
connection -> connection.sync().nodes(node -> node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM))
.commands(0));
final RedisCommands<String, String> destinationCommands = redisCluster.withCluster(connection -> connection.sync()
.nodes(node -> !node.hasSlot(slot) && node.is(RedisClusterNode.NodeFlag.UPSTREAM)).commands(0));
destinationCommands.clusterSetSlotImporting(slot, sourceCommands.clusterMyId()); destinationCommands.clusterSetSlotImporting(slot, sourceCommands.clusterMyId());
assertEquals("OK", script.execute(List.of(key), List.of(value))); assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key))); assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
sourceCommands.clusterSetSlotMigrating(slot, destinationCommands.clusterMyId()); sourceCommands.clusterSetSlotMigrating(slot, destinationCommands.clusterMyId());
assertEquals("OK", script.execute(List.of(key), List.of(value))); assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key))); assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
for (final String migrateKey : sourceCommands.clusterGetKeysInSlot(slot, Integer.MAX_VALUE)) { for (final String migrateKey : sourceCommands.clusterGetKeysInSlot(slot, Integer.MAX_VALUE)) {
destinationCommands.migrate("127.0.0.1", sourcePort, migrateKey, 0, 1000); destinationCommands.migrate("127.0.0.1", sourcePort, migrateKey, 0, 1000);
}
assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
destinationCommands.clusterSetSlotNode(slot, destinationCommands.clusterMyId());
assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
} }
@Test assertEquals("OK", script.execute(List.of(key), List.of(value)));
public void testExecute() { assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
final RedisAdvancedClusterCommands<String, String> commands = mock(RedisAdvancedClusterCommands.class);
final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.buildMockRedisCluster(commands);
final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])"; destinationCommands.clusterSetSlotNode(slot, destinationCommands.clusterMyId());
final String sha = "abc123";
final ScriptOutputType scriptOutputType = ScriptOutputType.VALUE;
final List<String> keys = List.of("key");
final List<String> values = List.of("value");
when(commands.scriptLoad(script)).thenReturn(sha); assertEquals("OK", script.execute(List.of(key), List.of(value)));
when(commands.evalsha(any(), any(), any(), any())).thenReturn("OK"); assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
}
new ClusterLuaScript(mockCluster, script, scriptOutputType).execute(keys, values); @Test
public void testExecute() {
final RedisAdvancedClusterCommands<String, String> commands = mock(RedisAdvancedClusterCommands.class);
final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.buildMockRedisCluster(commands);
verify(commands).scriptLoad(script); final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])";
verify(commands).evalsha(sha, scriptOutputType, keys.toArray(new String[0]), values.toArray(new String[0])); final String sha = "abc123";
} final ScriptOutputType scriptOutputType = ScriptOutputType.VALUE;
final List<String> keys = List.of("key");
final List<String> values = List.of("value");
@Test when(commands.scriptLoad(script)).thenReturn(sha);
public void testExecuteNoScriptException() { when(commands.evalsha(any(), any(), any(), any())).thenReturn("OK");
final String key = "key";
final String value = "value";
final FaultTolerantRedisCluster redisCluster = getRedisCluster(); new ClusterLuaScript(mockCluster, script, scriptOutputType).execute(keys, values);
final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])", ScriptOutputType.VALUE); verify(commands).scriptLoad(script);
verify(commands).evalsha(sha, scriptOutputType, keys.toArray(new String[0]), values.toArray(new String[0]));
}
// Remove the scripts created by the CLusterLuaScript constructor @Test
redisCluster.useCluster(connection -> connection.sync().upstream().commands().scriptFlush()); public void testExecuteNoScriptException() {
final String key = "key";
final String value = "value";
assertEquals("OK", script.execute(List.of(key), List.of(value))); final FaultTolerantRedisCluster redisCluster = getRedisCluster();
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
}
@Test final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])",
public void testExecuteBinary() { ScriptOutputType.VALUE);
final RedisAdvancedClusterCommands<String, String> stringCommands = mock(RedisAdvancedClusterCommands.class);
final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands = mock(RedisAdvancedClusterCommands.class);
final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.buildMockRedisCluster(stringCommands, binaryCommands);
final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])"; // Remove the scripts created by the CLusterLuaScript constructor
final String sha = "abc123"; redisCluster.useCluster(connection -> connection.sync().upstream().commands().scriptFlush());
final ScriptOutputType scriptOutputType = ScriptOutputType.VALUE;
final List<byte[]> keys = List.of("key".getBytes(StandardCharsets.UTF_8));
final List<byte[]> values = List.of("value".getBytes(StandardCharsets.UTF_8));
when(stringCommands.scriptLoad(script)).thenReturn(sha); assertEquals("OK", script.execute(List.of(key), List.of(value)));
when(binaryCommands.evalsha(any(), any(), any(), any())).thenReturn("OK".getBytes(StandardCharsets.UTF_8)); assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
}
new ClusterLuaScript(mockCluster, script, scriptOutputType).executeBinary(keys, values); @Test
public void testExecuteBinary() {
final RedisAdvancedClusterCommands<String, String> stringCommands = mock(RedisAdvancedClusterCommands.class);
final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands = mock(RedisAdvancedClusterCommands.class);
final FaultTolerantRedisCluster mockCluster = RedisClusterHelper
.buildMockRedisCluster(stringCommands, binaryCommands);
verify(stringCommands).scriptLoad(script); final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])";
verify(binaryCommands).evalsha(sha, scriptOutputType, keys.toArray(new byte[0][]), values.toArray(new byte[0][])); final String sha = "abc123";
} final ScriptOutputType scriptOutputType = ScriptOutputType.VALUE;
final List<byte[]> keys = List.of("key".getBytes(StandardCharsets.UTF_8));
final List<byte[]> values = List.of("value".getBytes(StandardCharsets.UTF_8));
@Test when(stringCommands.scriptLoad(script)).thenReturn(sha);
public void testExecuteBinaryNoScriptException() { when(binaryCommands.evalsha(any(), any(), any(), any())).thenReturn("OK".getBytes(StandardCharsets.UTF_8));
final String key = "key";
final String value = "value";
final FaultTolerantRedisCluster redisCluster = getRedisCluster(); new ClusterLuaScript(mockCluster, script, scriptOutputType).executeBinary(keys, values);
final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])", ScriptOutputType.VALUE); verify(stringCommands).scriptLoad(script);
verify(binaryCommands).evalsha(sha, scriptOutputType, keys.toArray(new byte[0][]), values.toArray(new byte[0][]));
}
// Remove the scripts created by the CLusterLuaScript constructor @Test
redisCluster.useCluster(connection -> connection.sync().upstream().commands().scriptFlush()); public void testExecuteBinaryNoScriptException() {
final String key = "key";
final String value = "value";
assertArrayEquals("OK".getBytes(StandardCharsets.UTF_8), (byte[])script.executeBinary(List.of(key.getBytes(StandardCharsets.UTF_8)), List.of(value.getBytes(StandardCharsets.UTF_8)))); final FaultTolerantRedisCluster redisCluster = getRedisCluster();
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
} final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])",
ScriptOutputType.VALUE);
// Remove the scripts created by the CLusterLuaScript constructor
redisCluster.useCluster(connection -> connection.sync().upstream().commands().scriptFlush());
assertArrayEquals("OK".getBytes(StandardCharsets.UTF_8), (byte[]) script
.executeBinary(List.of(key.getBytes(StandardCharsets.UTF_8)), List.of(value.getBytes(StandardCharsets.UTF_8))));
assertEquals(value, redisCluster.withCluster(connection -> connection.sync().get(key)));
}
} }