Remove the "repair queue metadata" script.
This commit is contained in:
parent
c31348ea9a
commit
c82496b972
|
@ -18,9 +18,6 @@ import java.time.Instant;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
|
@ -141,7 +138,6 @@ public class MessagePersister implements Managed {
|
|||
|
||||
try (final Timer.Context ignored = persistQueueTimer.time()) {
|
||||
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
|
||||
messagesCache.repairMetadata(accountUuid, deviceId);
|
||||
|
||||
try {
|
||||
int messageCount = 0;
|
||||
|
|
|
@ -55,7 +55,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
|||
private final ClusterLuaScript getItemsScript;
|
||||
private final ClusterLuaScript removeQueueScript;
|
||||
private final ClusterLuaScript getQueuesToPersistScript;
|
||||
private final ClusterLuaScript repairMetadataScript;
|
||||
|
||||
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
|
||||
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
|
||||
|
@ -66,7 +65,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
|||
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
|
||||
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
|
||||
private final Timer takeEphemeralMessageTimer = Metrics.timer(name(MessagesCache.class, "takeEphemeral"));
|
||||
private final Timer repairMetadataTimer = Metrics.timer(name(MessagesCache.class, "repairMetadata"));
|
||||
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
|
||||
private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"), "ephemeral", "false");
|
||||
private final Counter ephemeralMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"), "ephemeral", "true");
|
||||
|
@ -104,8 +102,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
|||
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
|
||||
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua", ScriptOutputType.STATUS);
|
||||
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua", ScriptOutputType.MULTI);
|
||||
|
||||
this.repairMetadataScript = ClusterLuaScript.fromResource(redisCluster, "lua/repair_queue_metadata.lua", ScriptOutputType.VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -224,15 +220,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
|||
return removedMessages;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void repairMetadata(final UUID destinationUuid, final long destinationDevice) {
|
||||
repairMetadataTimer.record(() -> {
|
||||
repairMetadataScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice)),
|
||||
Collections.emptyList());
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<OutgoingMessageEntity> get(final UUID destinationUuid, final long destinationDevice, final int limit) {
|
||||
return getMessagesTimer.record(() -> {
|
||||
|
|
|
@ -84,48 +84,6 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
|||
assertEquals(firstId, secondId);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Depends on incorrect duplicate-insert behavior that has since been fixed")
|
||||
public void testRepairMetadata() {
|
||||
final int distinctUuidCount = 17;
|
||||
|
||||
for (int i = 0; i < distinctUuidCount; i++) {
|
||||
final UUID messageGuid = UUID.randomUUID();
|
||||
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(messageGuid, false));
|
||||
}
|
||||
|
||||
assertEquals(distinctUuidCount, messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, 100).size());
|
||||
|
||||
final int duplicateGuidCount = 5;
|
||||
|
||||
final UUID messageGuid = UUID.randomUUID();
|
||||
final MessageProtos.Envelope duplicatedMessage = generateRandomMessage(messageGuid, false);
|
||||
|
||||
for (int i = 0; i < duplicateGuidCount; i++) {
|
||||
messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, duplicatedMessage);
|
||||
}
|
||||
|
||||
assertEquals(distinctUuidCount + 1, messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, 100).size());
|
||||
assertFalse(messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid).isPresent());
|
||||
|
||||
messagesCache.repairMetadata(DESTINATION_UUID, DESTINATION_DEVICE_ID);
|
||||
|
||||
assertTrue(messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid).isPresent());
|
||||
|
||||
final List<MessageProtos.Envelope> messagesToPersist = messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, 100);
|
||||
assertEquals(distinctUuidCount, messagesToPersist.size());
|
||||
|
||||
messagesCache.remove(DESTINATION_UUID, DESTINATION_DEVICE_ID, messagesToPersist.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
|
||||
|
||||
assertTrue(messagesCache.getMessagesToPersist(DESTINATION_UUID, DESTINATION_DEVICE_ID, 100).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRepairEmptyQueueMetadata() {
|
||||
// We're happy as long as this doesn't throw an exception
|
||||
messagesCache.repairMetadata(DESTINATION_UUID, DESTINATION_DEVICE_ID);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Parameters({"true", "false"})
|
||||
public void testRemoveById(final boolean sealedSender) {
|
||||
|
|
Loading…
Reference in New Issue