Jettison UUID-or-E164 plumbing in favor of UUID-only.
This commit is contained in:
parent
8356264fe0
commit
1d5087374e
|
@ -103,7 +103,7 @@ public class WebsocketSender {
|
||||||
|
|
||||||
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||||
|
|
||||||
messagesManager.insert(account.getNumber(), account.getUuid(), device.getId(), message);
|
messagesManager.insert(account.getUuid(), device.getId(), message);
|
||||||
pubSubManager.publish(address, PubSubMessage.newBuilder()
|
pubSubManager.publish(address, PubSubMessage.newBuilder()
|
||||||
.setType(PubSubMessage.Type.QUERY_DB)
|
.setType(PubSubMessage.Type.QUERY_DB)
|
||||||
.build());
|
.build());
|
||||||
|
|
|
@ -123,7 +123,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long insert(final UUID guid, final String destination, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
|
public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
|
||||||
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
||||||
final String sender = message.hasSource() ? (message.getSource() + "::" + message.getTimestamp()) : "nil";
|
final String sender = message.hasSource() ? (message.getSource() + "::" + message.getTimestamp()) : "nil";
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> remove(final String destination, final UUID destinationUuid, final long destinationDevice, final long id) {
|
public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, final long id) {
|
||||||
try {
|
try {
|
||||||
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_ID).record(() ->
|
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_ID).record(() ->
|
||||||
removeByIdScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
removeByIdScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||||
|
@ -156,7 +156,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> remove(final String destination, final UUID destinationUuid, final long destinationDevice, final String sender, final long timestamp) {
|
public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, final String sender, final long timestamp) {
|
||||||
try {
|
try {
|
||||||
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_SENDER).record(() ->
|
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_SENDER).record(() ->
|
||||||
removeBySenderScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
removeBySenderScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||||
|
@ -174,7 +174,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> remove(final String destination, final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) {
|
public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) {
|
||||||
try {
|
try {
|
||||||
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_UUID).record(() ->
|
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_UUID).record(() ->
|
||||||
removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||||
|
@ -193,7 +193,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public List<OutgoingMessageEntity> get(final String destination, final UUID destinationUuid, final long destinationDevice, final int limit) {
|
public List<OutgoingMessageEntity> get(final UUID destinationUuid, final long destinationDevice, final int limit) {
|
||||||
return getMessagesTimer.record(() -> {
|
return getMessagesTimer.record(() -> {
|
||||||
final List<byte[]> queueItems = (List<byte[]>)getItemsScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
final List<byte[]> queueItems = (List<byte[]>)getItemsScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||||
getPersistInProgressKey(destinationUuid, destinationDevice)),
|
getPersistInProgressKey(destinationUuid, destinationDevice)),
|
||||||
|
@ -252,16 +252,16 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear(final String destination, final UUID destinationUuid) {
|
public void clear(final UUID destinationUuid) {
|
||||||
// TODO Remove null check in a fully UUID-based world
|
// TODO Remove null check in a fully UUID-based world
|
||||||
if (destinationUuid != null) {
|
if (destinationUuid != null) {
|
||||||
for (int i = 1; i < 256; i++) {
|
for (int i = 1; i < 256; i++) {
|
||||||
clear(destination, destinationUuid, i);
|
clear(destinationUuid, i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear(final String destination, final UUID destinationUuid, final long deviceId) {
|
public void clear(final UUID destinationUuid, final long deviceId) {
|
||||||
clearQueueTimer.record(() ->
|
clearQueueTimer.record(() ->
|
||||||
removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId),
|
removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId),
|
||||||
getMessageQueueMetadataKey(destinationUuid, deviceId),
|
getMessageQueueMetadataKey(destinationUuid, deviceId),
|
||||||
|
|
|
@ -32,13 +32,13 @@ public class MessagesManager {
|
||||||
private final PushLatencyManager pushLatencyManager;
|
private final PushLatencyManager pushLatencyManager;
|
||||||
|
|
||||||
public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) {
|
public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) {
|
||||||
this.messages = messages;
|
this.messages = messages;
|
||||||
this.messagesCache = messagesCache;
|
this.messagesCache = messagesCache;
|
||||||
this.pushLatencyManager = pushLatencyManager;
|
this.pushLatencyManager = pushLatencyManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
public void insert(UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||||
messagesCache.insert(UUID.randomUUID(), destination, destinationUuid, destinationDevice, message);
|
messagesCache.insert(UUID.randomUUID(), destinationUuid, destinationDevice, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||||
|
@ -47,25 +47,33 @@ public class MessagesManager {
|
||||||
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
||||||
|
|
||||||
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
||||||
messages.addAll(messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
messages.addAll(messagesCache.get(destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear(String destination, UUID destinationUuid) {
|
public void clear(String destination, UUID destinationUuid) {
|
||||||
this.messagesCache.clear(destination, destinationUuid);
|
// TODO Remove this null check in a fully-UUID-ified world
|
||||||
|
if (destinationUuid != null) {
|
||||||
|
this.messagesCache.clear(destinationUuid);
|
||||||
|
}
|
||||||
|
|
||||||
this.messages.clear(destination);
|
this.messages.clear(destination);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear(String destination, UUID destinationUuid, long deviceId) {
|
public void clear(String destination, UUID destinationUuid, long deviceId) {
|
||||||
this.messagesCache.clear(destination, destinationUuid, deviceId);
|
// TODO Remove this null check in a fully-UUID-ified world
|
||||||
|
if (destinationUuid != null) {
|
||||||
|
this.messagesCache.clear(destinationUuid, deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
this.messages.clear(destination, deviceId);
|
this.messages.clear(destination, deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
||||||
{
|
{
|
||||||
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, destinationDevice, source, timestamp);
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
||||||
|
@ -78,7 +86,7 @@ public class MessagesManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
||||||
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destination, destinationUuid, deviceId, guid);
|
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, deviceId, guid);
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, guid);
|
removed = this.messages.remove(destination, guid);
|
||||||
|
@ -92,7 +100,7 @@ public class MessagesManager {
|
||||||
|
|
||||||
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
||||||
if (cached) {
|
if (cached) {
|
||||||
messagesCache.remove(destination, destinationUuid, deviceId, id);
|
messagesCache.remove(destinationUuid, deviceId, id);
|
||||||
cacheHitByIdMeter.mark();
|
cacheHitByIdMeter.mark();
|
||||||
} else {
|
} else {
|
||||||
this.messages.remove(destination, id);
|
this.messages.remove(destination, id);
|
||||||
|
@ -102,7 +110,7 @@ public class MessagesManager {
|
||||||
|
|
||||||
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) {
|
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) {
|
||||||
messages.store(messageGuid, envelope, destination, deviceId);
|
messages.store(messageGuid, envelope, destination, deviceId);
|
||||||
messagesCache.remove(destination, destinationUuid, deviceId, messageGuid);
|
messagesCache.remove(destinationUuid, deviceId, messageGuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||||
|
|
|
@ -45,7 +45,7 @@ public class DeadLetterHandler implements DispatchChannel {
|
||||||
Optional<Account> maybeAccount = accountsManager.get(address.getNumber());
|
Optional<Account> maybeAccount = accountsManager.get(address.getNumber());
|
||||||
|
|
||||||
if (maybeAccount.isPresent()) {
|
if (maybeAccount.isPresent()) {
|
||||||
messagesManager.insert(address.getNumber(), maybeAccount.get().getUuid(), address.getDeviceId(), message);
|
messagesManager.insert(maybeAccount.get().getUuid(), address.getDeviceId(), message);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Dead letter for account that no longer exists: {}", address);
|
logger.warn("Dead letter for account that no longer exists: {}", address);
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
final long deviceId = invocation.getArgument(4, Long.class);
|
final long deviceId = invocation.getArgument(4, Long.class);
|
||||||
|
|
||||||
messagesDatabase.store(messageGuid, message, destination, deviceId);
|
messagesDatabase.store(messageGuid, message, destination, deviceId);
|
||||||
messagesCache.remove(destination, destinationUuid, deviceId, messageGuid);
|
messagesCache.remove(destinationUuid, deviceId, messageGuid);
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}).when(messagesManager).persistMessage(anyString(), any(UUID.class), any(MessageProtos.Envelope.class), any(UUID.class), anyLong());
|
}).when(messagesManager).persistMessage(anyString(), any(UUID.class), any(MessageProtos.Envelope.class), any(UUID.class), anyLong());
|
||||||
|
@ -181,7 +181,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
.setServerGuid(messageGuid.toString())
|
.setServerGuid(messageGuid.toString())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
messagesCache.insert(messageGuid, accountNumber, accountUuid, deviceId, envelope);
|
messagesCache.insert(messageGuid, accountUuid, deviceId, envelope);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,6 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
private final Random random = new Random();
|
private final Random random = new Random();
|
||||||
private long serialTimestamp = 0;
|
private long serialTimestamp = 0;
|
||||||
|
|
||||||
private static final String DESTINATION_ACCOUNT = "+18005551234";
|
|
||||||
private static final UUID DESTINATION_UUID = UUID.randomUUID();
|
private static final UUID DESTINATION_UUID = UUID.randomUUID();
|
||||||
private static final int DESTINATION_DEVICE_ID = 7;
|
private static final int DESTINATION_DEVICE_ID = 7;
|
||||||
|
|
||||||
|
@ -64,12 +63,11 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Parameters({"true", "false"})
|
@Parameters({"true", "false"})
|
||||||
public void testInsert(final boolean sealedSender) {
|
public void testInsert(final boolean sealedSender) {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
assertTrue(messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender)) > 0);
|
assertTrue(messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender)) > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -78,12 +76,12 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
||||||
|
|
||||||
final long messageId = messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
final long messageId = messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId);
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId);
|
||||||
|
|
||||||
assertTrue(maybeRemovedMessage.isPresent());
|
assertTrue(maybeRemovedMessage.isPresent());
|
||||||
assertEquals(MessagesCache.constructEntityFromEnvelope(messageId, message), maybeRemovedMessage.get());
|
assertEquals(MessagesCache.constructEntityFromEnvelope(messageId, message), maybeRemovedMessage.get());
|
||||||
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId));
|
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -91,12 +89,12 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, false);
|
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, false);
|
||||||
|
|
||||||
messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp());
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp());
|
||||||
|
|
||||||
assertTrue(maybeRemovedMessage.isPresent());
|
assertTrue(maybeRemovedMessage.isPresent());
|
||||||
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
|
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
|
||||||
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp()));
|
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -104,12 +102,12 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
public void testRemoveByUUID(final boolean sealedSender) {
|
public void testRemoveByUUID(final boolean sealedSender) {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
|
|
||||||
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid));
|
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid));
|
||||||
|
|
||||||
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
||||||
|
|
||||||
messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid);
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid);
|
||||||
|
|
||||||
assertTrue(maybeRemovedMessage.isPresent());
|
assertTrue(maybeRemovedMessage.isPresent());
|
||||||
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
|
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
|
||||||
|
@ -125,12 +123,12 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
for (int i = 0; i < messageCount; i++) {
|
for (int i = 0; i < messageCount; i++) {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
||||||
final long messageId = messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
final long messageId = messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
||||||
|
|
||||||
expectedMessages.add(MessagesCache.constructEntityFromEnvelope(messageId, message));
|
expectedMessages.add(MessagesCache.constructEntityFromEnvelope(messageId, message));
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(expectedMessages, messagesCache.get(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
assertEquals(expectedMessages, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -143,14 +141,14 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
||||||
|
|
||||||
messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, deviceId, message);
|
messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
messagesCache.clear(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID);
|
messagesCache.clear(DESTINATION_UUID, DESTINATION_DEVICE_ID);
|
||||||
|
|
||||||
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
||||||
assertEquals(messageCount, messagesCache.get(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount).size());
|
assertEquals(messageCount, messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount).size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -163,14 +161,14 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
||||||
|
|
||||||
messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, deviceId, message);
|
messagesCache.insert(messageGuid, DESTINATION_UUID, deviceId, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
messagesCache.clear(DESTINATION_ACCOUNT, DESTINATION_UUID);
|
messagesCache.clear(DESTINATION_UUID);
|
||||||
|
|
||||||
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
||||||
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount));
|
assertEquals(Collections.emptyList(), messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender) {
|
protected MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender) {
|
||||||
|
@ -192,7 +190,7 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
@Test
|
@Test
|
||||||
public void testClearNullUuid() {
|
public void testClearNullUuid() {
|
||||||
// We're happy as long as this doesn't throw an exception
|
// We're happy as long as this doesn't throw an exception
|
||||||
messagesCache.clear(DESTINATION_ACCOUNT, null);
|
messagesCache.clear(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -218,7 +216,7 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
public void testGetQueuesToPersist(final boolean sealedSender) {
|
public void testGetQueuesToPersist(final boolean sealedSender) {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
|
|
||||||
messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender));
|
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender));
|
||||||
final int slot = SlotHash.getSlot(DESTINATION_UUID.toString() + "::" + DESTINATION_DEVICE_ID);
|
final int slot = SlotHash.getSlot(DESTINATION_UUID.toString() + "::" + DESTINATION_DEVICE_ID);
|
||||||
|
|
||||||
assertTrue(messagesCache.getQueuesToPersist(slot + 1, Instant.now().plusSeconds(60), 100).isEmpty());
|
assertTrue(messagesCache.getQueuesToPersist(slot + 1, Instant.now().plusSeconds(60), 100).isEmpty());
|
||||||
|
@ -250,7 +248,7 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
};
|
};
|
||||||
|
|
||||||
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
|
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
|
||||||
messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, true));
|
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, true));
|
||||||
|
|
||||||
synchronized (notified) {
|
synchronized (notified) {
|
||||||
while (!notified.get()) {
|
while (!notified.get()) {
|
||||||
|
|
Loading…
Reference in New Issue