From 9c469c2f961411cb0ef8355a17d32d9f3254368a Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 20 Aug 2020 11:42:58 -0400 Subject: [PATCH] Base persister tests on a real Redis cluster. --- .../storage/RedisClusterMessagePersister.java | 11 +- .../RedisClusterMessagePersisterTest.java | 175 +++++++----------- 2 files changed, 78 insertions(+), 108 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java index 4cff493a0..e32a86d8b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java @@ -58,6 +58,11 @@ public class RedisClusterMessagePersister implements Managed { this.persistDelay = persistDelay; } + @VisibleForTesting + Duration getPersistDelay() { + return persistDelay; + } + @Override public void start() { running = true; @@ -125,7 +130,7 @@ public class RedisClusterMessagePersister implements Managed { messagesCache.lockQueueForPersistence(queue); try { - /* int messageCount = 0; + int messageCount = 0; List messages; do { @@ -139,9 +144,9 @@ public class RedisClusterMessagePersister implements Managed { messageCount++; } - } while (messages.size() == MESSAGE_BATCH_LIMIT); + } while (!messages.isEmpty()); - queueSizeHistogram.update(messageCount); */ + queueSizeHistogram.update(messageCount); } finally { messagesCache.unlockQueueForPersistence(queue); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java index cd9153452..07c56f4b0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java @@ -1,22 +1,22 @@ package org.whispersystems.textsecuregcm.storage; import com.google.protobuf.ByteString; +import io.lettuce.core.cluster.SlotHash; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Optional; -import java.util.Random; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -28,10 +28,12 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class RedisClusterMessagePersisterTest { +public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { + private ExecutorService notificationExecutorService; private RedisClusterMessagesCache messagesCache; private Messages messagesDatabase; + private PubSubManager pubSubManager; private RedisClusterMessagePersister messagePersister; private AccountsManager accountsManager; @@ -43,151 +45,114 @@ public class RedisClusterMessagePersisterTest { private static final Duration PERSIST_DELAY = Duration.ofMinutes(5); - private static final Random RANDOM = new Random(); - + @Override @Before - public void setUp() { - messagesCache = mock(RedisClusterMessagesCache.class); + public void setUp() throws Exception { + super.setUp(); + messagesDatabase = mock(Messages.class); accountsManager = mock(AccountsManager.class); + pubSubManager = mock(PubSubManager.class); final Account account = mock(Account.class); when(accountsManager.get(DESTINATION_ACCOUNT_UUID)).thenReturn(Optional.of(account)); when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER); - messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, mock(PubSubManager.class), mock(PushSender.class), accountsManager, PERSIST_DELAY); + notificationExecutorService = Executors.newSingleThreadExecutor(); + messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService); + messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + notificationExecutorService.shutdown(); + notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS); } @Test public void testPersistNextQueuesNoQueues() { - final int slot = 7; - - when(messagesCache.getNextSlotToPersist()).thenReturn(slot); - when(messagesCache.getQueuesToPersist(eq(slot), any(Instant.class), eq(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT))).thenReturn(Collections.emptyList()); - messagePersister.persistNextQueues(Instant.now()); - verify(messagesCache, never()).lockQueueForPersistence(any()); + verify(accountsManager, never()).get(any(UUID.class)); } @Test public void testPersistNextQueuesSingleQueue() { - final int slot = 7; - final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); + final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); + final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; - when(messagesCache.getNextSlotToPersist()).thenReturn(slot); - when(messagesCache.getQueuesToPersist(eq(slot), any(Instant.class), eq(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT))).thenReturn(List.of(queueName)); + insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount); + setNextSlotToPersist(SlotHash.getSlot(queueName)); - messagePersister.persistNextQueues(Instant.now()); + messagePersister.persistNextQueues(Instant.now().plus(messagePersister.getPersistDelay())); - verify(messagesCache).lockQueueForPersistence(queueName); + verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID)); } @Test public void testPersistNextQueuesMultiplePages() { - final int slot = 7; - final int queueCount = RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3; - - final List queues = new ArrayList<>(queueCount); + final int slot = 7; + final int queueCount = (RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3) + 7; + final int messagesPerQueue = 10; for (int i = 0; i < queueCount; i++) { - final String queueName = generateRandomQueueName(); - final UUID accountUuid = RedisClusterMessagesCache.getAccountUuidFromQueueName(queueName); - - queues.add(queueName); + final String queueName = generateRandomQueueNameForSlot(slot); + final UUID accountUuid = RedisClusterMessagesCache.getAccountUuidFromQueueName(queueName); + final long deviceId = RedisClusterMessagesCache.getDeviceIdFromQueueName(queueName); + final String accountNumber = "+1" + RandomStringUtils.randomNumeric(10); final Account account = mock(Account.class); when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account)); - when(account.getNumber()).thenReturn("+1" + RandomStringUtils.randomNumeric(10)); + when(account.getNumber()).thenReturn(accountNumber); + + insertMessages(accountUuid, accountNumber, deviceId, messagesPerQueue); } - when(messagesCache.getNextSlotToPersist()).thenReturn(slot); - when(messagesCache.getQueuesToPersist(eq(slot), any(Instant.class), eq(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT))) - .thenReturn(queues.subList(0, RedisClusterMessagePersister.QUEUE_BATCH_LIMIT)) - .thenReturn(queues.subList(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT, RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 2)) - .thenReturn(queues.subList(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 2, RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3)) - .thenReturn(Collections.emptyList()); + setNextSlotToPersist(slot); - messagePersister.persistNextQueues(Instant.now()); + messagePersister.persistNextQueues(Instant.now().plus(messagePersister.getPersistDelay())); - verify(messagesCache, times(queueCount)).lockQueueForPersistence(any()); + verify(pubSubManager, times(queueCount)).publish(any(), any()); + verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong()); } - @Test - @Ignore - public void testPersistQueueNoMessages() { - final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); + private static String generateRandomQueueNameForSlot(final int slot) { + final UUID uuid = UUID.randomUUID(); - when(messagesCache.getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT)).thenReturn(Collections.emptyList()); + final String queueNameBase = "user_queue::{" + uuid.toString() + "::"; - messagePersister.persistQueue(queueName); + for (int deviceId = 0; deviceId < Integer.MAX_VALUE; deviceId++) { + final String queueName = queueNameBase + deviceId + "}"; - verify(messagesCache).lockQueueForPersistence(queueName); - verify(messagesCache).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT); - verify(messagesDatabase, never()).store(any(), any(), any(), anyLong()); - verify(messagesCache, never()).remove(anyString(), any(UUID.class), anyLong(), any(UUID.class)); - verify(messagesCache).unlockQueueForPersistence(queueName); + if (SlotHash.getSlot(queueName) == slot) { + return queueName; + } + } + + throw new IllegalStateException("Could not find a queue name for slot " + slot); } - @Test - @Ignore - public void testPersistQueueSingleMessage() { - final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); - - final MessageProtos.Envelope message = generateRandomMessage(); - - when(messagesCache.getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT)).thenReturn(List.of(message)); - - messagePersister.persistQueue(queueName); - - verify(messagesCache).lockQueueForPersistence(queueName); - verify(messagesCache).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT); - verify(messagesDatabase).store(UUID.fromString(message.getServerGuid()), message, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID); - verify(messagesCache).remove(DESTINATION_ACCOUNT_NUMBER, DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, UUID.fromString(message.getServerGuid())); - verify(messagesCache).unlockQueueForPersistence(queueName); - } - - @Test - @Ignore - public void testPersistQueueMultiplePages() { - final int messageCount = RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3; - final List messagesInQueue = new ArrayList<>(messageCount); - + private void insertMessages(final UUID accountUuid, final String accountNumber, final long deviceId, final int messageCount) { for (int i = 0; i < messageCount; i++) { - messagesInQueue.add(generateRandomMessage()); + final UUID messageGuid = UUID.randomUUID(); + + final MessageProtos.Envelope envelope = MessageProtos.Envelope.newBuilder() + .setTimestamp(serialTimestamp++) + .setServerTimestamp(serialTimestamp++) + .setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256))) + .setType(MessageProtos.Envelope.Type.CIPHERTEXT) + .setServerGuid(messageGuid.toString()) + .build(); + + messagesCache.insert(messageGuid, accountNumber, accountUuid, deviceId, envelope); } - - when(messagesCache.getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT)) - .thenReturn(messagesInQueue.subList(0, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT)) - .thenReturn(messagesInQueue.subList(RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 2)) - .thenReturn(messagesInQueue.subList(RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 2, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3)) - .thenReturn(Collections.emptyList()); - - final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); - - messagePersister.persistQueue(queueName); - - verify(messagesCache).lockQueueForPersistence(queueName); - verify(messagesCache, times(4)).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT); - verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID)); - verify(messagesCache, times(messageCount)).remove(eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_ACCOUNT_UUID), eq(DESTINATION_DEVICE_ID), any(UUID.class)); - verify(messagesCache).unlockQueueForPersistence(queueName); } - private MessageProtos.Envelope generateRandomMessage() { - final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder() - .setTimestamp(serialTimestamp++) - .setServerTimestamp(serialTimestamp++) - .setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256))) - .setType(MessageProtos.Envelope.Type.CIPHERTEXT) - .setServerGuid(UUID.randomUUID().toString()); - - return envelopeBuilder.build(); - } - - private String generateRandomQueueName() { - return String.format("user_queue::{%s::%d}", UUID.randomUUID().toString(), RANDOM.nextInt(10)); + private void setNextSlotToPersist(final int nextSlot) { + getRedisCluster().useCluster(connection -> connection.sync().set(RedisClusterMessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(nextSlot - 1))); } }