diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java index 07c56f4b0..1e0930fb2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java @@ -37,8 +37,6 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { private RedisClusterMessagePersister messagePersister; private AccountsManager accountsManager; - private long serialTimestamp = 0; - private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID(); private static final String DESTINATION_ACCOUNT_NUMBER = "+18005551234"; private static final long DESTINATION_DEVICE_ID = 7; @@ -81,22 +79,38 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { @Test public void testPersistNextQueuesSingleQueue() { - final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); - final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; + final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); + final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; + final Instant now = Instant.now(); - insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount); + insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now); setNextSlotToPersist(SlotHash.getSlot(queueName)); - messagePersister.persistNextQueues(Instant.now().plus(messagePersister.getPersistDelay())); + messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay())); verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID)); } + @Test + public void testPersistNextQueuesSingleQueueTooSoon() { + final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); + final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; + final Instant now = Instant.now(); + + insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now); + setNextSlotToPersist(SlotHash.getSlot(queueName)); + + messagePersister.persistNextQueues(now); + + verify(messagesDatabase, never()).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong()); + } + @Test public void testPersistNextQueuesMultiplePages() { - final int slot = 7; - final int queueCount = (RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3) + 7; - final int messagesPerQueue = 10; + final int slot = 7; + final int queueCount = (RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3) + 7; + final int messagesPerQueue = 10; + final Instant now = Instant.now(); for (int i = 0; i < queueCount; i++) { final String queueName = generateRandomQueueNameForSlot(slot); @@ -109,12 +123,12 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account)); when(account.getNumber()).thenReturn(accountNumber); - insertMessages(accountUuid, accountNumber, deviceId, messagesPerQueue); + insertMessages(accountUuid, accountNumber, deviceId, messagesPerQueue, now); } setNextSlotToPersist(slot); - messagePersister.persistNextQueues(Instant.now().plus(messagePersister.getPersistDelay())); + messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay())); verify(pubSubManager, times(queueCount)).publish(any(), any()); verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong()); @@ -136,13 +150,13 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { throw new IllegalStateException("Could not find a queue name for slot " + slot); } - private void insertMessages(final UUID accountUuid, final String accountNumber, final long deviceId, final int messageCount) { + private void insertMessages(final UUID accountUuid, final String accountNumber, final long deviceId, final int messageCount, final Instant firstMessageTimestamp) { for (int i = 0; i < messageCount; i++) { final UUID messageGuid = UUID.randomUUID(); final MessageProtos.Envelope envelope = MessageProtos.Envelope.newBuilder() - .setTimestamp(serialTimestamp++) - .setServerTimestamp(serialTimestamp++) + .setTimestamp(firstMessageTimestamp.toEpochMilli() + i) + .setServerTimestamp(firstMessageTimestamp.toEpochMilli() + i) .setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256))) .setType(MessageProtos.Envelope.Type.CIPHERTEXT) .setServerGuid(messageGuid.toString())