Base persister tests on a real Redis cluster.

This commit is contained in:
Jon Chambers 2020-08-20 11:42:58 -04:00 committed by Jon Chambers
parent e68a1dee33
commit 9c469c2f96
2 changed files with 78 additions and 108 deletions

View File

@ -58,6 +58,11 @@ public class RedisClusterMessagePersister implements Managed {
this.persistDelay = persistDelay;
}
@VisibleForTesting
Duration getPersistDelay() {
return persistDelay;
}
@Override
public void start() {
running = true;
@ -125,7 +130,7 @@ public class RedisClusterMessagePersister implements Managed {
messagesCache.lockQueueForPersistence(queue);
try {
/* int messageCount = 0;
int messageCount = 0;
List<MessageProtos.Envelope> messages;
do {
@ -139,9 +144,9 @@ public class RedisClusterMessagePersister implements Managed {
messageCount++;
}
} while (messages.size() == MESSAGE_BATCH_LIMIT);
} while (!messages.isEmpty());
queueSizeHistogram.update(messageCount); */
queueSizeHistogram.update(messageCount);
} finally {
messagesCache.unlockQueueForPersistence(queue);
}

View File

@ -1,22 +1,22 @@
package org.whispersystems.textsecuregcm.storage;
import com.google.protobuf.ByteString;
import io.lettuce.core.cluster.SlotHash;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@ -28,10 +28,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class RedisClusterMessagePersisterTest {
public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
private ExecutorService notificationExecutorService;
private RedisClusterMessagesCache messagesCache;
private Messages messagesDatabase;
private PubSubManager pubSubManager;
private RedisClusterMessagePersister messagePersister;
private AccountsManager accountsManager;
@ -43,151 +45,114 @@ public class RedisClusterMessagePersisterTest {
private static final Duration PERSIST_DELAY = Duration.ofMinutes(5);
private static final Random RANDOM = new Random();
@Override
@Before
public void setUp() {
messagesCache = mock(RedisClusterMessagesCache.class);
public void setUp() throws Exception {
super.setUp();
messagesDatabase = mock(Messages.class);
accountsManager = mock(AccountsManager.class);
pubSubManager = mock(PubSubManager.class);
final Account account = mock(Account.class);
when(accountsManager.get(DESTINATION_ACCOUNT_UUID)).thenReturn(Optional.of(account));
when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER);
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, mock(PubSubManager.class), mock(PushSender.class), accountsManager, PERSIST_DELAY);
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY);
}
@Override
public void tearDown() throws Exception {
super.tearDown();
notificationExecutorService.shutdown();
notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS);
}
@Test
public void testPersistNextQueuesNoQueues() {
final int slot = 7;
when(messagesCache.getNextSlotToPersist()).thenReturn(slot);
when(messagesCache.getQueuesToPersist(eq(slot), any(Instant.class), eq(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT))).thenReturn(Collections.emptyList());
messagePersister.persistNextQueues(Instant.now());
verify(messagesCache, never()).lockQueueForPersistence(any());
verify(accountsManager, never()).get(any(UUID.class));
}
@Test
public void testPersistNextQueuesSingleQueue() {
final int slot = 7;
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
when(messagesCache.getNextSlotToPersist()).thenReturn(slot);
when(messagesCache.getQueuesToPersist(eq(slot), any(Instant.class), eq(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT))).thenReturn(List.of(queueName));
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount);
setNextSlotToPersist(SlotHash.getSlot(queueName));
messagePersister.persistNextQueues(Instant.now());
messagePersister.persistNextQueues(Instant.now().plus(messagePersister.getPersistDelay()));
verify(messagesCache).lockQueueForPersistence(queueName);
verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID));
}
@Test
public void testPersistNextQueuesMultiplePages() {
final int slot = 7;
final int queueCount = RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3;
final List<String> queues = new ArrayList<>(queueCount);
final int slot = 7;
final int queueCount = (RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3) + 7;
final int messagesPerQueue = 10;
for (int i = 0; i < queueCount; i++) {
final String queueName = generateRandomQueueName();
final UUID accountUuid = RedisClusterMessagesCache.getAccountUuidFromQueueName(queueName);
queues.add(queueName);
final String queueName = generateRandomQueueNameForSlot(slot);
final UUID accountUuid = RedisClusterMessagesCache.getAccountUuidFromQueueName(queueName);
final long deviceId = RedisClusterMessagesCache.getDeviceIdFromQueueName(queueName);
final String accountNumber = "+1" + RandomStringUtils.randomNumeric(10);
final Account account = mock(Account.class);
when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account));
when(account.getNumber()).thenReturn("+1" + RandomStringUtils.randomNumeric(10));
when(account.getNumber()).thenReturn(accountNumber);
insertMessages(accountUuid, accountNumber, deviceId, messagesPerQueue);
}
when(messagesCache.getNextSlotToPersist()).thenReturn(slot);
when(messagesCache.getQueuesToPersist(eq(slot), any(Instant.class), eq(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT)))
.thenReturn(queues.subList(0, RedisClusterMessagePersister.QUEUE_BATCH_LIMIT))
.thenReturn(queues.subList(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT, RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 2))
.thenReturn(queues.subList(RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 2, RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3))
.thenReturn(Collections.emptyList());
setNextSlotToPersist(slot);
messagePersister.persistNextQueues(Instant.now());
messagePersister.persistNextQueues(Instant.now().plus(messagePersister.getPersistDelay()));
verify(messagesCache, times(queueCount)).lockQueueForPersistence(any());
verify(pubSubManager, times(queueCount)).publish(any(), any());
verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong());
}
@Test
@Ignore
public void testPersistQueueNoMessages() {
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
private static String generateRandomQueueNameForSlot(final int slot) {
final UUID uuid = UUID.randomUUID();
when(messagesCache.getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT)).thenReturn(Collections.emptyList());
final String queueNameBase = "user_queue::{" + uuid.toString() + "::";
messagePersister.persistQueue(queueName);
for (int deviceId = 0; deviceId < Integer.MAX_VALUE; deviceId++) {
final String queueName = queueNameBase + deviceId + "}";
verify(messagesCache).lockQueueForPersistence(queueName);
verify(messagesCache).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT);
verify(messagesDatabase, never()).store(any(), any(), any(), anyLong());
verify(messagesCache, never()).remove(anyString(), any(UUID.class), anyLong(), any(UUID.class));
verify(messagesCache).unlockQueueForPersistence(queueName);
if (SlotHash.getSlot(queueName) == slot) {
return queueName;
}
}
throw new IllegalStateException("Could not find a queue name for slot " + slot);
}
@Test
@Ignore
public void testPersistQueueSingleMessage() {
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
final MessageProtos.Envelope message = generateRandomMessage();
when(messagesCache.getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT)).thenReturn(List.of(message));
messagePersister.persistQueue(queueName);
verify(messagesCache).lockQueueForPersistence(queueName);
verify(messagesCache).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT);
verify(messagesDatabase).store(UUID.fromString(message.getServerGuid()), message, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID);
verify(messagesCache).remove(DESTINATION_ACCOUNT_NUMBER, DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, UUID.fromString(message.getServerGuid()));
verify(messagesCache).unlockQueueForPersistence(queueName);
}
@Test
@Ignore
public void testPersistQueueMultiplePages() {
final int messageCount = RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3;
final List<MessageProtos.Envelope> messagesInQueue = new ArrayList<>(messageCount);
private void insertMessages(final UUID accountUuid, final String accountNumber, final long deviceId, final int messageCount) {
for (int i = 0; i < messageCount; i++) {
messagesInQueue.add(generateRandomMessage());
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope envelope = MessageProtos.Envelope.newBuilder()
.setTimestamp(serialTimestamp++)
.setServerTimestamp(serialTimestamp++)
.setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256)))
.setType(MessageProtos.Envelope.Type.CIPHERTEXT)
.setServerGuid(messageGuid.toString())
.build();
messagesCache.insert(messageGuid, accountNumber, accountUuid, deviceId, envelope);
}
when(messagesCache.getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT))
.thenReturn(messagesInQueue.subList(0, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT))
.thenReturn(messagesInQueue.subList(RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 2))
.thenReturn(messagesInQueue.subList(RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 2, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3))
.thenReturn(Collections.emptyList());
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
messagePersister.persistQueue(queueName);
verify(messagesCache).lockQueueForPersistence(queueName);
verify(messagesCache, times(4)).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT);
verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID));
verify(messagesCache, times(messageCount)).remove(eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_ACCOUNT_UUID), eq(DESTINATION_DEVICE_ID), any(UUID.class));
verify(messagesCache).unlockQueueForPersistence(queueName);
}
private MessageProtos.Envelope generateRandomMessage() {
final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder()
.setTimestamp(serialTimestamp++)
.setServerTimestamp(serialTimestamp++)
.setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256)))
.setType(MessageProtos.Envelope.Type.CIPHERTEXT)
.setServerGuid(UUID.randomUUID().toString());
return envelopeBuilder.build();
}
private String generateRandomQueueName() {
return String.format("user_queue::{%s::%d}", UUID.randomUUID().toString(), RANDOM.nextInt(10));
private void setNextSlotToPersist(final int nextSlot) {
getRedisCluster().useCluster(connection -> connection.sync().set(RedisClusterMessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(nextSlot - 1)));
}
}