diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index ed172225f..de5420733 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -47,99 +47,105 @@ import org.whispersystems.textsecuregcm.util.RedisClusterUtil; public class MessagesCache extends RedisClusterPubSubAdapter implements Managed { - private final FaultTolerantRedisCluster readDeleteCluster; - private final FaultTolerantPubSubConnection pubSubConnection; + private final FaultTolerantRedisCluster readDeleteCluster; + private final FaultTolerantPubSubConnection pubSubConnection; - private final ExecutorService notificationExecutorService; + private final ExecutorService notificationExecutorService; - private final ClusterLuaScript insertScript; - private final ClusterLuaScript removeByGuidScript; - private final ClusterLuaScript getItemsScript; - private final ClusterLuaScript removeQueueScript; - private final ClusterLuaScript getQueuesToPersistScript; + private final ClusterLuaScript insertScript; + private final ClusterLuaScript removeByGuidScript; + private final ClusterLuaScript getItemsScript; + private final ClusterLuaScript removeQueueScript; + private final ClusterLuaScript getQueuesToPersistScript; - private final Map messageListenersByQueueName = new HashMap<>(); - private final Map queueNamesByMessageListener = new IdentityHashMap<>(); + private final Map messageListenersByQueueName = new HashMap<>(); + private final Map queueNamesByMessageListener = new IdentityHashMap<>(); - private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert")); - private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); - private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist")); - private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); - private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); - private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification")); - private final Counter queuePersistedNotificationCounter = Metrics.counter( - name(MessagesCache.class, "queuePersisted")); + private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert")); + private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); + private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist")); + private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); + private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); + private final Counter newMessageNotificationCounter = Metrics.counter( + name(MessagesCache.class, "newMessageNotification")); + private final Counter queuePersistedNotificationCounter = Metrics.counter( + name(MessagesCache.class, "queuePersisted")); private final Counter staleEphemeralMessagesCounter = Metrics.counter( name(MessagesCache.class, "staleEphemeralMessages")); - static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; - private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); + static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; + private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); - private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::"; - private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::"; + private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::"; + private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::"; - private static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10); + private static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10); - private static final String REMOVE_TIMER_NAME = name(MessagesCache.class, "remove"); + private static final String REMOVE_TIMER_NAME = name(MessagesCache.class, "remove"); - private static final String REMOVE_METHOD_TAG = "method"; - private static final String REMOVE_METHOD_UUID = "uuid"; + private static final String REMOVE_METHOD_TAG = "method"; + private static final String REMOVE_METHOD_UUID = "uuid"; - private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); + private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); - public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, final ExecutorService notificationExecutorService) throws IOException { + public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, + final ExecutorService notificationExecutorService) throws IOException { - this.readDeleteCluster = readDeleteCluster; - this.pubSubConnection = readDeleteCluster.createPubSubConnection(); + this.readDeleteCluster = readDeleteCluster; + this.pubSubConnection = readDeleteCluster.createPubSubConnection(); - this.notificationExecutorService = notificationExecutorService; + this.notificationExecutorService = notificationExecutorService; - this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); - this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.MULTI); - this.getItemsScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_items.lua", ScriptOutputType.MULTI); - this.removeQueueScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS); - this.getQueuesToPersistScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI); + this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); + this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua", + ScriptOutputType.MULTI); + this.getItemsScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_items.lua", ScriptOutputType.MULTI); + this.removeQueueScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_queue.lua", + ScriptOutputType.STATUS); + this.getQueuesToPersistScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_queues_to_persist.lua", + ScriptOutputType.MULTI); + } + + @Override + public void start() { + pubSubConnection.usePubSubConnection(connection -> { + connection.addListener(this); + connection.getResources().eventBus().get() + .filter(event -> event instanceof ClusterTopologyChangedEvent) + .subscribe(event -> resubscribeAll()); + }); + } + + @Override + public void stop() { + pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe()); + } + + private void resubscribeAll() { + logger.info("Got topology change event, resubscribing all keyspace notifications"); + + final Set queueNames; + + synchronized (messageListenersByQueueName) { + queueNames = new HashSet<>(messageListenersByQueueName.keySet()); } - @Override - public void start() { - pubSubConnection.usePubSubConnection(connection -> { - connection.addListener(this); - connection.getResources().eventBus().get() - .filter(event -> event instanceof ClusterTopologyChangedEvent) - .subscribe(event -> resubscribeAll()); - }); + for (final String queueName : queueNames) { + subscribeForKeyspaceNotifications(queueName); } + } - @Override - public void stop() { - pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe()); - } - - private void resubscribeAll() { - logger.info("Got topology change event, resubscribing all keyspace notifications"); - - final Set queueNames; - - synchronized (messageListenersByQueueName) { - queueNames = new HashSet<>(messageListenersByQueueName.keySet()); - } - - for (final String queueName : queueNames) { - subscribeForKeyspaceNotifications(queueName); - } - } - - public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) { - final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build(); - return (long) insertTimer.record(() -> - insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), - getMessageQueueMetadataKey(destinationUuid, destinationDevice), - getQueueIndexKey(destinationUuid, destinationDevice)), - List.of(messageWithGuid.toByteArray(), - String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8), - guid.toString().getBytes(StandardCharsets.UTF_8)))); - } + public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, + final MessageProtos.Envelope message) { + final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build(); + return (long) insertTimer.record(() -> + insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), + getMessageQueueMetadataKey(destinationUuid, destinationDevice), + getQueueIndexKey(destinationUuid, destinationDevice)), + List.of(messageWithGuid.toByteArray(), + String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8), + guid.toString().getBytes(StandardCharsets.UTF_8)))); + } public Optional remove(final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) { @@ -170,247 +176,261 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp return removedMessages; } - public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) { - return readDeleteCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0); - } + public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) { + return readDeleteCluster.withBinaryCluster( + connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0); + } - @SuppressWarnings("unchecked") - public List get(final UUID destinationUuid, final long destinationDevice, final int limit) { - return getMessagesTimer.record(() -> { - final List queueItems = (List)getItemsScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), - getPersistInProgressKey(destinationUuid, destinationDevice)), - List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8))); + @SuppressWarnings("unchecked") + public List get(final UUID destinationUuid, final long destinationDevice, final int limit) { + return getMessagesTimer.record(() -> { + final List queueItems = (List) getItemsScript.executeBinary( + List.of(getMessageQueueKey(destinationUuid, destinationDevice), + getPersistInProgressKey(destinationUuid, destinationDevice)), + List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8))); - final long earliestAllowableEphemeralTimestamp = System.currentTimeMillis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); + final long earliestAllowableEphemeralTimestamp = + System.currentTimeMillis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); - final List messageEntities; - final List staleEphemeralMessageGuids = new ArrayList<>(); + final List messageEntities; + final List staleEphemeralMessageGuids = new ArrayList<>(); - if (queueItems.size() % 2 == 0) { - messageEntities = new ArrayList<>(queueItems.size() / 2); - - for (int i = 0; i < queueItems.size() - 1; i += 2) { - try { - final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(queueItems.get(i)); - if (message.getEphemeral() && message.getTimestamp() < earliestAllowableEphemeralTimestamp) { - staleEphemeralMessageGuids.add(UUID.fromString(message.getServerGuid())); - continue; - } - - final long id = Long.parseLong(new String(queueItems.get(i + 1), StandardCharsets.UTF_8)); - - messageEntities.add(constructEntityFromEnvelope(id, message)); - } catch (InvalidProtocolBufferException e) { - logger.warn("Failed to parse envelope", e); - } - } - } else { - logger.error("\"Get messages\" operation returned a list with a non-even number of elements."); - messageEntities = Collections.emptyList(); - } + if (queueItems.size() % 2 == 0) { + messageEntities = new ArrayList<>(queueItems.size() / 2); + for (int i = 0; i < queueItems.size() - 1; i += 2) { try { - remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids); - staleEphemeralMessagesCounter.increment(staleEphemeralMessageGuids.size()); - } catch (final Throwable e) { - logger.warn("Could not remove stale ephemeral messages from cache", e); + final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(queueItems.get(i)); + if (message.getEphemeral() && message.getTimestamp() < earliestAllowableEphemeralTimestamp) { + staleEphemeralMessageGuids.add(UUID.fromString(message.getServerGuid())); + continue; + } + + final long id = Long.parseLong(new String(queueItems.get(i + 1), StandardCharsets.UTF_8)); + + messageEntities.add(constructEntityFromEnvelope(id, message)); + } catch (InvalidProtocolBufferException e) { + logger.warn("Failed to parse envelope", e); } - - return messageEntities; - }); - } - - @VisibleForTesting - List getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) { - return getMessagesTimer.record(() -> { - final List> scoredMessages = readDeleteCluster.withBinaryCluster(connection -> connection.sync().zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit)); - final List envelopes = new ArrayList<>(scoredMessages.size()); - - for (final ScoredValue scoredMessage : scoredMessages) { - try { - envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue())); - } catch (InvalidProtocolBufferException e) { - logger.warn("Failed to parse envelope", e); - } - } - - return envelopes; - }); - } - - public void clear(final UUID destinationUuid) { - // TODO Remove null check in a fully UUID-based world - if (destinationUuid != null) { - for (int i = 1; i < 256; i++) { - clear(destinationUuid, i); - } } - } + } else { + logger.error("\"Get messages\" operation returned a list with a non-even number of elements."); + messageEntities = Collections.emptyList(); + } - public void clear(final UUID destinationUuid, final long deviceId) { - clearQueueTimer.record(() -> - removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId), - getMessageQueueMetadataKey(destinationUuid, deviceId), - getQueueIndexKey(destinationUuid, deviceId)), - Collections.emptyList())); - } + try { + remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids); + staleEphemeralMessagesCounter.increment(staleEphemeralMessageGuids.size()); + } catch (final Throwable e) { + logger.warn("Could not remove stale ephemeral messages from cache", e); + } - int getNextSlotToPersist() { - return (int)(readDeleteCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); - } + return messageEntities; + }); + } - List getQueuesToPersist(final int slot, final Instant maxTime, final int limit) { - //noinspection unchecked - return getQueuesToPersistTimer.record(() -> (List)getQueuesToPersistScript.execute(List.of(new String(getQueueIndexKey(slot), StandardCharsets.UTF_8)), - List.of(String.valueOf(maxTime.toEpochMilli()), - String.valueOf(limit)))); - } + @VisibleForTesting + List getMessagesToPersist(final UUID accountUuid, final long destinationDevice, + final int limit) { + return getMessagesTimer.record(() -> { + final List> scoredMessages = readDeleteCluster.withBinaryCluster( + connection -> connection.sync() + .zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit)); + final List envelopes = new ArrayList<>(scoredMessages.size()); - void addQueueToPersist(final UUID accountUuid, final long deviceId) { - readDeleteCluster.useBinaryCluster(connection -> connection.sync().zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), getMessageQueueKey(accountUuid, deviceId))); - } - - void lockQueueForPersistence(final UUID accountUuid, final long deviceId) { - readDeleteCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE)); - } - - void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) { - readDeleteCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId))); - } - - public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) { - final String queueName = getQueueName(destinationUuid, deviceId); - - synchronized (messageListenersByQueueName) { - messageListenersByQueueName.put(queueName, listener); - queueNamesByMessageListener.put(listener, queueName); + for (final ScoredValue scoredMessage : scoredMessages) { + try { + envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue())); + } catch (InvalidProtocolBufferException e) { + logger.warn("Failed to parse envelope", e); } + } - subscribeForKeyspaceNotifications(queueName); + return envelopes; + }); + } + + public void clear(final UUID destinationUuid) { + // TODO Remove null check in a fully UUID-based world + if (destinationUuid != null) { + for (int i = 1; i < 256; i++) { + clear(destinationUuid, i); + } + } + } + + public void clear(final UUID destinationUuid, final long deviceId) { + clearQueueTimer.record(() -> + removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId), + getMessageQueueMetadataKey(destinationUuid, deviceId), + getQueueIndexKey(destinationUuid, deviceId)), + Collections.emptyList())); + } + + int getNextSlotToPersist() { + return (int) (readDeleteCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) + % SlotHash.SLOT_COUNT); + } + + List getQueuesToPersist(final int slot, final Instant maxTime, final int limit) { + //noinspection unchecked + return getQueuesToPersistTimer.record(() -> (List) getQueuesToPersistScript.execute( + List.of(new String(getQueueIndexKey(slot), StandardCharsets.UTF_8)), + List.of(String.valueOf(maxTime.toEpochMilli()), + String.valueOf(limit)))); + } + + void addQueueToPersist(final UUID accountUuid, final long deviceId) { + readDeleteCluster.useBinaryCluster(connection -> connection.sync() + .zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), + getMessageQueueKey(accountUuid, deviceId))); + } + + void lockQueueForPersistence(final UUID accountUuid, final long deviceId) { + readDeleteCluster.useBinaryCluster( + connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE)); + } + + void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) { + readDeleteCluster.useBinaryCluster( + connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId))); + } + + public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, + final MessageAvailabilityListener listener) { + final String queueName = getQueueName(destinationUuid, deviceId); + + synchronized (messageListenersByQueueName) { + messageListenersByQueueName.put(queueName, listener); + queueNamesByMessageListener.put(listener, queueName); } - public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) { - final String queueName = queueNamesByMessageListener.remove(listener); + subscribeForKeyspaceNotifications(queueName); + } - unsubscribeFromKeyspaceNotifications(queueName); + public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) { + final String queueName = queueNamesByMessageListener.remove(listener); - synchronized (messageListenersByQueueName) { - if (queueName != null) { - messageListenersByQueueName.remove(queueName); - } + unsubscribeFromKeyspaceNotifications(queueName); + + synchronized (messageListenersByQueueName) { + if (queueName != null) { + messageListenersByQueueName.remove(queueName); + } + } + } + + private void subscribeForKeyspaceNotifications(final String queueName) { + final int slot = SlotHash.getSlot(queueName); + + pubSubConnection.usePubSubConnection( + connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) + .commands() + .subscribe(getKeyspaceChannels(queueName))); + } + + private void unsubscribeFromKeyspaceNotifications(final String queueName) { + pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream() + .commands() + .unsubscribe(getKeyspaceChannels(queueName))); + } + + private static String[] getKeyspaceChannels(final String queueName) { + return new String[]{ + QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}", + PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}" + }; + } + + @Override + public void message(final RedisClusterNode node, final String channel, final String message) { + pubSubMessageCounter.increment(); + + if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) { + newMessageNotificationCounter.increment(); + notificationExecutorService.execute(() -> { + try { + findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable); + } catch (final Exception e) { + logger.warn("Unexpected error handling new message", e); } - } - - private void subscribeForKeyspaceNotifications(final String queueName) { - final int slot = SlotHash.getSlot(queueName); - - pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) - .commands() - .subscribe(getKeyspaceChannels(queueName))); - } - - private void unsubscribeFromKeyspaceNotifications(final String queueName) { - pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream() - .commands() - .unsubscribe(getKeyspaceChannels(queueName))); - } - - private static String[] getKeyspaceChannels(final String queueName) { - return new String[] { - QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}", - PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}" - }; - } - - @Override - public void message(final RedisClusterNode node, final String channel, final String message) { - pubSubMessageCounter.increment(); - - if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) { - newMessageNotificationCounter.increment(); - notificationExecutorService.execute(() -> { - try { - findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable); - } catch (final Exception e) { - logger.warn("Unexpected error handling new message", e); - } - }); - } else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) { - queuePersistedNotificationCounter.increment(); - notificationExecutorService.execute(() -> { - try { - findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted); - } catch (final Exception e) { - logger.warn("Unexpected error handling messages persisted", e); - } - }); + }); + } else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) { + queuePersistedNotificationCounter.increment(); + notificationExecutorService.execute(() -> { + try { + findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted); + } catch (final Exception e) { + logger.warn("Unexpected error handling messages persisted", e); } + }); } + } - private Optional findListener(final String keyspaceChannel) { - final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel); + private Optional findListener(final String keyspaceChannel) { + final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel); - synchronized (messageListenersByQueueName) { - return Optional.ofNullable(messageListenersByQueueName.get(queueName)); - } + synchronized (messageListenersByQueueName) { + return Optional.ofNullable(messageListenersByQueueName.get(queueName)); } + } - @VisibleForTesting - static OutgoingMessageEntity constructEntityFromEnvelope(long id, MessageProtos.Envelope envelope) { - return new OutgoingMessageEntity(id, true, - envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null, - envelope.getType().getNumber(), - envelope.getRelay(), - envelope.getTimestamp(), - envelope.getSource(), - envelope.hasSourceUuid() ? UUID.fromString(envelope.getSourceUuid()) : null, - envelope.getSourceDevice(), - envelope.hasLegacyMessage() ? envelope.getLegacyMessage().toByteArray() : null, - envelope.hasContent() ? envelope.getContent().toByteArray() : null, - envelope.hasServerTimestamp() ? envelope.getServerTimestamp() : 0); - } + @VisibleForTesting + static OutgoingMessageEntity constructEntityFromEnvelope(long id, MessageProtos.Envelope envelope) { + return new OutgoingMessageEntity(id, true, + envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null, + envelope.getType().getNumber(), + envelope.getRelay(), + envelope.getTimestamp(), + envelope.getSource(), + envelope.hasSourceUuid() ? UUID.fromString(envelope.getSourceUuid()) : null, + envelope.getSourceDevice(), + envelope.hasLegacyMessage() ? envelope.getLegacyMessage().toByteArray() : null, + envelope.hasContent() ? envelope.getContent().toByteArray() : null, + envelope.hasServerTimestamp() ? envelope.getServerTimestamp() : 0); + } - @VisibleForTesting - static String getQueueName(final UUID accountUuid, final long deviceId) { - return accountUuid + "::" + deviceId; - } + @VisibleForTesting + static String getQueueName(final UUID accountUuid, final long deviceId) { + return accountUuid + "::" + deviceId; + } - @VisibleForTesting - static String getQueueNameFromKeyspaceChannel(final String channel) { - final int startOfHashTag = channel.indexOf('{'); - final int endOfHashTag = channel.lastIndexOf('}'); + @VisibleForTesting + static String getQueueNameFromKeyspaceChannel(final String channel) { + final int startOfHashTag = channel.indexOf('{'); + final int endOfHashTag = channel.lastIndexOf('}'); - return channel.substring(startOfHashTag + 1, endOfHashTag); - } + return channel.substring(startOfHashTag + 1, endOfHashTag); + } - @VisibleForTesting - static byte[] getMessageQueueKey(final UUID accountUuid, final long deviceId) { - return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); - } + @VisibleForTesting + static byte[] getMessageQueueKey(final UUID accountUuid, final long deviceId) { + return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); + } - private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final long deviceId) { - return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); - } + private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final long deviceId) { + return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); + } - private static byte[] getQueueIndexKey(final UUID accountUuid, final long deviceId) { - return getQueueIndexKey(SlotHash.getSlot(accountUuid.toString() + "::" + deviceId)); - } + private static byte[] getQueueIndexKey(final UUID accountUuid, final long deviceId) { + return getQueueIndexKey(SlotHash.getSlot(accountUuid.toString() + "::" + deviceId)); + } - private static byte[] getQueueIndexKey(final int slot) { - return ("user_queue_index::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}").getBytes(StandardCharsets.UTF_8); - } + private static byte[] getQueueIndexKey(final int slot) { + return ("user_queue_index::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}").getBytes(StandardCharsets.UTF_8); + } - private static byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) { - return ("user_queue_persisting::{" + accountUuid + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); - } + private static byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) { + return ("user_queue_persisting::{" + accountUuid + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); + } - static UUID getAccountUuidFromQueueName(final String queueName) { - final int startOfHashTag = queueName.indexOf('{'); + static UUID getAccountUuidFromQueueName(final String queueName) { + final int startOfHashTag = queueName.indexOf('{'); - return UUID.fromString(queueName.substring(startOfHashTag + 1, queueName.indexOf("::", startOfHashTag))); - } + return UUID.fromString(queueName.substring(startOfHashTag + 1, queueName.indexOf("::", startOfHashTag))); + } - static long getDeviceIdFromQueueName(final String queueName) { - return Long.parseLong(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}'))); - } + static long getDeviceIdFromQueueName(final String queueName) { + return Long.parseLong(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}'))); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 9849c9271..cd178de67 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -37,296 +37,312 @@ import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; @RunWith(JUnitParamsRunner.class) public class MessagesCacheTest extends AbstractRedisClusterTest { - private ExecutorService notificationExecutorService; - private MessagesCache messagesCache; + private ExecutorService notificationExecutorService; + private MessagesCache messagesCache; - private final Random random = new Random(); - private long serialTimestamp = 0; + private final Random random = new Random(); + private long serialTimestamp = 0; - private static final UUID DESTINATION_UUID = UUID.randomUUID(); - private static final int DESTINATION_DEVICE_ID = 7; + private static final UUID DESTINATION_UUID = UUID.randomUUID(); + private static final int DESTINATION_DEVICE_ID = 7; - @Override - @Before - public void setUp() throws Exception { - super.setUp(); + @Override + @Before + public void setUp() throws Exception { + super.setUp(); - getRedisCluster().useCluster(connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "Klgz")); + getRedisCluster().useCluster( + connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "Klgz")); - notificationExecutorService = Executors.newSingleThreadExecutor(); - messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService); + notificationExecutorService = Executors.newSingleThreadExecutor(); + messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService); - messagesCache.start(); + messagesCache.start(); + } + + @Override + public void tearDown() throws Exception { + messagesCache.stop(); + + notificationExecutorService.shutdown(); + notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS); + + super.tearDown(); + } + + @Test + @Parameters({"true", "false"}) + public void testInsert(final boolean sealedSender) { + final UUID messageGuid = UUID.randomUUID(); + assertTrue(messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, + generateRandomMessage(messageGuid, sealedSender)) > 0); + } + + @Test + public void testDoubleInsertGuid() { + final UUID duplicateGuid = UUID.randomUUID(); + final MessageProtos.Envelope duplicateMessage = generateRandomMessage(duplicateGuid, false); + + final long firstId = messagesCache.insert(duplicateGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, duplicateMessage); + final long secondId = messagesCache.insert(duplicateGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, + duplicateMessage); + + assertEquals(firstId, secondId); + } + + @Test + @Parameters({"true", "false"}) + public void testRemoveByUUID(final boolean sealedSender) { + final UUID messageGuid = UUID.randomUUID(); + + assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid)); + + final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); + + messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); + final Optional maybeRemovedMessage = messagesCache.remove(DESTINATION_UUID, + DESTINATION_DEVICE_ID, messageGuid); + + assertTrue(maybeRemovedMessage.isPresent()); + assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get()); + } + + @Test + @Parameters({"true", "false"}) + public void testRemoveBatchByUUID(final boolean sealedSender) { + final int messageCount = 10; + + final List messagesToRemove = new ArrayList<>(messageCount); + final List messagesToPreserve = new ArrayList<>(messageCount); + + for (int i = 0; i < 10; i++) { + messagesToRemove.add(generateRandomMessage(UUID.randomUUID(), sealedSender)); + messagesToPreserve.add(generateRandomMessage(UUID.randomUUID(), sealedSender)); } - @Override - public void tearDown() throws Exception { - messagesCache.stop(); + assertEquals(Collections.emptyList(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, + messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid())) + .collect(Collectors.toList()))); - notificationExecutorService.shutdown(); - notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS); - - super.tearDown(); + for (final MessageProtos.Envelope message : messagesToRemove) { + messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message); } - @Test - @Parameters({"true", "false"}) - public void testInsert(final boolean sealedSender) { + for (final MessageProtos.Envelope message : messagesToPreserve) { + messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message); + } + + final List removedMessages = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, + messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid())) + .collect(Collectors.toList())); + + assertEquals(messagesToRemove.stream().map(message -> MessagesCache.constructEntityFromEnvelope(0, message)) + .collect(Collectors.toList()), + removedMessages); + + assertEquals(messagesToPreserve, + messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); + } + + @Test + public void testHasMessages() { + assertFalse(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID)); + + final UUID messageGuid = UUID.randomUUID(); + final MessageProtos.Envelope message = generateRandomMessage(messageGuid, true); + messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); + + assertTrue(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID)); + } + + @Test + @Parameters({"true", "false"}) + public void testGetMessages(final boolean sealedSender) { + final int messageCount = 100; + + final List expectedMessages = new ArrayList<>(messageCount); + + for (int i = 0; i < messageCount; i++) { + final UUID messageGuid = UUID.randomUUID(); + final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); + final long messageId = messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); + + expectedMessages.add(MessagesCache.constructEntityFromEnvelope(messageId, message)); + } + + assertEquals(expectedMessages, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); + } + + @Test + @Parameters({"true", "false"}) + public void testClearQueueForDevice(final boolean sealedSender) { + final int messageCount = 100; + + for (final int deviceId : new int[]{DESTINATION_DEVICE_ID, DESTINATION_DEVICE_ID + 1}) { + for (int i = 0; i < messageCount; i++) { final UUID messageGuid = UUID.randomUUID(); - assertTrue(messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender)) > 0); - } - - @Test - public void testDoubleInsertGuid() { - final UUID duplicateGuid = UUID.randomUUID(); - final MessageProtos.Envelope duplicateMessage = generateRandomMessage(duplicateGuid, false); - - final long firstId = messagesCache.insert(duplicateGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, duplicateMessage); - final long secondId = messagesCache.insert(duplicateGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, duplicateMessage); - - assertEquals(firstId, secondId); - } - - @Test - @Parameters({"true", "false"}) - public void testRemoveByUUID(final boolean sealedSender) { - final UUID messageGuid = UUID.randomUUID(); - - assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid)); - final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); - messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); - final Optional maybeRemovedMessage = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid); - - assertTrue(maybeRemovedMessage.isPresent()); - assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get()); + messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message); + } } - @Test - @Parameters({"true", "false"}) - public void testRemoveBatchByUUID(final boolean sealedSender) { - final int messageCount = 10; + messagesCache.clear(DESTINATION_UUID, DESTINATION_DEVICE_ID); - final List messagesToRemove = new ArrayList<>(messageCount); - final List messagesToPreserve = new ArrayList<>(messageCount); + assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); + assertEquals(messageCount, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount).size()); + } - for (int i = 0; i < 10; i++) { - messagesToRemove.add(generateRandomMessage(UUID.randomUUID(), sealedSender)); - messagesToPreserve.add(generateRandomMessage(UUID.randomUUID(), sealedSender)); - } + @Test + @Parameters({"true", "false"}) + public void testClearQueueForAccount(final boolean sealedSender) { + final int messageCount = 100; - assertEquals(Collections.emptyList(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, - messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()))); + for (final int deviceId : new int[]{DESTINATION_DEVICE_ID, DESTINATION_DEVICE_ID + 1}) { + for (int i = 0; i < messageCount; i++) { + final UUID messageGuid = UUID.randomUUID(); + final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); - for (final MessageProtos.Envelope message : messagesToRemove) { - messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message); - } - - for (final MessageProtos.Envelope message : messagesToPreserve) { - messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message); - } - - final List removedMessages = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, - messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList())); - - assertEquals(messagesToRemove.stream().map(message -> MessagesCache.constructEntityFromEnvelope(0, message)).collect(Collectors.toList()), - removedMessages); - - assertEquals(messagesToPreserve, messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); + messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message); + } } - @Test - public void testHasMessages() { - assertFalse(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID)); + messagesCache.clear(DESTINATION_UUID); - final UUID messageGuid = UUID.randomUUID(); - final MessageProtos.Envelope message = generateRandomMessage(messageGuid, true); - messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); + assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); + assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount)); + } - assertTrue(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID)); + private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender) { + return generateRandomMessage(messageGuid, sealedSender, serialTimestamp++); + } + + private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender, + final long timestamp) { + final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder() + .setTimestamp(timestamp) + .setServerTimestamp(timestamp) + .setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256))) + .setType(MessageProtos.Envelope.Type.CIPHERTEXT) + .setServerGuid(messageGuid.toString()); + + if (!sealedSender) { + envelopeBuilder.setSourceDevice(random.nextInt(256)) + .setSource("+1" + RandomStringUtils.randomNumeric(10)); } - @Test - @Parameters({"true", "false"}) - public void testGetMessages(final boolean sealedSender) { - final int messageCount = 100; + return envelopeBuilder.build(); + } - final List expectedMessages = new ArrayList<>(messageCount); + @Test + public void testClearNullUuid() { + // We're happy as long as this doesn't throw an exception + messagesCache.clear(null); + } - for (int i = 0; i < messageCount; i++) { - final UUID messageGuid = UUID.randomUUID(); - final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); - final long messageId = messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); + @Test + public void testGetAccountFromQueueName() { + assertEquals(DESTINATION_UUID, + MessagesCache.getAccountUuidFromQueueName( + new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), + StandardCharsets.UTF_8))); + } - expectedMessages.add(MessagesCache.constructEntityFromEnvelope(messageId, message)); - } + @Test + public void testGetDeviceIdFromQueueName() { + assertEquals(DESTINATION_DEVICE_ID, + MessagesCache.getDeviceIdFromQueueName( + new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), + StandardCharsets.UTF_8))); + } - assertEquals(expectedMessages, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); - } + @Test + public void testGetQueueNameFromKeyspaceChannel() { + assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7", + MessagesCache.getQueueNameFromKeyspaceChannel( + "__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}")); + } - @Test - @Parameters({"true", "false"}) - public void testClearQueueForDevice(final boolean sealedSender) { - final int messageCount = 100; + @Test + @Parameters({"true", "false"}) + public void testGetQueuesToPersist(final boolean sealedSender) { + final UUID messageGuid = UUID.randomUUID(); - for (final int deviceId : new int[] { DESTINATION_DEVICE_ID, DESTINATION_DEVICE_ID + 1 }) { - for (int i = 0; i < messageCount; i++) { - final UUID messageGuid = UUID.randomUUID(); - final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); + messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, + generateRandomMessage(messageGuid, sealedSender)); + final int slot = SlotHash.getSlot(DESTINATION_UUID.toString() + "::" + DESTINATION_DEVICE_ID); - messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message); - } - } + assertTrue(messagesCache.getQueuesToPersist(slot + 1, Instant.now().plusSeconds(60), 100).isEmpty()); - messagesCache.clear(DESTINATION_UUID, DESTINATION_DEVICE_ID); + final List queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100); - assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); - assertEquals(messageCount, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount).size()); - } + assertEquals(1, queues.size()); + assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.get(0))); + assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.get(0))); + } - @Test - @Parameters({"true", "false"}) - public void testClearQueueForAccount(final boolean sealedSender) { - final int messageCount = 100; - - for (final int deviceId : new int[] { DESTINATION_DEVICE_ID, DESTINATION_DEVICE_ID + 1 }) { - for (int i = 0; i < messageCount; i++) { - final UUID messageGuid = UUID.randomUUID(); - final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); - - messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message); - } - } - - messagesCache.clear(DESTINATION_UUID); - - assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); - assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount)); - } - - private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender) { - return generateRandomMessage(messageGuid, sealedSender, serialTimestamp++); - } - - private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender, final long timestamp) { - final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder() - .setTimestamp(timestamp) - .setServerTimestamp(timestamp) - .setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256))) - .setType(MessageProtos.Envelope.Type.CIPHERTEXT) - .setServerGuid(messageGuid.toString()); - - if (!sealedSender) { - envelopeBuilder.setSourceDevice(random.nextInt(256)) - .setSource("+1" + RandomStringUtils.randomNumeric(10)); - } - - return envelopeBuilder.build(); - } - - @Test - public void testClearNullUuid() { - // We're happy as long as this doesn't throw an exception - messagesCache.clear(null); - } - - @Test - public void testGetAccountFromQueueName() { - assertEquals(DESTINATION_UUID, - MessagesCache.getAccountUuidFromQueueName(new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8))); - } - - @Test - public void testGetDeviceIdFromQueueName() { - assertEquals(DESTINATION_DEVICE_ID, - MessagesCache.getDeviceIdFromQueueName(new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8))); - } - - @Test - public void testGetQueueNameFromKeyspaceChannel() { - assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7", - MessagesCache.getQueueNameFromKeyspaceChannel("__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}")); - } - - @Test - @Parameters({"true", "false"}) - public void testGetQueuesToPersist(final boolean sealedSender) { - final UUID messageGuid = UUID.randomUUID(); - - messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender)); - final int slot = SlotHash.getSlot(DESTINATION_UUID.toString() + "::" + DESTINATION_DEVICE_ID); - - assertTrue(messagesCache.getQueuesToPersist(slot + 1, Instant.now().plusSeconds(60), 100).isEmpty()); - - final List queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100); - - assertEquals(1, queues.size()); - assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.get(0))); - assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.get(0))); - } - - @Test(timeout = 5_000L) - public void testNotifyListenerNewMessage() throws InterruptedException { - final AtomicBoolean notified = new AtomicBoolean(false); - final UUID messageGuid = UUID.randomUUID(); - - final MessageAvailabilityListener listener = new MessageAvailabilityListener() { - @Override - public void handleNewMessagesAvailable() { - synchronized (notified) { - notified.set(true); - notified.notifyAll(); - } - } - - @Override - public void handleMessagesPersisted() { - } - }; - - messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); - messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, true)); + @Test(timeout = 5_000L) + public void testNotifyListenerNewMessage() throws InterruptedException { + final AtomicBoolean notified = new AtomicBoolean(false); + final UUID messageGuid = UUID.randomUUID(); + final MessageAvailabilityListener listener = new MessageAvailabilityListener() { + @Override + public void handleNewMessagesAvailable() { synchronized (notified) { - while (!notified.get()) { - notified.wait(); - } + notified.set(true); + notified.notifyAll(); } + } - assertTrue(notified.get()); + @Override + public void handleMessagesPersisted() { + } + }; + + messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); + messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, + generateRandomMessage(messageGuid, true)); + + synchronized (notified) { + while (!notified.get()) { + notified.wait(); + } } - @Test(timeout = 5_000L) - public void testNotifyListenerPersisted() throws InterruptedException { - final AtomicBoolean notified = new AtomicBoolean(false); + assertTrue(notified.get()); + } - final MessageAvailabilityListener listener = new MessageAvailabilityListener() { - @Override - public void handleNewMessagesAvailable() { - } + @Test(timeout = 5_000L) + public void testNotifyListenerPersisted() throws InterruptedException { + final AtomicBoolean notified = new AtomicBoolean(false); - @Override - public void handleMessagesPersisted() { - synchronized (notified) { - notified.set(true); - notified.notifyAll(); - } - } - }; - - messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); - - messagesCache.lockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID); - messagesCache.unlockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID); + final MessageAvailabilityListener listener = new MessageAvailabilityListener() { + @Override + public void handleNewMessagesAvailable() { + } + @Override + public void handleMessagesPersisted() { synchronized (notified) { - while (!notified.get()) { - notified.wait(); - } + notified.set(true); + notified.notifyAll(); } + } + }; - assertTrue(notified.get()); + messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); + + messagesCache.lockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID); + messagesCache.unlockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID); + + synchronized (notified) { + while (!notified.get()) { + notified.wait(); + } } + assertTrue(notified.get()); + } + }