diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index fa325f4eb..995b43ef8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -135,8 +135,8 @@ import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor; -import org.whispersystems.textsecuregcm.storage.RedisClusterMessagePersister; -import org.whispersystems.textsecuregcm.storage.RedisClusterMessagesCache; +import org.whispersystems.textsecuregcm.storage.MessagePersister; +import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.RegistrationLockVersionCounter; import org.whispersystems.textsecuregcm.storage.RemoteConfigs; import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager; @@ -290,9 +290,9 @@ public class WhisperServerService extends Application maybeAccount = accountsManager.get(accountUuid); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java similarity index 96% rename from service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java rename to service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 4bf5b3ebc..476583a3d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -37,7 +37,7 @@ import java.util.concurrent.ExecutorService; import static com.codahale.metrics.MetricRegistry.name; -public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter implements Managed { +public class MessagesCache extends RedisClusterPubSubAdapter implements Managed { private final FaultTolerantRedisCluster redisCluster; private final FaultTolerantPubSubConnection pubSubConnection; @@ -55,12 +55,12 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter messageListenersByQueueName = new HashMap<>(); private final Map queueNamesByMessageListener = new IdentityHashMap<>(); - private final Timer insertTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "insert")); - private final Timer getMessagesTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "get")); - private final Timer clearQueueTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "clear")); - private final Counter pubSubMessageCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "pubSubMessage")); - private final Counter newMessageNotificationCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "newMessageNotification")); - private final Counter queuePersistedNotificationCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "queuePersisted")); + private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert")); + private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); + private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); + private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); + private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification")); + private final Counter queuePersistedNotificationCounter = Metrics.counter(name(MessagesCache.class, "queuePersisted")); static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); @@ -68,16 +68,16 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter messages = this.messages.load(destination, destinationDevice); if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) { - messages.addAll(clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size())); + messages.addAll(messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size())); } return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE); } public void clear(String destination, UUID destinationUuid) { - this.clusterMessagesCache.clear(destination, destinationUuid); + this.messagesCache.clear(destination, destinationUuid); this.messages.clear(destination); } public void clear(String destination, UUID destinationUuid, long deviceId) { - this.clusterMessagesCache.clear(destination, destinationUuid, deviceId); + this.messagesCache.clear(destination, destinationUuid, deviceId); this.messages.clear(destination, deviceId); } public Optional delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp) { - Optional removed = clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp); + Optional removed = messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp); if (!removed.isPresent()) { removed = this.messages.remove(destination, destinationDevice, source, timestamp); @@ -78,7 +78,7 @@ public class MessagesManager { } public Optional delete(String destination, UUID destinationUuid, long deviceId, UUID guid) { - Optional removed = clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid); + Optional removed = messagesCache.remove(destination, destinationUuid, deviceId, guid); if (!removed.isPresent()) { removed = this.messages.remove(destination, guid); @@ -92,7 +92,7 @@ public class MessagesManager { public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) { if (cached) { - clusterMessagesCache.remove(destination, destinationUuid, deviceId, id); + messagesCache.remove(destination, destinationUuid, deviceId, id); cacheHitByIdMeter.mark(); } else { this.messages.remove(destination, id); @@ -102,14 +102,14 @@ public class MessagesManager { public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) { messages.store(messageGuid, envelope, destination, deviceId); - clusterMessagesCache.remove(destination, destinationUuid, deviceId, messageGuid); + messagesCache.remove(destination, destinationUuid, deviceId, messageGuid); } public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) { - clusterMessagesCache.addMessageAvailabilityListener(destinationUuid, deviceId, listener); + messagesCache.addMessageAvailabilityListener(destinationUuid, deviceId, listener); } public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) { - clusterMessagesCache.removeMessageAvailabilityListener(listener); + messagesCache.removeMessageAvailabilityListener(listener); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java similarity index 82% rename from service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 3f2cf032e..56180e87a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -29,13 +29,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { +public class MessagePersisterTest extends AbstractRedisClusterTest { private ExecutorService notificationExecutorService; - private RedisClusterMessagesCache messagesCache; + private MessagesCache messagesCache; private Messages messagesDatabase; private PubSubManager pubSubManager; - private RedisClusterMessagePersister messagePersister; + private MessagePersister messagePersister; private AccountsManager accountsManager; private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID(); @@ -51,7 +51,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { final MessagesManager messagesManager = mock(MessagesManager.class); final FeatureFlagsManager featureFlagsManager = mock(FeatureFlagsManager.class); - when(featureFlagsManager.isFeatureFlagActive(RedisClusterMessagePersister.ENABLE_PERSISTENCE_FLAG)).thenReturn(true); + when(featureFlagsManager.isFeatureFlagActive(MessagePersister.ENABLE_PERSISTENCE_FLAG)).thenReturn(true); messagesDatabase = mock(Messages.class); accountsManager = mock(AccountsManager.class); @@ -63,8 +63,8 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER); notificationExecutorService = Executors.newSingleThreadExecutor(); - messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService); - messagePersister = new RedisClusterMessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY); + messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService); + messagePersister = new MessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY); doAnswer(invocation -> { final String destination = invocation.getArgument(0, String.class); @@ -97,8 +97,8 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { @Test public void testPersistNextQueuesSingleQueue() { - final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); - final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; + final String queueName = new String(MessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); + final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; final Instant now = Instant.now(); insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now); @@ -111,8 +111,8 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { @Test public void testPersistNextQueuesSingleQueueTooSoon() { - final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); - final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; + final String queueName = new String(MessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); + final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; final Instant now = Instant.now(); insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now); @@ -126,14 +126,14 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { @Test public void testPersistNextQueuesMultiplePages() { final int slot = 7; - final int queueCount = (RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3) + 7; + final int queueCount = (MessagePersister.QUEUE_BATCH_LIMIT * 3) + 7; final int messagesPerQueue = 10; final Instant now = Instant.now(); for (int i = 0; i < queueCount; i++) { final String queueName = generateRandomQueueNameForSlot(slot); - final UUID accountUuid = RedisClusterMessagesCache.getAccountUuidFromQueueName(queueName); - final long deviceId = RedisClusterMessagesCache.getDeviceIdFromQueueName(queueName); + final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queueName); + final long deviceId = MessagesCache.getDeviceIdFromQueueName(queueName); final String accountNumber = "+1" + RandomStringUtils.randomNumeric(10); final Account account = mock(Account.class); @@ -186,6 +186,6 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { } private void setNextSlotToPersist(final int nextSlot) { - getRedisCluster().useCluster(connection -> connection.sync().set(RedisClusterMessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(nextSlot - 1))); + getRedisCluster().useCluster(connection -> connection.sync().set(MessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(nextSlot - 1))); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java similarity index 86% rename from service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 2f8516fcf..d9e7f48c1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -29,10 +29,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @RunWith(JUnitParamsRunner.class) -public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { +public class MessagesCacheTest extends AbstractRedisClusterTest { - private ExecutorService notificationExecutorService; - private RedisClusterMessagesCache messagesCache; + private ExecutorService notificationExecutorService; + private MessagesCache messagesCache; private final Random random = new Random(); private long serialTimestamp = 0; @@ -49,7 +49,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "K$gz")); notificationExecutorService = Executors.newSingleThreadExecutor(); - messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService); + messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService); messagesCache.start(); } @@ -82,7 +82,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { final Optional maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId); assertTrue(maybeRemovedMessage.isPresent()); - assertEquals(RedisClusterMessagesCache.constructEntityFromEnvelope(messageId, message), maybeRemovedMessage.get()); + assertEquals(MessagesCache.constructEntityFromEnvelope(messageId, message), maybeRemovedMessage.get()); assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId)); } @@ -95,7 +95,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { final Optional maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp()); assertTrue(maybeRemovedMessage.isPresent()); - assertEquals(RedisClusterMessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get()); + assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get()); assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp())); } @@ -112,7 +112,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { final Optional maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid); assertTrue(maybeRemovedMessage.isPresent()); - assertEquals(RedisClusterMessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get()); + assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get()); } @Test @@ -127,7 +127,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); final long messageId = messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); - expectedMessages.add(RedisClusterMessagesCache.constructEntityFromEnvelope(messageId, message)); + expectedMessages.add(MessagesCache.constructEntityFromEnvelope(messageId, message)); } assertEquals(expectedMessages, messagesCache.get(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); @@ -198,19 +198,19 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { @Test public void testGetAccountFromQueueName() { assertEquals(DESTINATION_UUID, - RedisClusterMessagesCache.getAccountUuidFromQueueName(new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8))); + MessagesCache.getAccountUuidFromQueueName(new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8))); } @Test public void testGetDeviceIdFromQueueName() { assertEquals(DESTINATION_DEVICE_ID, - RedisClusterMessagesCache.getDeviceIdFromQueueName(new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8))); + MessagesCache.getDeviceIdFromQueueName(new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8))); } @Test public void testGetQueueNameFromKeyspaceChannel() { assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7", - RedisClusterMessagesCache.getQueueNameFromKeyspaceChannel("__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}")); + MessagesCache.getQueueNameFromKeyspaceChannel("__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}")); } @Test @@ -226,8 +226,8 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { final List queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100); assertEquals(1, queues.size()); - assertEquals(DESTINATION_UUID, RedisClusterMessagesCache.getAccountUuidFromQueueName(queues.get(0))); - assertEquals(DESTINATION_DEVICE_ID, RedisClusterMessagesCache.getDeviceIdFromQueueName(queues.get(0))); + assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.get(0))); + assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.get(0))); } @Test(timeout = 5_000L) @@ -281,8 +281,8 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest { messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); - messagesCache.lockQueueForPersistence(RedisClusterMessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID)); - messagesCache.unlockQueueForPersistence(RedisClusterMessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID)); + messagesCache.lockQueueForPersistence(MessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID)); + messagesCache.unlockQueueForPersistence(MessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID)); synchronized (notified) { while (!notified.get()) {