diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java index a1acbd2c8..4da9f89e9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java @@ -3,6 +3,8 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import org.jdbi.v3.core.statement.PreparedBatch; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.storage.mappers.OutgoingMessageEntityRowMapper; @@ -49,24 +51,35 @@ public class Messages { this.database.getDatabase().registerRowMapper(new OutgoingMessageEntityRowMapper()); } + @VisibleForTesting public void store(UUID guid, Envelope message, String destination, long destinationDevice) { + final Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build(); + store(List.of(messageWithGuid), destination, destinationDevice); + } + + public void store(List messages, String destination, long destinationDevice) { database.use(jdbi ->jdbi.useHandle(handle -> { try (Timer.Context ignored = storeTimer.time()) { - handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_UUID + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " + - "VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_uuid, :source_device, :destination, :destination_device, :message, :content)") - .bind("guid", guid) - .bind("destination", destination) - .bind("destination_device", destinationDevice) - .bind("type", message.getType().getNumber()) - .bind("relay", message.getRelay()) - .bind("timestamp", message.getTimestamp()) - .bind("server_timestamp", message.getServerTimestamp()) - .bind("source", message.hasSource() ? message.getSource() : null) - .bind("source_uuid", message.hasSourceUuid() ? UUID.fromString(message.getSourceUuid()) : null) - .bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null) - .bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null) - .bind("content", message.hasContent() ? message.getContent().toByteArray() : null) - .execute(); + final PreparedBatch batch = handle.prepareBatch("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_UUID + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " + + "VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_uuid, :source_device, :destination, :destination_device, :message, :content)"); + + for (final Envelope message : messages) { + batch.bind("guid", UUID.fromString(message.getServerGuid())) + .bind("type", message.getType().getNumber()) + .bind("relay", message.getRelay()) + .bind("timestamp", message.getTimestamp()) + .bind("server_timestamp", message.getServerTimestamp()) + .bind("source", message.hasSource() ? message.getSource() : null) + .bind("source_uuid", message.hasSourceUuid() ? UUID.fromString(message.getSourceUuid()) : null) + .bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null) + .bind("destination", destination) + .bind("destination_device", destinationDevice) + .bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null) + .bind("content", message.hasContent() ? message.getContent().toByteArray() : null) + .add(); + } + + batch.execute(); } })); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 44d3f02a1..908a75f5f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -19,6 +19,7 @@ import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -451,47 +452,43 @@ public class MessagesCache implements Managed { } private void persistQueue(ReplicatedJedisPool jedisPool, Key key) throws IOException { - Timer.Context timer = persistQueueTimer.time(); - int messagesPersistedCount = 0; - try (Jedis jedis = jedisPool.getWriteResource()) { + try (Jedis jedis = jedisPool.getWriteResource(); + Timer.Context ignored = persistQueueTimer.time()) { + while (true) { jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes()); Set messages = jedis.zrangeWithScores(key.getUserMessageQueue(), 0, CHUNK_SIZE); + List envelopes = new ArrayList<>(messages.size()); - for (Tuple message : messages) { - persistMessage(jedis, key, (long)message.getScore(), message.getBinaryElement()); - messagesPersistedCount++; + for (Tuple tuple : messages) { + try { + envelopes.add(Envelope.parseFrom(tuple.getBinaryElement())); + } catch (InvalidProtocolBufferException e) { + logger.error("Error parsing envelope", e); + } } + database.store(envelopes, key.getAddress(), key.getDeviceId()); + + for (Tuple tuple : messages) { + removeOperation.remove(jedis, key.getAddress(), key.getDeviceId(), (long)tuple.getScore()); + } + + messagesPersistedCount += envelopes.size(); + if (messages.size() < CHUNK_SIZE) { jedis.del(key.getUserMessageQueuePersistInProgress()); return; } } } finally { - timer.stop(); queueSizeHistogram.update(messagesPersistedCount); } } - private void persistMessage(Jedis jedis, Key key, long score, byte[] message) { - try { - Envelope envelope = Envelope.parseFrom(message); - UUID guid = envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null; - - envelope = envelope.toBuilder().clearServerGuid().build(); - - database.store(guid, envelope, key.getAddress(), key.getDeviceId()); - } catch (InvalidProtocolBufferException e) { - logger.error("Error parsing envelope", e); - } - - removeOperation.remove(jedis, key.getAddress(), key.getDeviceId(), score); - } - private List getQueuesToPersist(GetOperation getOperation) { Timer.Context timer = getQueuesTimer.time(); try {