Persist messages one page at a time.
This commit is contained in:
parent
25f3c6a548
commit
5b42593fbb
|
@ -6,6 +6,7 @@ import io.dropwizard.cli.ConfiguredCommand;
|
|||
import io.dropwizard.setup.Bootstrap;
|
||||
import io.lettuce.core.ScanArgs;
|
||||
import io.lettuce.core.ScanIterator;
|
||||
import io.lettuce.core.ScoredValue;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -18,14 +19,16 @@ import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
|||
import org.whispersystems.textsecuregcm.storage.Messages;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class ScourMessageCacheCommand extends ConfiguredCommand<WhisperServerConfiguration> {
|
||||
|
||||
private FaultTolerantRedisClient redisClient;
|
||||
private Messages messageDatabase;
|
||||
|
||||
private static final int MESSAGE_PAGE_SIZE = 100;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ScourMessageCacheCommand.class);
|
||||
|
||||
public ScourMessageCacheCommand() {
|
||||
|
@ -70,14 +73,20 @@ public class ScourMessageCacheCommand extends ConfiguredCommand<WhisperServerCon
|
|||
accountNumber = queueKey.substring(startOfAccountNumber + 2, queueKey.indexOf("::", startOfAccountNumber + 1));
|
||||
}
|
||||
|
||||
final long deviceId = Long.parseLong(queueKey.substring(queueKey.lastIndexOf("::") + 2));
|
||||
final long deviceId = Long.parseLong(queueKey.substring(queueKey.lastIndexOf("::") + 2));
|
||||
final byte[] queueKeyBytes = queueKey.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
final AtomicInteger messageCount = new AtomicInteger(0);
|
||||
int messageCount = 0;
|
||||
List<ScoredValue<byte[]>> messages;
|
||||
|
||||
redisClient.useBinaryClient(connection -> connection.sync().zrange(messageBytes -> {
|
||||
persistMessage(accountNumber, deviceId, messageBytes);
|
||||
messageCount.incrementAndGet();
|
||||
}, queueKey.getBytes(StandardCharsets.UTF_8), 0, Long.MAX_VALUE));
|
||||
do {
|
||||
messages = redisClient.withBinaryClient(connection -> connection.sync().zpopmin(queueKeyBytes, MESSAGE_PAGE_SIZE));
|
||||
|
||||
for (final ScoredValue<byte[]> scoredValue : messages) {
|
||||
persistMessage(accountNumber, deviceId, scoredValue.getValue());
|
||||
messageCount++;
|
||||
}
|
||||
} while (!messages.isEmpty());
|
||||
|
||||
redisClient.useClient(connection -> {
|
||||
final String accountNumberAndDeviceId = accountNumber + "::" + deviceId;
|
||||
|
@ -87,7 +96,7 @@ public class ScourMessageCacheCommand extends ConfiguredCommand<WhisperServerCon
|
|||
"user_queue_persisting::" + accountNumberAndDeviceId);
|
||||
});
|
||||
|
||||
log.info("Persisted a queue with {} messages", messageCount.get());
|
||||
log.info("Persisted a queue with {} messages", messageCount);
|
||||
}
|
||||
|
||||
private void persistMessage(final String accountNumber, final long deviceId, final byte[] message) {
|
||||
|
|
Loading…
Reference in New Issue