From eaa2060d84ff2bf5cfac08715e04cb462eaf88d9 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 29 Sep 2020 17:01:22 -0400 Subject: [PATCH] Fix an incorrect locking key and some previously-suppressed lock contention issues. --- .../storage/MessagePersister.java | 4 +- .../textsecuregcm/storage/MessagesCache.java | 40 ++++++------------- .../storage/MessagesCacheTest.java | 4 +- 3 files changed, 17 insertions(+), 31 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index 455049cec..ba034fad8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -120,7 +120,7 @@ public class MessagePersister implements Managed { } try (final Timer.Context ignored = persistQueueTimer.time()) { - messagesCache.lockQueueForPersistence(queue); + messagesCache.lockQueueForPersistence(accountUuid, deviceId); try { int messageCount = 0; @@ -135,7 +135,7 @@ public class MessagePersister implements Managed { queueSizeHistogram.update(messageCount); } finally { - messagesCache.unlockQueueForPersistence(queue); + messagesCache.unlockQueueForPersistence(accountUuid, deviceId); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index f08456b98..47068953f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -3,6 +3,7 @@ package org.whispersystems.textsecuregcm.storage; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; import io.dropwizard.lifecycle.Managed; +import io.lettuce.core.ScoredValue; import io.lettuce.core.ScriptOutputType; import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; @@ -248,29 +249,18 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp }); } - @SuppressWarnings("unchecked") @VisibleForTesting List getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) { return getMessagesTimer.record(() -> { - final List queueItems = (List)getItemsScript.executeBinary(List.of(getMessageQueueKey(accountUuid, destinationDevice), - getPersistInProgressKey(accountUuid, destinationDevice)), - List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8))); + final List> scoredMessages = redisCluster.withBinaryCluster(connection -> connection.sync().zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit)); + final List envelopes = new ArrayList<>(scoredMessages.size()); - final List envelopes; - - if (queueItems.size() % 2 == 0) { - envelopes = new ArrayList<>(queueItems.size() / 2); - - for (int i = 0; i < queueItems.size(); i += 2) { - try { - envelopes.add(MessageProtos.Envelope.parseFrom(queueItems.get(i))); - } catch (InvalidProtocolBufferException e) { - logger.warn("Failed to parse envelope", e); - } + for (final ScoredValue scoredMessage : scoredMessages) { + try { + envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue())); + } catch (InvalidProtocolBufferException e) { + logger.warn("Failed to parse envelope", e); } - } else { - logger.error("\"Get messages\" operation returned a list with a non-even number of elements."); - envelopes = Collections.emptyList(); } return envelopes; @@ -332,12 +322,12 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp String.valueOf(limit))); } - void lockQueueForPersistence(final String queue) { - redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(queue), 30, LOCK_VALUE)); + void lockQueueForPersistence(final UUID accountUuid, final long deviceId) { + redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE)); } - void unlockQueueForPersistence(final String queue) { - redisCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(queue))); + void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) { + redisCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId))); } public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) { @@ -459,11 +449,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } private static byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) { - return getPersistInProgressKey(accountUuid + "::" + deviceId); - } - - private static byte[] getPersistInProgressKey(final String queueName) { - return ("user_queue_persisting::{" + queueName + "}").getBytes(StandardCharsets.UTF_8); + return ("user_queue_persisting::{" + accountUuid + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); } static UUID getAccountUuidFromQueueName(final String queueName) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 49d7ce545..43b3e4ef3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -325,8 +325,8 @@ public class MessagesCacheTest extends AbstractRedisClusterTest { messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); - messagesCache.lockQueueForPersistence(MessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID)); - messagesCache.unlockQueueForPersistence(MessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID)); + messagesCache.lockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID); + messagesCache.unlockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID); synchronized (notified) { while (!notified.get()) {