diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommand.java index 622e0d1d9..05f8bb67f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommand.java @@ -1,5 +1,6 @@ package org.whispersystems.textsecuregcm.workers; +import com.google.common.annotations.VisibleForTesting; import io.dropwizard.cli.ConfiguredCommand; import io.dropwizard.setup.Bootstrap; import net.sourceforge.argparse4j.inf.Namespace; @@ -14,7 +15,11 @@ public class ClearCacheClusterCommand extends ConfiguredCommand bootstrap, final Namespace namespace, final WhisperServerConfiguration config) { - final FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration().getUrls(), config.getCacheClusterConfiguration().getTimeout(), config.getCacheClusterConfiguration().getCircuitBreakerConfiguration()); - cacheCluster.useWriteCluster(connection -> connection.sync().flushallAsync()); + clearCache(new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration().getUrls(), config.getCacheClusterConfiguration().getTimeout(), config.getCacheClusterConfiguration().getCircuitBreakerConfiguration())); + } + + @VisibleForTesting + static void clearCache(final FaultTolerantRedisCluster cacheCluster) { + cacheCluster.useWriteCluster(connection -> connection.sync().masters().commands().flushallAsync()); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommandTest.java new file mode 100644 index 000000000..163958d89 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/ClearCacheClusterCommandTest.java @@ -0,0 +1,54 @@ +package org.whispersystems.textsecuregcm.workers; + +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; +import org.junit.Test; +import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +public class ClearCacheClusterCommandTest extends AbstractRedisClusterTest { + + private static final int KEY_COUNT = 10_000; + + @Test + public void testClearCache() throws InterruptedException { + final FaultTolerantRedisCluster cluster = getRedisCluster(); + + cluster.useWriteCluster(connection -> { + final RedisAdvancedClusterCommands clusterCommands = connection.sync(); + + for (int i = 0; i < KEY_COUNT; i++) { + clusterCommands.set("key::" + i, String.valueOf(i)); + } + }); + + { + final AtomicInteger nodeCount = new AtomicInteger(0); + + cluster.useWriteCluster(connection -> connection.sync().masters().asMap().forEach((node, commands) -> { + assertTrue(commands.dbsize() > 0); + nodeCount.incrementAndGet(); + })); + + assertTrue(nodeCount.get() > 0); + } + + ClearCacheClusterCommand.clearCache(cluster); + + Thread.sleep(1000); + + { + final AtomicInteger nodeCount = new AtomicInteger(0); + + cluster.useWriteCluster(connection -> connection.sync().masters().asMap().forEach((node, commands) -> { + assertEquals(0L, (long)commands.dbsize()); + nodeCount.incrementAndGet(); + })); + + assertTrue(nodeCount.get() > 0); + } + } +}