From 189f8afcc9b86e23337212e8444854c0fe50ac27 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 6 Aug 2020 13:15:08 -0400 Subject: [PATCH] Warm up the test cluster before running tests to avoid transient startup jitters. --- .../redis/AbstractRedisClusterTest.java | 47 +++++++++++++++---- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java index 76c15eaa5..3f1a9fd59 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/AbstractRedisClusterTest.java @@ -1,14 +1,17 @@ package org.whispersystems.textsecuregcm.redis; import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.SlotHash; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.util.RedisClusterUtil; import redis.embedded.RedisServer; import java.io.File; @@ -27,7 +30,6 @@ import static org.junit.Assume.assumeFalse; */ public abstract class AbstractRedisClusterTest { - private static final int MAX_SLOT = 16384; private static final int NODE_COUNT = 2; private static RedisServer[] clusterNodes; @@ -55,6 +57,34 @@ public abstract class AbstractRedisClusterTest { .collect(Collectors.toList()); redisCluster = new FaultTolerantRedisCluster("test-cluster", urls, Duration.ofSeconds(2), new CircuitBreakerConfiguration()); + + redisCluster.useWriteCluster(connection -> { + boolean setAll = false; + + final String[] keys = new String[NODE_COUNT]; + + for (int i = 0; i < keys.length; i++) { + keys[i] = RedisClusterUtil.getMinimalHashTag(i * SlotHash.SLOT_COUNT / keys.length); + } + + while (!setAll) { + try { + for (final String key : keys) { + connection.sync().set(key, "warmup"); + } + + setAll = true; + } catch (final RedisException ignored) { + // Cluster isn't ready; wait and retry. + try { + Thread.sleep(500); + } catch (final InterruptedException ignored2) { + } + } + } + }); + + redisCluster.useWriteCluster(connection -> connection.sync().flushall()); } protected FaultTolerantRedisCluster getRedisCluster() { @@ -67,7 +97,7 @@ public abstract class AbstractRedisClusterTest { } @AfterClass - public static void tearDownAfterClass() throws Exception { + public static void tearDownAfterClass() { for (final RedisServer node : clusterNodes) { node.stop(); } @@ -101,31 +131,30 @@ public abstract class AbstractRedisClusterTest { meetClient.shutdown(); } - final int slotsPerNode = MAX_SLOT / nodes.length; + final int slotsPerNode = SlotHash.SLOT_COUNT / nodes.length; for (int i = 0; i < nodes.length; i++) { final int startInclusive = i * slotsPerNode; - final int endExclusive = i == nodes.length - 1 ? MAX_SLOT : (i + 1) * slotsPerNode; + final int endExclusive = i == nodes.length - 1 ? SlotHash.SLOT_COUNT : (i + 1) * slotsPerNode; final RedisClient assignSlotClient = RedisClient.create(RedisURI.create("127.0.0.1", nodes[i].ports().get(0))); - try { + try (final StatefulRedisConnection assignSlotConnection = assignSlotClient.connect()) { final int[] slots = new int[endExclusive - startInclusive]; for (int s = startInclusive; s < endExclusive; s++) { slots[s - startInclusive] = s; } - assignSlotClient.connect().sync().clusterAddSlots(slots); + assignSlotConnection.sync().clusterAddSlots(slots); } finally { assignSlotClient.shutdown(); } } - final RedisClient waitClient = RedisClient.create(RedisURI.create("127.0.0.1", nodes[0].ports().get(0))); - final StatefulRedisConnection connection = waitClient.connect(); + final RedisClient waitClient = RedisClient.create(RedisURI.create("127.0.0.1", nodes[0].ports().get(0))); - try { + try (final StatefulRedisConnection connection = waitClient.connect()) { // CLUSTER INFO gives us a big blob of key-value pairs, but the one we're interested in is `cluster_state`. // According to https://redis.io/commands/cluster-info, `cluster_state:ok` means that the node is ready to // receive queries, all slots are assigned, and a majority of master nodes are reachable.