Clean redis message cache structure
This commit is contained in:
parent
739c5bf22c
commit
ba522b1691
|
@ -135,15 +135,12 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
|
public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
|
||||||
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
||||||
final String sender = message.hasSource() ? (message.getSource() + "::" + message.getTimestamp()) : "nil";
|
|
||||||
|
|
||||||
return (long)insertTimer.record(() ->
|
return (long)insertTimer.record(() ->
|
||||||
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
||||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
getQueueIndexKey(destinationUuid, destinationDevice)),
|
||||||
List.of(messageWithGuid.toByteArray(),
|
List.of(messageWithGuid.toByteArray(),
|
||||||
String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
|
String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
|
||||||
sender.getBytes(StandardCharsets.UTF_8),
|
|
||||||
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,8 +3,7 @@ local queueMetadataKey = KEYS[2]
|
||||||
local queueTotalIndexKey = KEYS[3]
|
local queueTotalIndexKey = KEYS[3]
|
||||||
local message = ARGV[1]
|
local message = ARGV[1]
|
||||||
local currentTime = ARGV[2]
|
local currentTime = ARGV[2]
|
||||||
local sender = ARGV[3]
|
local guid = ARGV[3]
|
||||||
local guid = ARGV[4]
|
|
||||||
|
|
||||||
if redis.call("HEXISTS", queueMetadataKey, guid) == 1 then
|
if redis.call("HEXISTS", queueMetadataKey, guid) == 1 then
|
||||||
return tonumber(redis.call("HGET", queueMetadataKey, guid))
|
return tonumber(redis.call("HGET", queueMetadataKey, guid))
|
||||||
|
@ -14,13 +13,7 @@ local messageId = redis.call("HINCRBY", queueMetadataKey, "counter", 1)
|
||||||
|
|
||||||
redis.call("ZADD", queueKey, "NX", messageId, message)
|
redis.call("ZADD", queueKey, "NX", messageId, message)
|
||||||
|
|
||||||
if sender ~= "nil" then
|
|
||||||
redis.call("HSET", queueMetadataKey, sender, messageId)
|
|
||||||
redis.call("HSET", queueMetadataKey, messageId, sender)
|
|
||||||
end
|
|
||||||
|
|
||||||
redis.call("HSET", queueMetadataKey, guid, messageId)
|
redis.call("HSET", queueMetadataKey, guid, messageId)
|
||||||
redis.call("HSET", queueMetadataKey, messageId .. "guid", guid)
|
|
||||||
|
|
||||||
redis.call("EXPIRE", queueKey, 7776000) -- 90 days
|
redis.call("EXPIRE", queueKey, 7776000) -- 90 days
|
||||||
redis.call("EXPIRE", queueMetadataKey, 7776000) -- 90 days
|
redis.call("EXPIRE", queueMetadataKey, 7776000) -- 90 days
|
||||||
|
|
|
@ -9,20 +9,9 @@ for _, guid in ipairs(ARGV) do
|
||||||
|
|
||||||
if messageId then
|
if messageId then
|
||||||
local envelope = redis.call("ZRANGEBYSCORE", queueKey, messageId, messageId, "LIMIT", 0, 1)
|
local envelope = redis.call("ZRANGEBYSCORE", queueKey, messageId, messageId, "LIMIT", 0, 1)
|
||||||
local sender = redis.call("HGET", queueMetadataKey, messageId)
|
|
||||||
|
|
||||||
redis.call("ZREMRANGEBYSCORE", queueKey, messageId, messageId)
|
redis.call("ZREMRANGEBYSCORE", queueKey, messageId, messageId)
|
||||||
redis.call("HDEL", queueMetadataKey, guid)
|
redis.call("HDEL", queueMetadataKey, guid)
|
||||||
redis.call("HDEL", queueMetadataKey, messageId .. "guid")
|
|
||||||
|
|
||||||
if sender then
|
|
||||||
redis.call("HDEL", queueMetadataKey, sender)
|
|
||||||
redis.call("HDEL", queueMetadataKey, messageId)
|
|
||||||
end
|
|
||||||
|
|
||||||
if (redis.call("ZCARD", queueKey) == 0) then
|
|
||||||
redis.call("ZREM", queueTotalIndexKey, queueKey)
|
|
||||||
end
|
|
||||||
|
|
||||||
if envelope and next(envelope) then
|
if envelope and next(envelope) then
|
||||||
removedMessages[#removedMessages + 1] = envelope[1]
|
removedMessages[#removedMessages + 1] = envelope[1]
|
||||||
|
@ -30,4 +19,10 @@ for _, guid in ipairs(ARGV) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if (redis.call("ZCARD", queueKey) == 0) then
|
||||||
|
redis.call("DEL", queueKey)
|
||||||
|
redis.call("DEL", queueMetadataKey)
|
||||||
|
redis.call("ZREM", queueTotalIndexKey, queueKey)
|
||||||
|
end
|
||||||
|
|
||||||
return removedMessages
|
return removedMessages
|
||||||
|
|
Loading…
Reference in New Issue