Store messages in batches.
This commit is contained in:
parent
5de72a74f5
commit
dfa94eac41
|
@ -3,6 +3,8 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.jdbi.v3.core.statement.PreparedBatch;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.OutgoingMessageEntityRowMapper;
|
||||
|
@ -49,24 +51,35 @@ public class Messages {
|
|||
this.database.getDatabase().registerRowMapper(new OutgoingMessageEntityRowMapper());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void store(UUID guid, Envelope message, String destination, long destinationDevice) {
|
||||
final Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
||||
store(List.of(messageWithGuid), destination, destinationDevice);
|
||||
}
|
||||
|
||||
public void store(List<Envelope> messages, String destination, long destinationDevice) {
|
||||
database.use(jdbi ->jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = storeTimer.time()) {
|
||||
handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_UUID + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
|
||||
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_uuid, :source_device, :destination, :destination_device, :message, :content)")
|
||||
.bind("guid", guid)
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.bind("type", message.getType().getNumber())
|
||||
.bind("relay", message.getRelay())
|
||||
.bind("timestamp", message.getTimestamp())
|
||||
.bind("server_timestamp", message.getServerTimestamp())
|
||||
.bind("source", message.hasSource() ? message.getSource() : null)
|
||||
.bind("source_uuid", message.hasSourceUuid() ? UUID.fromString(message.getSourceUuid()) : null)
|
||||
.bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null)
|
||||
.bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null)
|
||||
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)
|
||||
.execute();
|
||||
final PreparedBatch batch = handle.prepareBatch("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_UUID + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
|
||||
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_uuid, :source_device, :destination, :destination_device, :message, :content)");
|
||||
|
||||
for (final Envelope message : messages) {
|
||||
batch.bind("guid", UUID.fromString(message.getServerGuid()))
|
||||
.bind("type", message.getType().getNumber())
|
||||
.bind("relay", message.getRelay())
|
||||
.bind("timestamp", message.getTimestamp())
|
||||
.bind("server_timestamp", message.getServerTimestamp())
|
||||
.bind("source", message.hasSource() ? message.getSource() : null)
|
||||
.bind("source_uuid", message.hasSourceUuid() ? UUID.fromString(message.getSourceUuid()) : null)
|
||||
.bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null)
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null)
|
||||
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)
|
||||
.add();
|
||||
}
|
||||
|
||||
batch.execute();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.whispersystems.textsecuregcm.util.Util;
|
|||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
@ -451,47 +452,43 @@ public class MessagesCache implements Managed {
|
|||
}
|
||||
|
||||
private void persistQueue(ReplicatedJedisPool jedisPool, Key key) throws IOException {
|
||||
Timer.Context timer = persistQueueTimer.time();
|
||||
|
||||
int messagesPersistedCount = 0;
|
||||
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
try (Jedis jedis = jedisPool.getWriteResource();
|
||||
Timer.Context ignored = persistQueueTimer.time()) {
|
||||
|
||||
while (true) {
|
||||
jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes());
|
||||
|
||||
Set<Tuple> messages = jedis.zrangeWithScores(key.getUserMessageQueue(), 0, CHUNK_SIZE);
|
||||
List<Envelope> envelopes = new ArrayList<>(messages.size());
|
||||
|
||||
for (Tuple message : messages) {
|
||||
persistMessage(jedis, key, (long)message.getScore(), message.getBinaryElement());
|
||||
messagesPersistedCount++;
|
||||
for (Tuple tuple : messages) {
|
||||
try {
|
||||
envelopes.add(Envelope.parseFrom(tuple.getBinaryElement()));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
logger.error("Error parsing envelope", e);
|
||||
}
|
||||
}
|
||||
|
||||
database.store(envelopes, key.getAddress(), key.getDeviceId());
|
||||
|
||||
for (Tuple tuple : messages) {
|
||||
removeOperation.remove(jedis, key.getAddress(), key.getDeviceId(), (long)tuple.getScore());
|
||||
}
|
||||
|
||||
messagesPersistedCount += envelopes.size();
|
||||
|
||||
if (messages.size() < CHUNK_SIZE) {
|
||||
jedis.del(key.getUserMessageQueuePersistInProgress());
|
||||
return;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
timer.stop();
|
||||
queueSizeHistogram.update(messagesPersistedCount);
|
||||
}
|
||||
}
|
||||
|
||||
private void persistMessage(Jedis jedis, Key key, long score, byte[] message) {
|
||||
try {
|
||||
Envelope envelope = Envelope.parseFrom(message);
|
||||
UUID guid = envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null;
|
||||
|
||||
envelope = envelope.toBuilder().clearServerGuid().build();
|
||||
|
||||
database.store(guid, envelope, key.getAddress(), key.getDeviceId());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
logger.error("Error parsing envelope", e);
|
||||
}
|
||||
|
||||
removeOperation.remove(jedis, key.getAddress(), key.getDeviceId(), score);
|
||||
}
|
||||
|
||||
private List<byte[]> getQueuesToPersist(GetOperation getOperation) {
|
||||
Timer.Context timer = getQueuesTimer.time();
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue