From 5b42593fbb568d2262eb770bb306af8ef49e957f Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 3 Sep 2020 10:56:07 -0400 Subject: [PATCH] Persist messages one page at a time. --- .../workers/ScourMessageCacheCommand.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScourMessageCacheCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScourMessageCacheCommand.java index 01319e072..546a69e1a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScourMessageCacheCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScourMessageCacheCommand.java @@ -6,6 +6,7 @@ import io.dropwizard.cli.ConfiguredCommand; import io.dropwizard.setup.Bootstrap; import io.lettuce.core.ScanArgs; import io.lettuce.core.ScanIterator; +import io.lettuce.core.ScoredValue; import net.sourceforge.argparse4j.inf.Namespace; import org.jdbi.v3.core.Jdbi; import org.slf4j.Logger; @@ -18,14 +19,16 @@ import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.Messages; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; public class ScourMessageCacheCommand extends ConfiguredCommand { private FaultTolerantRedisClient redisClient; private Messages messageDatabase; + private static final int MESSAGE_PAGE_SIZE = 100; + private static final Logger log = LoggerFactory.getLogger(ScourMessageCacheCommand.class); public ScourMessageCacheCommand() { @@ -70,14 +73,20 @@ public class ScourMessageCacheCommand extends ConfiguredCommand> messages; - redisClient.useBinaryClient(connection -> connection.sync().zrange(messageBytes -> { - persistMessage(accountNumber, deviceId, messageBytes); - messageCount.incrementAndGet(); - }, queueKey.getBytes(StandardCharsets.UTF_8), 0, Long.MAX_VALUE)); + do { + messages = redisClient.withBinaryClient(connection -> connection.sync().zpopmin(queueKeyBytes, MESSAGE_PAGE_SIZE)); + + for (final ScoredValue scoredValue : messages) { + persistMessage(accountNumber, deviceId, scoredValue.getValue()); + messageCount++; + } + } while (!messages.isEmpty()); redisClient.useClient(connection -> { final String accountNumberAndDeviceId = accountNumber + "::" + deviceId; @@ -87,7 +96,7 @@ public class ScourMessageCacheCommand extends ConfiguredCommand