apply editorconfig formatting

This commit is contained in:
Chris Eager 2021-09-27 16:45:38 -07:00 committed by Chris Eager
parent d1d6e5c652
commit 5189cbe5c7
2 changed files with 558 additions and 522 deletions

View File

@ -47,99 +47,105 @@ import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed { public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
private final FaultTolerantRedisCluster readDeleteCluster; private final FaultTolerantRedisCluster readDeleteCluster;
private final FaultTolerantPubSubConnection<String, String> pubSubConnection; private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
private final ExecutorService notificationExecutorService; private final ExecutorService notificationExecutorService;
private final ClusterLuaScript insertScript; private final ClusterLuaScript insertScript;
private final ClusterLuaScript removeByGuidScript; private final ClusterLuaScript removeByGuidScript;
private final ClusterLuaScript getItemsScript; private final ClusterLuaScript getItemsScript;
private final ClusterLuaScript removeQueueScript; private final ClusterLuaScript removeQueueScript;
private final ClusterLuaScript getQueuesToPersistScript; private final ClusterLuaScript getQueuesToPersistScript;
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>(); private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>(); private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert")); private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"));
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist")); private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification")); private final Counter newMessageNotificationCounter = Metrics.counter(
private final Counter queuePersistedNotificationCounter = Metrics.counter( name(MessagesCache.class, "newMessageNotification"));
name(MessagesCache.class, "queuePersisted")); private final Counter queuePersistedNotificationCounter = Metrics.counter(
name(MessagesCache.class, "queuePersisted"));
private final Counter staleEphemeralMessagesCounter = Metrics.counter( private final Counter staleEphemeralMessagesCounter = Metrics.counter(
name(MessagesCache.class, "staleEphemeralMessages")); name(MessagesCache.class, "staleEphemeralMessages"));
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::"; private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::";
private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::"; private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::";
private static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10); private static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10);
private static final String REMOVE_TIMER_NAME = name(MessagesCache.class, "remove"); private static final String REMOVE_TIMER_NAME = name(MessagesCache.class, "remove");
private static final String REMOVE_METHOD_TAG = "method"; private static final String REMOVE_METHOD_TAG = "method";
private static final String REMOVE_METHOD_UUID = "uuid"; private static final String REMOVE_METHOD_UUID = "uuid";
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, final ExecutorService notificationExecutorService) throws IOException { public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster,
final ExecutorService notificationExecutorService) throws IOException {
this.readDeleteCluster = readDeleteCluster; this.readDeleteCluster = readDeleteCluster;
this.pubSubConnection = readDeleteCluster.createPubSubConnection(); this.pubSubConnection = readDeleteCluster.createPubSubConnection();
this.notificationExecutorService = notificationExecutorService; this.notificationExecutorService = notificationExecutorService;
this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.MULTI); this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua",
this.getItemsScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_items.lua", ScriptOutputType.MULTI); ScriptOutputType.MULTI);
this.removeQueueScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS); this.getItemsScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI); this.removeQueueScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_queue.lua",
ScriptOutputType.STATUS);
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_queues_to_persist.lua",
ScriptOutputType.MULTI);
}
@Override
public void start() {
pubSubConnection.usePubSubConnection(connection -> {
connection.addListener(this);
connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribe(event -> resubscribeAll());
});
}
@Override
public void stop() {
pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe());
}
private void resubscribeAll() {
logger.info("Got topology change event, resubscribing all keyspace notifications");
final Set<String> queueNames;
synchronized (messageListenersByQueueName) {
queueNames = new HashSet<>(messageListenersByQueueName.keySet());
} }
@Override for (final String queueName : queueNames) {
public void start() { subscribeForKeyspaceNotifications(queueName);
pubSubConnection.usePubSubConnection(connection -> {
connection.addListener(this);
connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribe(event -> resubscribeAll());
});
} }
}
@Override public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice,
public void stop() { final MessageProtos.Envelope message) {
pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe()); final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
} return (long) insertTimer.record(() ->
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
private void resubscribeAll() { getMessageQueueMetadataKey(destinationUuid, destinationDevice),
logger.info("Got topology change event, resubscribing all keyspace notifications"); getQueueIndexKey(destinationUuid, destinationDevice)),
List.of(messageWithGuid.toByteArray(),
final Set<String> queueNames; String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
guid.toString().getBytes(StandardCharsets.UTF_8))));
synchronized (messageListenersByQueueName) { }
queueNames = new HashSet<>(messageListenersByQueueName.keySet());
}
for (final String queueName : queueNames) {
subscribeForKeyspaceNotifications(queueName);
}
}
public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
return (long) insertTimer.record(() ->
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
getQueueIndexKey(destinationUuid, destinationDevice)),
List.of(messageWithGuid.toByteArray(),
String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
guid.toString().getBytes(StandardCharsets.UTF_8))));
}
public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice,
final UUID messageGuid) { final UUID messageGuid) {
@ -170,247 +176,261 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
return removedMessages; return removedMessages;
} }
public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) { public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) {
return readDeleteCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0); return readDeleteCluster.withBinaryCluster(
} connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<OutgoingMessageEntity> get(final UUID destinationUuid, final long destinationDevice, final int limit) { public List<OutgoingMessageEntity> get(final UUID destinationUuid, final long destinationDevice, final int limit) {
return getMessagesTimer.record(() -> { return getMessagesTimer.record(() -> {
final List<byte[]> queueItems = (List<byte[]>)getItemsScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), final List<byte[]> queueItems = (List<byte[]>) getItemsScript.executeBinary(
getPersistInProgressKey(destinationUuid, destinationDevice)), List.of(getMessageQueueKey(destinationUuid, destinationDevice),
List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8))); getPersistInProgressKey(destinationUuid, destinationDevice)),
List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8)));
final long earliestAllowableEphemeralTimestamp = System.currentTimeMillis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); final long earliestAllowableEphemeralTimestamp =
System.currentTimeMillis() - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis();
final List<OutgoingMessageEntity> messageEntities; final List<OutgoingMessageEntity> messageEntities;
final List<UUID> staleEphemeralMessageGuids = new ArrayList<>(); final List<UUID> staleEphemeralMessageGuids = new ArrayList<>();
if (queueItems.size() % 2 == 0) { if (queueItems.size() % 2 == 0) {
messageEntities = new ArrayList<>(queueItems.size() / 2); messageEntities = new ArrayList<>(queueItems.size() / 2);
for (int i = 0; i < queueItems.size() - 1; i += 2) {
try {
final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(queueItems.get(i));
if (message.getEphemeral() && message.getTimestamp() < earliestAllowableEphemeralTimestamp) {
staleEphemeralMessageGuids.add(UUID.fromString(message.getServerGuid()));
continue;
}
final long id = Long.parseLong(new String(queueItems.get(i + 1), StandardCharsets.UTF_8));
messageEntities.add(constructEntityFromEnvelope(id, message));
} catch (InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
}
}
} else {
logger.error("\"Get messages\" operation returned a list with a non-even number of elements.");
messageEntities = Collections.emptyList();
}
for (int i = 0; i < queueItems.size() - 1; i += 2) {
try { try {
remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids); final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(queueItems.get(i));
staleEphemeralMessagesCounter.increment(staleEphemeralMessageGuids.size()); if (message.getEphemeral() && message.getTimestamp() < earliestAllowableEphemeralTimestamp) {
} catch (final Throwable e) { staleEphemeralMessageGuids.add(UUID.fromString(message.getServerGuid()));
logger.warn("Could not remove stale ephemeral messages from cache", e); continue;
}
final long id = Long.parseLong(new String(queueItems.get(i + 1), StandardCharsets.UTF_8));
messageEntities.add(constructEntityFromEnvelope(id, message));
} catch (InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
} }
return messageEntities;
});
}
@VisibleForTesting
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) {
return getMessagesTimer.record(() -> {
final List<ScoredValue<byte[]>> scoredMessages = readDeleteCluster.withBinaryCluster(connection -> connection.sync().zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
final List<MessageProtos.Envelope> envelopes = new ArrayList<>(scoredMessages.size());
for (final ScoredValue<byte[]> scoredMessage : scoredMessages) {
try {
envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue()));
} catch (InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
}
}
return envelopes;
});
}
public void clear(final UUID destinationUuid) {
// TODO Remove null check in a fully UUID-based world
if (destinationUuid != null) {
for (int i = 1; i < 256; i++) {
clear(destinationUuid, i);
}
} }
} } else {
logger.error("\"Get messages\" operation returned a list with a non-even number of elements.");
messageEntities = Collections.emptyList();
}
public void clear(final UUID destinationUuid, final long deviceId) { try {
clearQueueTimer.record(() -> remove(destinationUuid, destinationDevice, staleEphemeralMessageGuids);
removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId), staleEphemeralMessagesCounter.increment(staleEphemeralMessageGuids.size());
getMessageQueueMetadataKey(destinationUuid, deviceId), } catch (final Throwable e) {
getQueueIndexKey(destinationUuid, deviceId)), logger.warn("Could not remove stale ephemeral messages from cache", e);
Collections.emptyList())); }
}
int getNextSlotToPersist() { return messageEntities;
return (int)(readDeleteCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); });
} }
List<String> getQueuesToPersist(final int slot, final Instant maxTime, final int limit) { @VisibleForTesting
//noinspection unchecked List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final long destinationDevice,
return getQueuesToPersistTimer.record(() -> (List<String>)getQueuesToPersistScript.execute(List.of(new String(getQueueIndexKey(slot), StandardCharsets.UTF_8)), final int limit) {
List.of(String.valueOf(maxTime.toEpochMilli()), return getMessagesTimer.record(() -> {
String.valueOf(limit)))); final List<ScoredValue<byte[]>> scoredMessages = readDeleteCluster.withBinaryCluster(
} connection -> connection.sync()
.zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
final List<MessageProtos.Envelope> envelopes = new ArrayList<>(scoredMessages.size());
void addQueueToPersist(final UUID accountUuid, final long deviceId) { for (final ScoredValue<byte[]> scoredMessage : scoredMessages) {
readDeleteCluster.useBinaryCluster(connection -> connection.sync().zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), getMessageQueueKey(accountUuid, deviceId))); try {
} envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue()));
} catch (InvalidProtocolBufferException e) {
void lockQueueForPersistence(final UUID accountUuid, final long deviceId) { logger.warn("Failed to parse envelope", e);
readDeleteCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
}
void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) {
readDeleteCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
}
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
final String queueName = getQueueName(destinationUuid, deviceId);
synchronized (messageListenersByQueueName) {
messageListenersByQueueName.put(queueName, listener);
queueNamesByMessageListener.put(listener, queueName);
} }
}
subscribeForKeyspaceNotifications(queueName); return envelopes;
});
}
public void clear(final UUID destinationUuid) {
// TODO Remove null check in a fully UUID-based world
if (destinationUuid != null) {
for (int i = 1; i < 256; i++) {
clear(destinationUuid, i);
}
}
}
public void clear(final UUID destinationUuid, final long deviceId) {
clearQueueTimer.record(() ->
removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId),
getMessageQueueMetadataKey(destinationUuid, deviceId),
getQueueIndexKey(destinationUuid, deviceId)),
Collections.emptyList()));
}
int getNextSlotToPersist() {
return (int) (readDeleteCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY))
% SlotHash.SLOT_COUNT);
}
List<String> getQueuesToPersist(final int slot, final Instant maxTime, final int limit) {
//noinspection unchecked
return getQueuesToPersistTimer.record(() -> (List<String>) getQueuesToPersistScript.execute(
List.of(new String(getQueueIndexKey(slot), StandardCharsets.UTF_8)),
List.of(String.valueOf(maxTime.toEpochMilli()),
String.valueOf(limit))));
}
void addQueueToPersist(final UUID accountUuid, final long deviceId) {
readDeleteCluster.useBinaryCluster(connection -> connection.sync()
.zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(),
getMessageQueueKey(accountUuid, deviceId)));
}
void lockQueueForPersistence(final UUID accountUuid, final long deviceId) {
readDeleteCluster.useBinaryCluster(
connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
}
void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) {
readDeleteCluster.useBinaryCluster(
connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
}
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId,
final MessageAvailabilityListener listener) {
final String queueName = getQueueName(destinationUuid, deviceId);
synchronized (messageListenersByQueueName) {
messageListenersByQueueName.put(queueName, listener);
queueNamesByMessageListener.put(listener, queueName);
} }
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) { subscribeForKeyspaceNotifications(queueName);
final String queueName = queueNamesByMessageListener.remove(listener); }
unsubscribeFromKeyspaceNotifications(queueName); public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
final String queueName = queueNamesByMessageListener.remove(listener);
synchronized (messageListenersByQueueName) { unsubscribeFromKeyspaceNotifications(queueName);
if (queueName != null) {
messageListenersByQueueName.remove(queueName); synchronized (messageListenersByQueueName) {
} if (queueName != null) {
messageListenersByQueueName.remove(queueName);
}
}
}
private void subscribeForKeyspaceNotifications(final String queueName) {
final int slot = SlotHash.getSlot(queueName);
pubSubConnection.usePubSubConnection(
connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot))
.commands()
.subscribe(getKeyspaceChannels(queueName)));
}
private void unsubscribeFromKeyspaceNotifications(final String queueName) {
pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream()
.commands()
.unsubscribe(getKeyspaceChannels(queueName)));
}
private static String[] getKeyspaceChannels(final String queueName) {
return new String[]{
QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}",
PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}"
};
}
@Override
public void message(final RedisClusterNode node, final String channel, final String message) {
pubSubMessageCounter.increment();
if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) {
newMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable);
} catch (final Exception e) {
logger.warn("Unexpected error handling new message", e);
} }
} });
} else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) {
private void subscribeForKeyspaceNotifications(final String queueName) { queuePersistedNotificationCounter.increment();
final int slot = SlotHash.getSlot(queueName); notificationExecutorService.execute(() -> {
try {
pubSubConnection.usePubSubConnection(connection -> connection.sync().nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted);
.commands() } catch (final Exception e) {
.subscribe(getKeyspaceChannels(queueName))); logger.warn("Unexpected error handling messages persisted", e);
}
private void unsubscribeFromKeyspaceNotifications(final String queueName) {
pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream()
.commands()
.unsubscribe(getKeyspaceChannels(queueName)));
}
private static String[] getKeyspaceChannels(final String queueName) {
return new String[] {
QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}",
PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}"
};
}
@Override
public void message(final RedisClusterNode node, final String channel, final String message) {
pubSubMessageCounter.increment();
if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) {
newMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable);
} catch (final Exception e) {
logger.warn("Unexpected error handling new message", e);
}
});
} else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) {
queuePersistedNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted);
} catch (final Exception e) {
logger.warn("Unexpected error handling messages persisted", e);
}
});
} }
});
} }
}
private Optional<MessageAvailabilityListener> findListener(final String keyspaceChannel) { private Optional<MessageAvailabilityListener> findListener(final String keyspaceChannel) {
final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel); final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel);
synchronized (messageListenersByQueueName) { synchronized (messageListenersByQueueName) {
return Optional.ofNullable(messageListenersByQueueName.get(queueName)); return Optional.ofNullable(messageListenersByQueueName.get(queueName));
}
} }
}
@VisibleForTesting @VisibleForTesting
static OutgoingMessageEntity constructEntityFromEnvelope(long id, MessageProtos.Envelope envelope) { static OutgoingMessageEntity constructEntityFromEnvelope(long id, MessageProtos.Envelope envelope) {
return new OutgoingMessageEntity(id, true, return new OutgoingMessageEntity(id, true,
envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null, envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null,
envelope.getType().getNumber(), envelope.getType().getNumber(),
envelope.getRelay(), envelope.getRelay(),
envelope.getTimestamp(), envelope.getTimestamp(),
envelope.getSource(), envelope.getSource(),
envelope.hasSourceUuid() ? UUID.fromString(envelope.getSourceUuid()) : null, envelope.hasSourceUuid() ? UUID.fromString(envelope.getSourceUuid()) : null,
envelope.getSourceDevice(), envelope.getSourceDevice(),
envelope.hasLegacyMessage() ? envelope.getLegacyMessage().toByteArray() : null, envelope.hasLegacyMessage() ? envelope.getLegacyMessage().toByteArray() : null,
envelope.hasContent() ? envelope.getContent().toByteArray() : null, envelope.hasContent() ? envelope.getContent().toByteArray() : null,
envelope.hasServerTimestamp() ? envelope.getServerTimestamp() : 0); envelope.hasServerTimestamp() ? envelope.getServerTimestamp() : 0);
} }
@VisibleForTesting @VisibleForTesting
static String getQueueName(final UUID accountUuid, final long deviceId) { static String getQueueName(final UUID accountUuid, final long deviceId) {
return accountUuid + "::" + deviceId; return accountUuid + "::" + deviceId;
} }
@VisibleForTesting @VisibleForTesting
static String getQueueNameFromKeyspaceChannel(final String channel) { static String getQueueNameFromKeyspaceChannel(final String channel) {
final int startOfHashTag = channel.indexOf('{'); final int startOfHashTag = channel.indexOf('{');
final int endOfHashTag = channel.lastIndexOf('}'); final int endOfHashTag = channel.lastIndexOf('}');
return channel.substring(startOfHashTag + 1, endOfHashTag); return channel.substring(startOfHashTag + 1, endOfHashTag);
} }
@VisibleForTesting @VisibleForTesting
static byte[] getMessageQueueKey(final UUID accountUuid, final long deviceId) { static byte[] getMessageQueueKey(final UUID accountUuid, final long deviceId) {
return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
} }
private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final long deviceId) { private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final long deviceId) {
return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
} }
private static byte[] getQueueIndexKey(final UUID accountUuid, final long deviceId) { private static byte[] getQueueIndexKey(final UUID accountUuid, final long deviceId) {
return getQueueIndexKey(SlotHash.getSlot(accountUuid.toString() + "::" + deviceId)); return getQueueIndexKey(SlotHash.getSlot(accountUuid.toString() + "::" + deviceId));
} }
private static byte[] getQueueIndexKey(final int slot) { private static byte[] getQueueIndexKey(final int slot) {
return ("user_queue_index::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}").getBytes(StandardCharsets.UTF_8); return ("user_queue_index::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}").getBytes(StandardCharsets.UTF_8);
} }
private static byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) { private static byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) {
return ("user_queue_persisting::{" + accountUuid + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); return ("user_queue_persisting::{" + accountUuid + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
} }
static UUID getAccountUuidFromQueueName(final String queueName) { static UUID getAccountUuidFromQueueName(final String queueName) {
final int startOfHashTag = queueName.indexOf('{'); final int startOfHashTag = queueName.indexOf('{');
return UUID.fromString(queueName.substring(startOfHashTag + 1, queueName.indexOf("::", startOfHashTag))); return UUID.fromString(queueName.substring(startOfHashTag + 1, queueName.indexOf("::", startOfHashTag)));
} }
static long getDeviceIdFromQueueName(final String queueName) { static long getDeviceIdFromQueueName(final String queueName) {
return Long.parseLong(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}'))); return Long.parseLong(queueName.substring(queueName.lastIndexOf("::") + 2, queueName.lastIndexOf('}')));
} }
} }

View File

@ -37,296 +37,312 @@ import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
@RunWith(JUnitParamsRunner.class) @RunWith(JUnitParamsRunner.class)
public class MessagesCacheTest extends AbstractRedisClusterTest { public class MessagesCacheTest extends AbstractRedisClusterTest {
private ExecutorService notificationExecutorService; private ExecutorService notificationExecutorService;
private MessagesCache messagesCache; private MessagesCache messagesCache;
private final Random random = new Random(); private final Random random = new Random();
private long serialTimestamp = 0; private long serialTimestamp = 0;
private static final UUID DESTINATION_UUID = UUID.randomUUID(); private static final UUID DESTINATION_UUID = UUID.randomUUID();
private static final int DESTINATION_DEVICE_ID = 7; private static final int DESTINATION_DEVICE_ID = 7;
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
getRedisCluster().useCluster(connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "Klgz")); getRedisCluster().useCluster(
connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "Klgz"));
notificationExecutorService = Executors.newSingleThreadExecutor(); notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService); messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
messagesCache.start(); messagesCache.start();
}
@Override
public void tearDown() throws Exception {
messagesCache.stop();
notificationExecutorService.shutdown();
notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS);
super.tearDown();
}
@Test
@Parameters({"true", "false"})
public void testInsert(final boolean sealedSender) {
final UUID messageGuid = UUID.randomUUID();
assertTrue(messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID,
generateRandomMessage(messageGuid, sealedSender)) > 0);
}
@Test
public void testDoubleInsertGuid() {
final UUID duplicateGuid = UUID.randomUUID();
final MessageProtos.Envelope duplicateMessage = generateRandomMessage(duplicateGuid, false);
final long firstId = messagesCache.insert(duplicateGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, duplicateMessage);
final long secondId = messagesCache.insert(duplicateGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID,
duplicateMessage);
assertEquals(firstId, secondId);
}
@Test
@Parameters({"true", "false"})
public void testRemoveByUUID(final boolean sealedSender) {
final UUID messageGuid = UUID.randomUUID();
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid));
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_UUID,
DESTINATION_DEVICE_ID, messageGuid);
assertTrue(maybeRemovedMessage.isPresent());
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
}
@Test
@Parameters({"true", "false"})
public void testRemoveBatchByUUID(final boolean sealedSender) {
final int messageCount = 10;
final List<MessageProtos.Envelope> messagesToRemove = new ArrayList<>(messageCount);
final List<MessageProtos.Envelope> messagesToPreserve = new ArrayList<>(messageCount);
for (int i = 0; i < 10; i++) {
messagesToRemove.add(generateRandomMessage(UUID.randomUUID(), sealedSender));
messagesToPreserve.add(generateRandomMessage(UUID.randomUUID(), sealedSender));
} }
@Override assertEquals(Collections.emptyList(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID,
public void tearDown() throws Exception { messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid()))
messagesCache.stop(); .collect(Collectors.toList())));
notificationExecutorService.shutdown(); for (final MessageProtos.Envelope message : messagesToRemove) {
notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS); messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
super.tearDown();
} }
@Test for (final MessageProtos.Envelope message : messagesToPreserve) {
@Parameters({"true", "false"}) messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
public void testInsert(final boolean sealedSender) { }
final List<OutgoingMessageEntity> removedMessages = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID,
messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid()))
.collect(Collectors.toList()));
assertEquals(messagesToRemove.stream().map(message -> MessagesCache.constructEntityFromEnvelope(0, message))
.collect(Collectors.toList()),
removedMessages);
assertEquals(messagesToPreserve,
messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
}
@Test
public void testHasMessages() {
assertFalse(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID));
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, true);
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
assertTrue(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID));
}
@Test
@Parameters({"true", "false"})
public void testGetMessages(final boolean sealedSender) {
final int messageCount = 100;
final List<OutgoingMessageEntity> expectedMessages = new ArrayList<>(messageCount);
for (int i = 0; i < messageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
final long messageId = messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
expectedMessages.add(MessagesCache.constructEntityFromEnvelope(messageId, message));
}
assertEquals(expectedMessages, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
}
@Test
@Parameters({"true", "false"})
public void testClearQueueForDevice(final boolean sealedSender) {
final int messageCount = 100;
for (final int deviceId : new int[]{DESTINATION_DEVICE_ID, DESTINATION_DEVICE_ID + 1}) {
for (int i = 0; i < messageCount; i++) {
final UUID messageGuid = UUID.randomUUID(); final UUID messageGuid = UUID.randomUUID();
assertTrue(messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender)) > 0);
}
@Test
public void testDoubleInsertGuid() {
final UUID duplicateGuid = UUID.randomUUID();
final MessageProtos.Envelope duplicateMessage = generateRandomMessage(duplicateGuid, false);
final long firstId = messagesCache.insert(duplicateGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, duplicateMessage);
final long secondId = messagesCache.insert(duplicateGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, duplicateMessage);
assertEquals(firstId, secondId);
}
@Test
@Parameters({"true", "false"})
public void testRemoveByUUID(final boolean sealedSender) {
final UUID messageGuid = UUID.randomUUID();
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid));
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message);
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid); }
assertTrue(maybeRemovedMessage.isPresent());
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
} }
@Test messagesCache.clear(DESTINATION_UUID, DESTINATION_DEVICE_ID);
@Parameters({"true", "false"})
public void testRemoveBatchByUUID(final boolean sealedSender) {
final int messageCount = 10;
final List<MessageProtos.Envelope> messagesToRemove = new ArrayList<>(messageCount); assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
final List<MessageProtos.Envelope> messagesToPreserve = new ArrayList<>(messageCount); assertEquals(messageCount, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount).size());
}
for (int i = 0; i < 10; i++) { @Test
messagesToRemove.add(generateRandomMessage(UUID.randomUUID(), sealedSender)); @Parameters({"true", "false"})
messagesToPreserve.add(generateRandomMessage(UUID.randomUUID(), sealedSender)); public void testClearQueueForAccount(final boolean sealedSender) {
} final int messageCount = 100;
assertEquals(Collections.emptyList(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, for (final int deviceId : new int[]{DESTINATION_DEVICE_ID, DESTINATION_DEVICE_ID + 1}) {
messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()))); for (int i = 0; i < messageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
for (final MessageProtos.Envelope message : messagesToRemove) { messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message);
messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message); }
}
for (final MessageProtos.Envelope message : messagesToPreserve) {
messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
}
final List<OutgoingMessageEntity> removedMessages = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID,
messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
assertEquals(messagesToRemove.stream().map(message -> MessagesCache.constructEntityFromEnvelope(0, message)).collect(Collectors.toList()),
removedMessages);
assertEquals(messagesToPreserve, messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
} }
@Test messagesCache.clear(DESTINATION_UUID);
public void testHasMessages() {
assertFalse(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID));
final UUID messageGuid = UUID.randomUUID(); assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, true); assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount));
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); }
assertTrue(messagesCache.hasMessages(DESTINATION_UUID, DESTINATION_DEVICE_ID)); private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender) {
return generateRandomMessage(messageGuid, sealedSender, serialTimestamp++);
}
private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender,
final long timestamp) {
final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder()
.setTimestamp(timestamp)
.setServerTimestamp(timestamp)
.setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256)))
.setType(MessageProtos.Envelope.Type.CIPHERTEXT)
.setServerGuid(messageGuid.toString());
if (!sealedSender) {
envelopeBuilder.setSourceDevice(random.nextInt(256))
.setSource("+1" + RandomStringUtils.randomNumeric(10));
} }
@Test return envelopeBuilder.build();
@Parameters({"true", "false"}) }
public void testGetMessages(final boolean sealedSender) {
final int messageCount = 100;
final List<OutgoingMessageEntity> expectedMessages = new ArrayList<>(messageCount); @Test
public void testClearNullUuid() {
// We're happy as long as this doesn't throw an exception
messagesCache.clear(null);
}
for (int i = 0; i < messageCount; i++) { @Test
final UUID messageGuid = UUID.randomUUID(); public void testGetAccountFromQueueName() {
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender); assertEquals(DESTINATION_UUID,
final long messageId = messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message); MessagesCache.getAccountUuidFromQueueName(
new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID),
StandardCharsets.UTF_8)));
}
expectedMessages.add(MessagesCache.constructEntityFromEnvelope(messageId, message)); @Test
} public void testGetDeviceIdFromQueueName() {
assertEquals(DESTINATION_DEVICE_ID,
MessagesCache.getDeviceIdFromQueueName(
new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID),
StandardCharsets.UTF_8)));
}
assertEquals(expectedMessages, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); @Test
} public void testGetQueueNameFromKeyspaceChannel() {
assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7",
MessagesCache.getQueueNameFromKeyspaceChannel(
"__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}"));
}
@Test @Test
@Parameters({"true", "false"}) @Parameters({"true", "false"})
public void testClearQueueForDevice(final boolean sealedSender) { public void testGetQueuesToPersist(final boolean sealedSender) {
final int messageCount = 100; final UUID messageGuid = UUID.randomUUID();
for (final int deviceId : new int[] { DESTINATION_DEVICE_ID, DESTINATION_DEVICE_ID + 1 }) { messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID,
for (int i = 0; i < messageCount; i++) { generateRandomMessage(messageGuid, sealedSender));
final UUID messageGuid = UUID.randomUUID(); final int slot = SlotHash.getSlot(DESTINATION_UUID.toString() + "::" + DESTINATION_DEVICE_ID);
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message); assertTrue(messagesCache.getQueuesToPersist(slot + 1, Instant.now().plusSeconds(60), 100).isEmpty());
}
}
messagesCache.clear(DESTINATION_UUID, DESTINATION_DEVICE_ID); final List<String> queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100);
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount)); assertEquals(1, queues.size());
assertEquals(messageCount, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount).size()); assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.get(0)));
} assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.get(0)));
}
@Test @Test(timeout = 5_000L)
@Parameters({"true", "false"}) public void testNotifyListenerNewMessage() throws InterruptedException {
public void testClearQueueForAccount(final boolean sealedSender) { final AtomicBoolean notified = new AtomicBoolean(false);
final int messageCount = 100; final UUID messageGuid = UUID.randomUUID();
for (final int deviceId : new int[] { DESTINATION_DEVICE_ID, DESTINATION_DEVICE_ID + 1 }) {
for (int i = 0; i < messageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message);
}
}
messagesCache.clear(DESTINATION_UUID);
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount));
}
private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender) {
return generateRandomMessage(messageGuid, sealedSender, serialTimestamp++);
}
private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender, final long timestamp) {
final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder()
.setTimestamp(timestamp)
.setServerTimestamp(timestamp)
.setContent(ByteString.copyFromUtf8(RandomStringUtils.randomAlphanumeric(256)))
.setType(MessageProtos.Envelope.Type.CIPHERTEXT)
.setServerGuid(messageGuid.toString());
if (!sealedSender) {
envelopeBuilder.setSourceDevice(random.nextInt(256))
.setSource("+1" + RandomStringUtils.randomNumeric(10));
}
return envelopeBuilder.build();
}
@Test
public void testClearNullUuid() {
// We're happy as long as this doesn't throw an exception
messagesCache.clear(null);
}
@Test
public void testGetAccountFromQueueName() {
assertEquals(DESTINATION_UUID,
MessagesCache.getAccountUuidFromQueueName(new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8)));
}
@Test
public void testGetDeviceIdFromQueueName() {
assertEquals(DESTINATION_DEVICE_ID,
MessagesCache.getDeviceIdFromQueueName(new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8)));
}
@Test
public void testGetQueueNameFromKeyspaceChannel() {
assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7",
MessagesCache.getQueueNameFromKeyspaceChannel("__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}"));
}
@Test
@Parameters({"true", "false"})
public void testGetQueuesToPersist(final boolean sealedSender) {
final UUID messageGuid = UUID.randomUUID();
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender));
final int slot = SlotHash.getSlot(DESTINATION_UUID.toString() + "::" + DESTINATION_DEVICE_ID);
assertTrue(messagesCache.getQueuesToPersist(slot + 1, Instant.now().plusSeconds(60), 100).isEmpty());
final List<String> queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100);
assertEquals(1, queues.size());
assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.get(0)));
assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.get(0)));
}
@Test(timeout = 5_000L)
public void testNotifyListenerNewMessage() throws InterruptedException {
final AtomicBoolean notified = new AtomicBoolean(false);
final UUID messageGuid = UUID.randomUUID();
final MessageAvailabilityListener listener = new MessageAvailabilityListener() {
@Override
public void handleNewMessagesAvailable() {
synchronized (notified) {
notified.set(true);
notified.notifyAll();
}
}
@Override
public void handleMessagesPersisted() {
}
};
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, true));
final MessageAvailabilityListener listener = new MessageAvailabilityListener() {
@Override
public void handleNewMessagesAvailable() {
synchronized (notified) { synchronized (notified) {
while (!notified.get()) { notified.set(true);
notified.wait(); notified.notifyAll();
}
} }
}
assertTrue(notified.get()); @Override
public void handleMessagesPersisted() {
}
};
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID,
generateRandomMessage(messageGuid, true));
synchronized (notified) {
while (!notified.get()) {
notified.wait();
}
} }
@Test(timeout = 5_000L) assertTrue(notified.get());
public void testNotifyListenerPersisted() throws InterruptedException { }
final AtomicBoolean notified = new AtomicBoolean(false);
final MessageAvailabilityListener listener = new MessageAvailabilityListener() { @Test(timeout = 5_000L)
@Override public void testNotifyListenerPersisted() throws InterruptedException {
public void handleNewMessagesAvailable() { final AtomicBoolean notified = new AtomicBoolean(false);
}
@Override final MessageAvailabilityListener listener = new MessageAvailabilityListener() {
public void handleMessagesPersisted() { @Override
synchronized (notified) { public void handleNewMessagesAvailable() {
notified.set(true); }
notified.notifyAll();
}
}
};
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
messagesCache.lockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID);
messagesCache.unlockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID);
@Override
public void handleMessagesPersisted() {
synchronized (notified) { synchronized (notified) {
while (!notified.get()) { notified.set(true);
notified.wait(); notified.notifyAll();
}
} }
}
};
assertTrue(notified.get()); messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
messagesCache.lockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID);
messagesCache.unlockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID);
synchronized (notified) {
while (!notified.get()) {
notified.wait();
}
} }
assertTrue(notified.get());
}
} }