Remove messages from the cache in bulk.

This commit is contained in:
Jon Chambers 2020-09-23 14:10:51 -04:00 committed by Jon Chambers
parent fc71ced660
commit c7230ccbb0
4 changed files with 77 additions and 34 deletions

View File

@ -35,6 +35,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
@ -94,7 +95,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER); this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
this.removeByIdScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_id.lua", ScriptOutputType.VALUE); this.removeByIdScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_id.lua", ScriptOutputType.VALUE);
this.removeBySenderScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_sender.lua", ScriptOutputType.VALUE); this.removeBySenderScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_sender.lua", ScriptOutputType.VALUE);
this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.VALUE); this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua", ScriptOutputType.MULTI);
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI); this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS); this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS);
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI); this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI);
@ -163,7 +164,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
getQueueIndexKey(destinationUuid, destinationDevice)), getQueueIndexKey(destinationUuid, destinationDevice)),
List.of(String.valueOf(id).getBytes(StandardCharsets.UTF_8)))); List.of(String.valueOf(id).getBytes(StandardCharsets.UTF_8))));
if (serialized != null) { if (serialized != null) {
return Optional.of(constructEntityFromEnvelope(id, MessageProtos.Envelope.parseFrom(serialized))); return Optional.of(constructEntityFromEnvelope(id, MessageProtos.Envelope.parseFrom(serialized)));
} }
@ -193,21 +193,28 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
} }
public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) { public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) {
try { return remove(destinationUuid, destinationDevice, List.of(messageGuid)).stream().findFirst();
final byte[] serialized = (byte[])Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_UUID).record(() -> }
removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
getQueueIndexKey(destinationUuid, destinationDevice)),
List.of(messageGuid.toString().getBytes(StandardCharsets.UTF_8))));
if (serialized != null) { @SuppressWarnings("unchecked")
return Optional.of(constructEntityFromEnvelope(0, MessageProtos.Envelope.parseFrom(serialized))); public List<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, final List<UUID> messageGuids) {
final List<byte[]> serialized = (List<byte[]>)Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_UUID).record(() ->
removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
getQueueIndexKey(destinationUuid, destinationDevice)),
messageGuids.stream().map(guid -> guid.toString().getBytes(StandardCharsets.UTF_8)).collect(Collectors.toList())));
final List<OutgoingMessageEntity> removedMessages = new ArrayList<>(serialized.size());
for (final byte[] bytes : serialized) {
try {
removedMessages.add(constructEntityFromEnvelope(0, MessageProtos.Envelope.parseFrom(bytes)));
} catch (final InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
} }
} catch (final InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
} }
return Optional.empty(); return removedMessages;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -15,6 +15,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
@ -119,10 +120,7 @@ public class MessagesManager {
public void persistMessages(final String destination, final UUID destinationUuid, final long destinationDeviceId, final List<Envelope> messages) { public void persistMessages(final String destination, final UUID destinationUuid, final long destinationDeviceId, final List<Envelope> messages) {
this.messages.store(messages, destination, destinationDeviceId); this.messages.store(messages, destination, destinationDeviceId);
messagesCache.remove(destinationUuid, destinationDeviceId, messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
for (final Envelope message : messages) {
messagesCache.remove(destinationUuid, destinationDeviceId, UUID.fromString(message.getServerGuid()));
}
} }
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) { public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {

View File

@ -1,28 +1,32 @@
-- keys: queue_key, queue_metadata_key, queue_index -- keys: queue_key, queue_metadata_key, queue_index
-- argv: guid_to_remove -- argv: guid_to_remove
local messageId = redis.call("HGET", KEYS[2], ARGV[1]) local removedMessages = {}
if messageId then for _, guid in ipairs(ARGV) do
local envelope = redis.call("ZRANGEBYSCORE", KEYS[1], messageId, messageId, "LIMIT", 0, 1) local messageId = redis.call("HGET", KEYS[2], guid)
local sender = redis.call("HGET", KEYS[2], messageId)
redis.call("ZREMRANGEBYSCORE", KEYS[1], messageId, messageId) if messageId then
redis.call("HDEL", KEYS[2], ARGV[1]) local envelope = redis.call("ZRANGEBYSCORE", KEYS[1], messageId, messageId, "LIMIT", 0, 1)
redis.call("HDEL", KEYS[2], messageId .. "guid") local sender = redis.call("HGET", KEYS[2], messageId)
if sender then redis.call("ZREMRANGEBYSCORE", KEYS[1], messageId, messageId)
redis.call("HDEL", KEYS[2], sender) redis.call("HDEL", KEYS[2], guid)
redis.call("HDEL", KEYS[2], messageId) redis.call("HDEL", KEYS[2], messageId .. "guid")
end
if (redis.call("ZCARD", KEYS[1]) == 0) then if sender then
redis.call("ZREM", KEYS[3], KEYS[1]) redis.call("HDEL", KEYS[2], sender)
end redis.call("HDEL", KEYS[2], messageId)
end
if envelope and next(envelope) then if (redis.call("ZCARD", KEYS[1]) == 0) then
return envelope[1] redis.call("ZREM", KEYS[3], KEYS[1])
end
if envelope and next(envelope) then
removedMessages[#removedMessages + 1] = envelope[1]
end
end end
end end
return nil return removedMessages

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -113,6 +114,39 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get()); assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
} }
@Test
@Parameters({"true", "false"})
public void testRemoveBatchByUUID(final boolean sealedSender) {
final int messageCount = 10;
final List<MessageProtos.Envelope> messagesToRemove = new ArrayList<>(messageCount);
final List<MessageProtos.Envelope> messagesToPreserve = new ArrayList<>(messageCount);
for (int i = 0; i < 10; i++) {
messagesToRemove.add(generateRandomMessage(UUID.randomUUID(), sealedSender));
messagesToPreserve.add(generateRandomMessage(UUID.randomUUID(), sealedSender));
}
assertEquals(Collections.emptyList(), messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID,
messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList())));
for (final MessageProtos.Envelope message : messagesToRemove) {
messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
}
for (final MessageProtos.Envelope message : messagesToPreserve) {
messagesCache.insert(UUID.fromString(message.getServerGuid()), DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
}
final List<OutgoingMessageEntity> removedMessages = messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID,
messagesToRemove.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
assertEquals(messagesToRemove.stream().map(message -> MessagesCache.constructEntityFromEnvelope(0, message)).collect(Collectors.toList()),
removedMessages);
assertEquals(messagesToPreserve, messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
}
@Test @Test
@Parameters({"true", "false"}) @Parameters({"true", "false"})
public void testGetMessages(final boolean sealedSender) { public void testGetMessages(final boolean sealedSender) {