Add binary execution methods to ClusterLuaScript.

This commit is contained in:
Jon Chambers 2020-07-09 18:23:43 -04:00 committed by Jon Chambers
parent 69c8968cb0
commit 3d3790fdbc
2 changed files with 73 additions and 6 deletions

View File

@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class ClusterLuaScript {
@ -22,7 +23,8 @@ public class ClusterLuaScript {
private final String script;
private final String sha;
private static final String[] STRING_ARRAY = new String[0];
private static final String[] STRING_ARRAY = new String[0];
private static final byte[][] BYTE_ARRAY_ARRAY = new byte[0][];
private static final Logger log = LoggerFactory.getLogger(ClusterLuaScript.class);
@ -66,4 +68,22 @@ public class ClusterLuaScript {
}
});
}
public Object executeBinary(final List<byte[]> keys, final List<byte[]> args) {
return redisCluster.withBinaryWriteCluster(connection -> {
try {
final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands = connection.sync();
try {
return binaryCommands.evalsha(sha, scriptOutputType, keys.toArray(BYTE_ARRAY_ARRAY), args.toArray(BYTE_ARRAY_ARRAY));
} catch (final RedisNoScriptException e) {
binaryCommands.scriptLoad(script.getBytes(StandardCharsets.UTF_8));
return binaryCommands.evalsha(sha, scriptOutputType, keys.toArray(BYTE_ARRAY_ARRAY), args.toArray(BYTE_ARRAY_ARRAY));
}
} catch (final Exception e) {
log.warn("Failed to execute script", e);
throw e;
}
});
}
}

View File

@ -9,8 +9,10 @@ import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import org.junit.Test;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@ -30,7 +32,7 @@ public class ClusterLuaScriptTest extends AbstractRedisClusterTest {
final ClusterLuaScript script = new ClusterLuaScript(redisCluster, "return redis.call(\"SET\", KEYS[1], ARGV[1])", ScriptOutputType.VALUE);
assertEquals("OK", script.execute(List.of(key), List.of("value")));
assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
final int slot = SlotHash.getSlot(key);
@ -41,24 +43,24 @@ public class ClusterLuaScriptTest extends AbstractRedisClusterTest {
destinationCommands.clusterSetSlotImporting(slot, sourceCommands.clusterMyId());
assertEquals("OK", script.execute(List.of(key), List.of("value")));
assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
sourceCommands.clusterSetSlotMigrating(slot, destinationCommands.clusterMyId());
assertEquals("OK", script.execute(List.of(key), List.of("value")));
assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
for (final String migrateKey : sourceCommands.clusterGetKeysInSlot(slot, Integer.MAX_VALUE)) {
destinationCommands.migrate("127.0.0.1", sourcePort, migrateKey, 0, 1000);
}
assertEquals("OK", script.execute(List.of(key), List.of("value")));
assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
destinationCommands.clusterSetSlotNode(slot, destinationCommands.clusterMyId());
assertEquals("OK", script.execute(List.of(key), List.of("value")));
assertEquals("OK", script.execute(List.of(key), List.of(value)));
assertEquals(value, redisCluster.withReadCluster(connection -> connection.sync().get(key)));
}
@ -103,4 +105,49 @@ public class ClusterLuaScriptTest extends AbstractRedisClusterTest {
verify(commands, times(2)).scriptLoad(script);
verify(commands, times(2)).evalsha(sha, scriptOutputType, keys.toArray(new String[0]), values.toArray(new String[0]));
}
@Test
public void testExecuteBinary() {
final RedisAdvancedClusterCommands<String, String> stringCommands = mock(RedisAdvancedClusterCommands.class);
final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands = mock(RedisAdvancedClusterCommands.class);
final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.buildMockRedisCluster(stringCommands, binaryCommands);
final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])";
final String sha = "abc123";
final ScriptOutputType scriptOutputType = ScriptOutputType.VALUE;
final List<byte[]> keys = List.of("key".getBytes(StandardCharsets.UTF_8));
final List<byte[]> values = List.of("value".getBytes(StandardCharsets.UTF_8));
when(stringCommands.scriptLoad(script)).thenReturn(sha);
when(binaryCommands.evalsha(any(), any(), any(), any())).thenReturn("OK".getBytes(StandardCharsets.UTF_8));
new ClusterLuaScript(mockCluster, script, scriptOutputType).executeBinary(keys, values);
verify(stringCommands).scriptLoad(script);
verify(binaryCommands).evalsha(sha, scriptOutputType, keys.toArray(new byte[0][]), values.toArray(new byte[0][]));
}
@Test
public void testExecuteBinaryNoScriptException() {
final RedisAdvancedClusterCommands<String, String> stringCommands = mock(RedisAdvancedClusterCommands.class);
final RedisAdvancedClusterCommands<byte[], byte[]> binaryCommands = mock(RedisAdvancedClusterCommands.class);
final FaultTolerantRedisCluster mockCluster = RedisClusterHelper.buildMockRedisCluster(stringCommands, binaryCommands);
final String script = "return redis.call(\"SET\", KEYS[1], ARGV[1])";
final String sha = "abc123";
final ScriptOutputType scriptOutputType = ScriptOutputType.VALUE;
final List<byte[]> keys = List.of("key".getBytes(StandardCharsets.UTF_8));
final List<byte[]> values = List.of("value".getBytes(StandardCharsets.UTF_8));
when(stringCommands.scriptLoad(script)).thenReturn(sha);
when(binaryCommands.evalsha(any(), any(), any(), any()))
.thenThrow(new RedisNoScriptException("OH NO"))
.thenReturn("OK".getBytes(StandardCharsets.UTF_8));
new ClusterLuaScript(mockCluster, script, scriptOutputType).executeBinary(keys, values);
verify(stringCommands).scriptLoad(script);
verify(binaryCommands).scriptLoad(script.getBytes(StandardCharsets.UTF_8));
verify(binaryCommands, times(2)).evalsha(sha, scriptOutputType, keys.toArray(new byte[0][]), values.toArray(new byte[0][]));
}
}