Repair queue metadata before persisting queues.
This commit is contained in:
parent
df7f209ebc
commit
952cfae4e6
|
@ -141,6 +141,7 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
try (final Timer.Context ignored = persistQueueTimer.time()) {
|
try (final Timer.Context ignored = persistQueueTimer.time()) {
|
||||||
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
|
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
|
||||||
|
messagesCache.repairMetadata(accountUuid, deviceId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int messageCount = 0;
|
int messageCount = 0;
|
||||||
|
|
|
@ -55,6 +55,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
private final ClusterLuaScript getItemsScript;
|
private final ClusterLuaScript getItemsScript;
|
||||||
private final ClusterLuaScript removeQueueScript;
|
private final ClusterLuaScript removeQueueScript;
|
||||||
private final ClusterLuaScript getQueuesToPersistScript;
|
private final ClusterLuaScript getQueuesToPersistScript;
|
||||||
|
private final ClusterLuaScript repairMetadataScript;
|
||||||
|
|
||||||
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
|
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
|
||||||
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
|
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
|
||||||
|
@ -65,6 +66,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
|
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
|
||||||
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
|
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
|
||||||
private final Timer takeEphemeralMessageTimer = Metrics.timer(name(MessagesCache.class, "takeEphemeral"));
|
private final Timer takeEphemeralMessageTimer = Metrics.timer(name(MessagesCache.class, "takeEphemeral"));
|
||||||
|
private final Timer repairMetadataTimer = Metrics.timer(name(MessagesCache.class, "repairMetadata"));
|
||||||
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
|
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
|
||||||
private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"), "ephemeral", "false");
|
private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"), "ephemeral", "false");
|
||||||
private final Counter ephemeralMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"), "ephemeral", "true");
|
private final Counter ephemeralMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"), "ephemeral", "true");
|
||||||
|
@ -102,6 +104,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
|
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
|
||||||
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS);
|
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS);
|
||||||
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI);
|
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI);
|
||||||
|
|
||||||
|
this.repairMetadataScript = ClusterLuaScript.fromResource(redisCluster, "lua/repair_queue_metadata.lua", ScriptOutputType.VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -220,6 +224,15 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
return removedMessages;
|
return removedMessages;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void repairMetadata(final UUID destinationUuid, final long destinationDevice) {
|
||||||
|
repairMetadataTimer.record(() -> {
|
||||||
|
repairMetadataScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||||
|
getMessageQueueMetadataKey(destinationUuid, destinationDevice)),
|
||||||
|
Collections.emptyList());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public List<OutgoingMessageEntity> get(final UUID destinationUuid, final long destinationDevice, final int limit) {
|
public List<OutgoingMessageEntity> get(final UUID destinationUuid, final long destinationDevice, final int limit) {
|
||||||
return getMessagesTimer.record(() -> {
|
return getMessagesTimer.record(() -> {
|
||||||
|
|
|
@ -16,7 +16,6 @@ if sender ~= "nil" then
|
||||||
end
|
end
|
||||||
|
|
||||||
redis.call("HSET", queueMetadataKey, guid, messageId)
|
redis.call("HSET", queueMetadataKey, guid, messageId)
|
||||||
|
|
||||||
redis.call("HSET", queueMetadataKey, messageId .. "guid", guid)
|
redis.call("HSET", queueMetadataKey, messageId .. "guid", guid)
|
||||||
|
|
||||||
redis.call("EXPIRE", queueKey, 7776000) -- 90 days
|
redis.call("EXPIRE", queueKey, 7776000) -- 90 days
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
local queueKey = KEYS[1]
|
||||||
|
local queueMetadataKey = KEYS[2]
|
||||||
|
|
||||||
|
local firstMessageWithScore = redis.call("ZRANGE", queueKey, 0, 0, "WITHSCORES")
|
||||||
|
local lastMessageWithScore = redis.call("ZRANGE", queueKey, -1, -1, "WITHSCORES")
|
||||||
|
|
||||||
|
if firstMessageWithScore ~= nil and lastMessageWithScore ~= nil then
|
||||||
|
local firstMessageId = tonumber(firstMessageWithScore[2])
|
||||||
|
local lastMessageId = tonumber(lastMessageWithScore[2])
|
||||||
|
|
||||||
|
for messageId = firstMessageId,lastMessageId do
|
||||||
|
if redis.call("ZRANGEBYSCORE", queueKey, messageId, messageId) then
|
||||||
|
-- This message actually exists, and its GUID may be pointing to the wrong ID
|
||||||
|
local guid = redis.call("HGET", queueMetadataKey, messageId .. "guid")
|
||||||
|
redis.call("HSET", queueMetadataKey, guid, messageId)
|
||||||
|
else
|
||||||
|
-- No message actually exists with that ID; drop the metadata reference to that ID
|
||||||
|
redis.call("HDEL", queueMetadataKey, messageId .. "guid")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@RunWith(JUnitParamsRunner.class)
|
@RunWith(JUnitParamsRunner.class)
|
||||||
|
@ -71,6 +72,41 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
assertTrue(messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender)) > 0);
|
assertTrue(messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, sealedSender)) > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRepairMetadata() {
|
||||||
|
final int distinctUuidCount = 17;
|
||||||
|
|
||||||
|
for (int i = 0; i < distinctUuidCount; i++) {
|
||||||
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
|
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(distinctUuidCount, messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, 100).size());
|
||||||
|
|
||||||
|
final int duplicateGuidCount = 5;
|
||||||
|
|
||||||
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
|
final MessageProtos.Envelope duplicatedMessage = generateRandomMessage(messageGuid, false);
|
||||||
|
|
||||||
|
for (int i = 0; i < duplicateGuidCount; i++) {
|
||||||
|
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, duplicatedMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(distinctUuidCount + 1, messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, 100).size());
|
||||||
|
assertFalse(messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid).isPresent());
|
||||||
|
|
||||||
|
messagesCache.repairMetadata(DESTINATION_UUID, DESTINATION_DEVICE_ID);
|
||||||
|
|
||||||
|
assertTrue(messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid).isPresent());
|
||||||
|
|
||||||
|
final List<MessageProtos.Envelope> messagesToPersist = messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, 100);
|
||||||
|
assertEquals(distinctUuidCount, messagesToPersist.size());
|
||||||
|
|
||||||
|
messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messagesToPersist.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
|
||||||
|
|
||||||
|
assertTrue(messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, 100).isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Parameters({"true", "false"})
|
@Parameters({"true", "false"})
|
||||||
public void testRemoveById(final boolean sealedSender) {
|
public void testRemoveById(final boolean sealedSender) {
|
||||||
|
|
Loading…
Reference in New Issue