Revert batch message storage. (#95)
This commit is contained in:
parent
d5f69aec10
commit
eecc71c77f
|
@ -1,11 +1,8 @@
|
||||||
package org.whispersystems.textsecuregcm.storage;
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
import com.codahale.metrics.Histogram;
|
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.jdbi.v3.core.statement.PreparedBatch;
|
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||||
import org.whispersystems.textsecuregcm.storage.mappers.OutgoingMessageEntityRowMapper;
|
import org.whispersystems.textsecuregcm.storage.mappers.OutgoingMessageEntityRowMapper;
|
||||||
|
@ -44,7 +41,6 @@ public class Messages {
|
||||||
private final Timer clearDeviceTimer = metricRegistry.timer(name(Messages.class, "clearDevice" ));
|
private final Timer clearDeviceTimer = metricRegistry.timer(name(Messages.class, "clearDevice" ));
|
||||||
private final Timer clearTimer = metricRegistry.timer(name(Messages.class, "clear" ));
|
private final Timer clearTimer = metricRegistry.timer(name(Messages.class, "clear" ));
|
||||||
private final Timer vacuumTimer = metricRegistry.timer(name(Messages.class, "vacuum"));
|
private final Timer vacuumTimer = metricRegistry.timer(name(Messages.class, "vacuum"));
|
||||||
private final Histogram storeSizeHistogram = metricRegistry.histogram(name(Messages.class, "storeBatchSize"));
|
|
||||||
|
|
||||||
private final FaultTolerantDatabase database;
|
private final FaultTolerantDatabase database;
|
||||||
|
|
||||||
|
@ -53,14 +49,14 @@ public class Messages {
|
||||||
this.database.getDatabase().registerRowMapper(new OutgoingMessageEntityRowMapper());
|
this.database.getDatabase().registerRowMapper(new OutgoingMessageEntityRowMapper());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void store(List<Envelope> messages, String destination, long destinationDevice) {
|
public void store(UUID guid, Envelope message, String destination, long destinationDevice) {
|
||||||
database.use(jdbi -> jdbi.useTransaction(handle -> {
|
database.use(jdbi ->jdbi.useHandle(handle -> {
|
||||||
try (Timer.Context ignored = storeTimer.time()) {
|
try (Timer.Context ignored = storeTimer.time()) {
|
||||||
final PreparedBatch batch = handle.prepareBatch("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_UUID + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
|
handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_UUID + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
|
||||||
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_uuid, :source_device, :destination, :destination_device, :message, :content)");
|
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_uuid, :source_device, :destination, :destination_device, :message, :content)")
|
||||||
|
.bind("guid", guid)
|
||||||
for (final Envelope message : messages) {
|
.bind("destination", destination)
|
||||||
batch.bind("guid", UUID.fromString(message.getServerGuid()))
|
.bind("destination_device", destinationDevice)
|
||||||
.bind("type", message.getType().getNumber())
|
.bind("type", message.getType().getNumber())
|
||||||
.bind("relay", message.getRelay())
|
.bind("relay", message.getRelay())
|
||||||
.bind("timestamp", message.getTimestamp())
|
.bind("timestamp", message.getTimestamp())
|
||||||
|
@ -68,15 +64,9 @@ public class Messages {
|
||||||
.bind("source", message.hasSource() ? message.getSource() : null)
|
.bind("source", message.hasSource() ? message.getSource() : null)
|
||||||
.bind("source_uuid", message.hasSourceUuid() ? UUID.fromString(message.getSourceUuid()) : null)
|
.bind("source_uuid", message.hasSourceUuid() ? UUID.fromString(message.getSourceUuid()) : null)
|
||||||
.bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null)
|
.bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null)
|
||||||
.bind("destination", destination)
|
|
||||||
.bind("destination_device", destinationDevice)
|
|
||||||
.bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null)
|
.bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null)
|
||||||
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)
|
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)
|
||||||
.add();
|
.execute();
|
||||||
}
|
|
||||||
|
|
||||||
batch.execute();
|
|
||||||
storeSizeHistogram.update(messages.size());
|
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@ import org.whispersystems.textsecuregcm.util.Util;
|
||||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -452,32 +451,20 @@ public class MessagesCache implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void persistQueue(ReplicatedJedisPool jedisPool, Key key) throws IOException {
|
private void persistQueue(ReplicatedJedisPool jedisPool, Key key) throws IOException {
|
||||||
|
Timer.Context timer = persistQueueTimer.time();
|
||||||
|
|
||||||
int messagesPersistedCount = 0;
|
int messagesPersistedCount = 0;
|
||||||
|
|
||||||
try (Jedis jedis = jedisPool.getWriteResource();
|
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||||
Timer.Context ignored = persistQueueTimer.time()) {
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes());
|
jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes());
|
||||||
|
|
||||||
Set<Tuple> messages = jedis.zrangeWithScores(key.getUserMessageQueue(), 0, CHUNK_SIZE);
|
Set<Tuple> messages = jedis.zrangeWithScores(key.getUserMessageQueue(), 0, CHUNK_SIZE);
|
||||||
List<Envelope> envelopes = new ArrayList<>(messages.size());
|
|
||||||
|
|
||||||
for (Tuple tuple : messages) {
|
for (Tuple message : messages) {
|
||||||
try {
|
persistMessage(jedis, key, (long)message.getScore(), message.getBinaryElement());
|
||||||
envelopes.add(Envelope.parseFrom(tuple.getBinaryElement()));
|
messagesPersistedCount++;
|
||||||
} catch (InvalidProtocolBufferException e) {
|
|
||||||
logger.error("Error parsing envelope", e);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
database.store(envelopes, key.getAddress(), key.getDeviceId());
|
|
||||||
|
|
||||||
for (Tuple tuple : messages) {
|
|
||||||
removeOperation.remove(jedis, key.getAddress(), key.getDeviceId(), (long)tuple.getScore());
|
|
||||||
}
|
|
||||||
|
|
||||||
messagesPersistedCount += envelopes.size();
|
|
||||||
|
|
||||||
if (messages.size() < CHUNK_SIZE) {
|
if (messages.size() < CHUNK_SIZE) {
|
||||||
jedis.del(key.getUserMessageQueuePersistInProgress());
|
jedis.del(key.getUserMessageQueuePersistInProgress());
|
||||||
|
@ -485,10 +472,26 @@ public class MessagesCache implements Managed {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
timer.stop();
|
||||||
queueSizeHistogram.update(messagesPersistedCount);
|
queueSizeHistogram.update(messagesPersistedCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void persistMessage(Jedis jedis, Key key, long score, byte[] message) {
|
||||||
|
try {
|
||||||
|
Envelope envelope = Envelope.parseFrom(message);
|
||||||
|
UUID guid = envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null;
|
||||||
|
|
||||||
|
envelope = envelope.toBuilder().clearServerGuid().build();
|
||||||
|
|
||||||
|
database.store(guid, envelope, key.getAddress(), key.getDeviceId());
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
logger.error("Error parsing envelope", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
removeOperation.remove(jedis, key.getAddress(), key.getDeviceId(), score);
|
||||||
|
}
|
||||||
|
|
||||||
private List<byte[]> getQueuesToPersist(GetOperation getOperation) {
|
private List<byte[]> getQueuesToPersist(GetOperation getOperation) {
|
||||||
Timer.Context timer = getQueuesTimer.time();
|
Timer.Context timer = getQueuesTimer.time();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||||
|
|
||||||
|
@ -45,8 +44,9 @@ public class MessagesTest {
|
||||||
@Test
|
@Test
|
||||||
public void testStore() throws SQLException {
|
public void testStore() throws SQLException {
|
||||||
Envelope envelope = generateEnvelope();
|
Envelope envelope = generateEnvelope();
|
||||||
|
UUID guid = UUID.randomUUID();
|
||||||
|
|
||||||
messages.store(List.of(envelope), "+14151112222", 1);
|
messages.store(guid, envelope, "+14151112222", 1);
|
||||||
|
|
||||||
PreparedStatement statement = db.getTestDatabase().getConnection().prepareStatement("SELECT * FROM messages WHERE destination = ?");
|
PreparedStatement statement = db.getTestDatabase().getConnection().prepareStatement("SELECT * FROM messages WHERE destination = ?");
|
||||||
statement.setString(1, "+14151112222");
|
statement.setString(1, "+14151112222");
|
||||||
|
@ -54,7 +54,7 @@ public class MessagesTest {
|
||||||
ResultSet resultSet = statement.executeQuery();
|
ResultSet resultSet = statement.executeQuery();
|
||||||
assertThat(resultSet.next()).isTrue();
|
assertThat(resultSet.next()).isTrue();
|
||||||
|
|
||||||
assertThat(resultSet.getString("guid")).isEqualTo(envelope.getServerGuid());
|
assertThat(resultSet.getString("guid")).isEqualTo(guid.toString());
|
||||||
assertThat(resultSet.getInt("type")).isEqualTo(envelope.getType().getNumber());
|
assertThat(resultSet.getInt("type")).isEqualTo(envelope.getType().getNumber());
|
||||||
assertThat(resultSet.getString("relay")).isNullOrEmpty();
|
assertThat(resultSet.getString("relay")).isNullOrEmpty();
|
||||||
assertThat(resultSet.getLong("timestamp")).isEqualTo(envelope.getTimestamp());
|
assertThat(resultSet.getLong("timestamp")).isEqualTo(envelope.getTimestamp());
|
||||||
|
@ -71,28 +71,36 @@ public class MessagesTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLoad() {
|
public void testLoad() {
|
||||||
List<Envelope> inserted = insertRandom("+14151112222", 1);
|
List<MessageToStore> inserted = new ArrayList<>(50);
|
||||||
inserted.sort(Comparator.comparingLong(Envelope::getTimestamp));
|
|
||||||
|
for (int i=0;i<50;i++) {
|
||||||
|
MessageToStore message = generateMessageToStore();
|
||||||
|
inserted.add(message);
|
||||||
|
|
||||||
|
messages.store(message.guid, message.envelope, "+14151112222", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
inserted.sort(Comparator.comparingLong(o -> o.envelope.getTimestamp()));
|
||||||
|
|
||||||
List<OutgoingMessageEntity> retrieved = messages.load("+14151112222", 1);
|
List<OutgoingMessageEntity> retrieved = messages.load("+14151112222", 1);
|
||||||
|
|
||||||
assertThat(retrieved.size()).isEqualTo(inserted.size());
|
assertThat(retrieved.size()).isEqualTo(inserted.size());
|
||||||
|
|
||||||
for (int i=0;i<retrieved.size();i++) {
|
for (int i=0;i<retrieved.size();i++) {
|
||||||
verifyExpected(retrieved.get(i), inserted.get(i));
|
verifyExpected(retrieved.get(i), inserted.get(i).envelope, inserted.get(i).guid);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void removeBySourceDestinationTimestamp() {
|
public void removeBySourceDestinationTimestamp() {
|
||||||
List<Envelope> inserted = insertRandom("+14151112222", 1);
|
List<MessageToStore> inserted = insertRandom("+14151112222", 1);
|
||||||
List<Envelope> unrelated = insertRandom("+14151114444", 3);
|
List<MessageToStore> unrelated = insertRandom("+14151114444", 3);
|
||||||
Envelope toRemove = inserted.remove(new Random(System.currentTimeMillis()).nextInt(inserted.size() - 1));
|
MessageToStore toRemove = inserted.remove(new Random(System.currentTimeMillis()).nextInt(inserted.size() - 1));
|
||||||
Optional<OutgoingMessageEntity> removed = messages.remove("+14151112222", 1, toRemove.getSource(), toRemove.getTimestamp());
|
Optional<OutgoingMessageEntity> removed = messages.remove("+14151112222", 1, toRemove.envelope.getSource(), toRemove.envelope.getTimestamp());
|
||||||
|
|
||||||
assertThat(removed.isPresent()).isTrue();
|
assertThat(removed.isPresent()).isTrue();
|
||||||
verifyExpected(removed.get(), toRemove);
|
verifyExpected(removed.get(), toRemove.envelope, toRemove.guid);
|
||||||
|
|
||||||
verifyInTact(inserted, "+14151112222", 1);
|
verifyInTact(inserted, "+14151112222", 1);
|
||||||
verifyInTact(unrelated, "+14151114444", 3);
|
verifyInTact(unrelated, "+14151114444", 3);
|
||||||
|
@ -100,13 +108,13 @@ public class MessagesTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void removeByDestinationGuid() {
|
public void removeByDestinationGuid() {
|
||||||
List<Envelope> unrelated = insertRandom("+14151113333", 2);
|
List<MessageToStore> unrelated = insertRandom("+14151113333", 2);
|
||||||
List<Envelope> inserted = insertRandom("+14151112222", 1);
|
List<MessageToStore> inserted = insertRandom("+14151112222", 1);
|
||||||
Envelope toRemove = inserted.remove(new Random(System.currentTimeMillis()).nextInt(inserted.size() - 1));
|
MessageToStore toRemove = inserted.remove(new Random(System.currentTimeMillis()).nextInt(inserted.size() - 1));
|
||||||
Optional<OutgoingMessageEntity> removed = messages.remove("+14151112222", UUID.fromString(toRemove.getServerGuid()));
|
Optional<OutgoingMessageEntity> removed = messages.remove("+14151112222", toRemove.guid);
|
||||||
|
|
||||||
assertThat(removed).isPresent();
|
assertThat(removed.isPresent()).isTrue();
|
||||||
verifyExpected(removed.get(), toRemove);
|
verifyExpected(removed.get(), toRemove.envelope, toRemove.guid);
|
||||||
|
|
||||||
verifyInTact(inserted, "+14151112222", 1);
|
verifyInTact(inserted, "+14151112222", 1);
|
||||||
verifyInTact(unrelated, "+14151113333", 2);
|
verifyInTact(unrelated, "+14151113333", 2);
|
||||||
|
@ -114,10 +122,10 @@ public class MessagesTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void removeByDestinationRowId() {
|
public void removeByDestinationRowId() {
|
||||||
List<Envelope> unrelatedInserted = insertRandom("+14151111111", 1);
|
List<MessageToStore> unrelatedInserted = insertRandom("+14151111111", 1);
|
||||||
List<Envelope> inserted = insertRandom("+14151112222", 1);
|
List<MessageToStore> inserted = insertRandom("+14151112222", 1);
|
||||||
|
|
||||||
inserted.sort(Comparator.comparingLong(Envelope::getTimestamp));
|
inserted.sort(Comparator.comparingLong(o -> o.envelope.getTimestamp()));
|
||||||
|
|
||||||
List<OutgoingMessageEntity> retrieved = messages.load("+14151112222", 1);
|
List<OutgoingMessageEntity> retrieved = messages.load("+14151112222", 1);
|
||||||
|
|
||||||
|
@ -133,7 +141,7 @@ public class MessagesTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLoadEmpty() {
|
public void testLoadEmpty() {
|
||||||
insertRandom("+14151112222", 1);
|
List<MessageToStore> inserted = insertRandom("+14151112222", 1);
|
||||||
List<OutgoingMessageEntity> loaded = messages.load("+14159999999", 1);
|
List<OutgoingMessageEntity> loaded = messages.load("+14159999999", 1);
|
||||||
assertThat(loaded.isEmpty()).isTrue();
|
assertThat(loaded.isEmpty()).isTrue();
|
||||||
}
|
}
|
||||||
|
@ -143,7 +151,7 @@ public class MessagesTest {
|
||||||
insertRandom("+14151112222", 1);
|
insertRandom("+14151112222", 1);
|
||||||
insertRandom("+14151112222", 2);
|
insertRandom("+14151112222", 2);
|
||||||
|
|
||||||
List<Envelope> unrelated = insertRandom("+14151111111", 1);
|
List<MessageToStore> unrelated = insertRandom("+14151111111", 1);
|
||||||
|
|
||||||
messages.clear("+14151112222");
|
messages.clear("+14151112222");
|
||||||
|
|
||||||
|
@ -155,9 +163,9 @@ public class MessagesTest {
|
||||||
@Test
|
@Test
|
||||||
public void testClearDestinationDevice() {
|
public void testClearDestinationDevice() {
|
||||||
insertRandom("+14151112222", 1);
|
insertRandom("+14151112222", 1);
|
||||||
List<Envelope> inserted = insertRandom("+14151112222", 2);
|
List<MessageToStore> inserted = insertRandom("+14151112222", 2);
|
||||||
|
|
||||||
List<Envelope> unrelated = insertRandom("+14151111111", 1);
|
List<MessageToStore> unrelated = insertRandom("+14151111111", 1);
|
||||||
|
|
||||||
messages.clear("+14151112222", 1);
|
messages.clear("+14151112222", 1);
|
||||||
|
|
||||||
|
@ -169,37 +177,38 @@ public class MessagesTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testVacuum() {
|
public void testVacuum() {
|
||||||
List<Envelope> inserted = insertRandom("+14151112222", 2);
|
List<MessageToStore> inserted = insertRandom("+14151112222", 2);
|
||||||
messages.vacuum();
|
messages.vacuum();
|
||||||
verifyInTact(inserted, "+14151112222", 2);
|
verifyInTact(inserted, "+14151112222", 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Envelope> insertRandom(String destination, int destinationDevice) {
|
private List<MessageToStore> insertRandom(String destination, int destinationDevice) {
|
||||||
List<Envelope> inserted = new ArrayList<>(50);
|
List<MessageToStore> inserted = new ArrayList<>(50);
|
||||||
|
|
||||||
for (int i=0;i<50;i++) {
|
for (int i=0;i<50;i++) {
|
||||||
inserted.add(generateEnvelope());
|
MessageToStore message = generateMessageToStore();
|
||||||
}
|
inserted.add(message);
|
||||||
|
|
||||||
messages.store(inserted, destination, destinationDevice);
|
messages.store(message.guid, message.envelope, destination, destinationDevice);
|
||||||
|
}
|
||||||
|
|
||||||
return inserted;
|
return inserted;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyInTact(List<Envelope> inserted, String destination, int destinationDevice) {
|
private void verifyInTact(List<MessageToStore> inserted, String destination, int destinationDevice) {
|
||||||
inserted.sort(Comparator.comparingLong(Envelope::getTimestamp));
|
inserted.sort(Comparator.comparingLong(o -> o.envelope.getTimestamp()));
|
||||||
|
|
||||||
List<OutgoingMessageEntity> retrieved = messages.load(destination, destinationDevice);
|
List<OutgoingMessageEntity> retrieved = messages.load(destination, destinationDevice);
|
||||||
|
|
||||||
assertThat(retrieved.size()).isEqualTo(inserted.size());
|
assertThat(retrieved.size()).isEqualTo(inserted.size());
|
||||||
|
|
||||||
for (int i=0;i<retrieved.size();i++) {
|
for (int i=0;i<retrieved.size();i++) {
|
||||||
verifyExpected(retrieved.get(i), inserted.get(i));
|
verifyExpected(retrieved.get(i), inserted.get(i).envelope, inserted.get(i).guid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void verifyExpected(OutgoingMessageEntity retrieved, Envelope inserted) {
|
private void verifyExpected(OutgoingMessageEntity retrieved, Envelope inserted, UUID guid) {
|
||||||
assertThat(retrieved.getTimestamp()).isEqualTo(inserted.getTimestamp());
|
assertThat(retrieved.getTimestamp()).isEqualTo(inserted.getTimestamp());
|
||||||
assertThat(retrieved.getSource()).isEqualTo(inserted.getSource());
|
assertThat(retrieved.getSource()).isEqualTo(inserted.getSource());
|
||||||
assertThat(retrieved.getRelay()).isEqualTo(inserted.getRelay());
|
assertThat(retrieved.getRelay()).isEqualTo(inserted.getRelay());
|
||||||
|
@ -207,10 +216,14 @@ public class MessagesTest {
|
||||||
assertThat(retrieved.getContent()).isEqualTo(inserted.getContent().toByteArray());
|
assertThat(retrieved.getContent()).isEqualTo(inserted.getContent().toByteArray());
|
||||||
assertThat(retrieved.getMessage()).isEqualTo(inserted.getLegacyMessage().toByteArray());
|
assertThat(retrieved.getMessage()).isEqualTo(inserted.getLegacyMessage().toByteArray());
|
||||||
assertThat(retrieved.getServerTimestamp()).isEqualTo(inserted.getServerTimestamp());
|
assertThat(retrieved.getServerTimestamp()).isEqualTo(inserted.getServerTimestamp());
|
||||||
assertThat(retrieved.getGuid()).isEqualTo(retrieved.getGuid());
|
assertThat(retrieved.getGuid()).isEqualTo(guid);
|
||||||
assertThat(retrieved.getSourceDevice()).isEqualTo(inserted.getSourceDevice());
|
assertThat(retrieved.getSourceDevice()).isEqualTo(inserted.getSourceDevice());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private MessageToStore generateMessageToStore() {
|
||||||
|
return new MessageToStore(UUID.randomUUID(), generateEnvelope());
|
||||||
|
}
|
||||||
|
|
||||||
private Envelope generateEnvelope() {
|
private Envelope generateEnvelope() {
|
||||||
Random random = new Random();
|
Random random = new Random();
|
||||||
byte[] content = new byte[256];
|
byte[] content = new byte[256];
|
||||||
|
@ -230,4 +243,14 @@ public class MessagesTest {
|
||||||
.setServerGuid(UUID.randomUUID().toString())
|
.setServerGuid(UUID.randomUUID().toString())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class MessageToStore {
|
||||||
|
private final UUID guid;
|
||||||
|
private final Envelope envelope;
|
||||||
|
|
||||||
|
private MessageToStore(UUID guid, Envelope envelope) {
|
||||||
|
this.guid = guid;
|
||||||
|
this.envelope = envelope;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue