Clear each cluster node individually.

This commit is contained in:
Jon Chambers 2020-07-21 17:53:21 -04:00 committed by Jon Chambers
parent db9b7ca447
commit 0fbf31ec98
2 changed files with 61 additions and 2 deletions

View File

@ -1,5 +1,6 @@
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.cli.ConfiguredCommand;
import io.dropwizard.setup.Bootstrap;
import net.sourceforge.argparse4j.inf.Namespace;
@ -14,7 +15,11 @@ public class ClearCacheClusterCommand extends ConfiguredCommand<WhisperServerCon
@Override
protected void run(final Bootstrap<WhisperServerConfiguration> bootstrap, final Namespace namespace, final WhisperServerConfiguration config) {
final FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration().getUrls(), config.getCacheClusterConfiguration().getTimeout(), config.getCacheClusterConfiguration().getCircuitBreakerConfiguration());
cacheCluster.useWriteCluster(connection -> connection.sync().flushallAsync());
clearCache(new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration().getUrls(), config.getCacheClusterConfiguration().getTimeout(), config.getCacheClusterConfiguration().getCircuitBreakerConfiguration()));
}
@VisibleForTesting
static void clearCache(final FaultTolerantRedisCluster cacheCluster) {
cacheCluster.useWriteCluster(connection -> connection.sync().masters().commands().flushallAsync());
}
}

View File

@ -0,0 +1,54 @@
package org.whispersystems.textsecuregcm.workers;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
public class ClearCacheClusterCommandTest extends AbstractRedisClusterTest {
private static final int KEY_COUNT = 10_000;
@Test
public void testClearCache() throws InterruptedException {
final FaultTolerantRedisCluster cluster = getRedisCluster();
cluster.useWriteCluster(connection -> {
final RedisAdvancedClusterCommands<String, String> clusterCommands = connection.sync();
for (int i = 0; i < KEY_COUNT; i++) {
clusterCommands.set("key::" + i, String.valueOf(i));
}
});
{
final AtomicInteger nodeCount = new AtomicInteger(0);
cluster.useWriteCluster(connection -> connection.sync().masters().asMap().forEach((node, commands) -> {
assertTrue(commands.dbsize() > 0);
nodeCount.incrementAndGet();
}));
assertTrue(nodeCount.get() > 0);
}
ClearCacheClusterCommand.clearCache(cluster);
Thread.sleep(1000);
{
final AtomicInteger nodeCount = new AtomicInteger(0);
cluster.useWriteCluster(connection -> connection.sync().masters().asMap().forEach((node, commands) -> {
assertEquals(0L, (long)commands.dbsize());
nodeCount.incrementAndGet();
}));
assertTrue(nodeCount.get() > 0);
}
}
}