Add a test to make sure the persister is respecting persist delays.

This commit is contained in:
Jon Chambers 2020-08-20 11:49:56 -04:00 committed by Jon Chambers
parent 9c469c2f96
commit 6a76afc20d
1 changed files with 28 additions and 14 deletions

View File

@ -37,8 +37,6 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
private RedisClusterMessagePersister messagePersister; private RedisClusterMessagePersister messagePersister;
private AccountsManager accountsManager; private AccountsManager accountsManager;
private long serialTimestamp = 0;
private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID(); private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID();
private static final String DESTINATION_ACCOUNT_NUMBER = "+18005551234"; private static final String DESTINATION_ACCOUNT_NUMBER = "+18005551234";
private static final long DESTINATION_DEVICE_ID = 7; private static final long DESTINATION_DEVICE_ID = 7;
@ -81,22 +79,38 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
@Test @Test
public void testPersistNextQueuesSingleQueue() { public void testPersistNextQueuesSingleQueue() {
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
final Instant now = Instant.now();
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount); insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now);
setNextSlotToPersist(SlotHash.getSlot(queueName)); setNextSlotToPersist(SlotHash.getSlot(queueName));
messagePersister.persistNextQueues(Instant.now().plus(messagePersister.getPersistDelay())); messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay()));
verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID)); verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID));
} }
@Test
public void testPersistNextQueuesSingleQueueTooSoon() {
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
final Instant now = Instant.now();
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now);
setNextSlotToPersist(SlotHash.getSlot(queueName));
messagePersister.persistNextQueues(now);
verify(messagesDatabase, never()).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong());
}
@Test @Test
public void testPersistNextQueuesMultiplePages() { public void testPersistNextQueuesMultiplePages() {
final int slot = 7; final int slot = 7;
final int queueCount = (RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3) + 7; final int queueCount = (RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3) + 7;
final int messagesPerQueue = 10; final int messagesPerQueue = 10;
final Instant now = Instant.now();
for (int i = 0; i < queueCount; i++) { for (int i = 0; i < queueCount; i++) {
final String queueName = generateRandomQueueNameForSlot(slot); final String queueName = generateRandomQueueNameForSlot(slot);
@ -109,12 +123,12 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account)); when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account));
when(account.getNumber()).thenReturn(accountNumber); when(account.getNumber()).thenReturn(accountNumber);
insertMessages(accountUuid, accountNumber, deviceId, messagesPerQueue); insertMessages(accountUuid, accountNumber, deviceId, messagesPerQueue, now);
} }
setNextSlotToPersist(slot); setNextSlotToPersist(slot);
messagePersister.persistNextQueues(Instant.now().plus(messagePersister.getPersistDelay())); messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay()));
verify(pubSubManager, times(queueCount)).publish(any(), any()); verify(pubSubManager, times(queueCount)).publish(any(), any());
verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong()); verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong());
@ -136,13 +150,13 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
throw new IllegalStateException("Could not find a queue name for slot " + slot); throw new IllegalStateException("Could not find a queue name for slot " + slot);
} }
private void insertMessages(final UUID accountUuid, final String accountNumber, final long deviceId, final int messageCount) { private void insertMessages(final UUID accountUuid, final String accountNumber, final long deviceId, final int messageCount, final Instant firstMessageTimestamp) {
for (int i = 0; i < messageCount; i++) { for (int i = 0; i < messageCount; i++) {
final UUID messageGuid = UUID.randomUUID(); final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope envelope = MessageProtos.Envelope.newBuilder() final MessageProtos.Envelope envelope = MessageProtos.Envelope.newBuilder()
.setTimestamp(serialTimestamp++) .setTimestamp(firstMessageTimestamp.toEpochMilli() + i)
.setServerTimestamp(serialTimestamp++) .setServerTimestamp(firstMessageTimestamp.toEpochMilli() + i)
.setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256))) .setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256)))
.setType(MessageProtos.Envelope.Type.CIPHERTEXT) .setType(MessageProtos.Envelope.Type.CIPHERTEXT)
.setServerGuid(messageGuid.toString()) .setServerGuid(messageGuid.toString())