Fix an incorrect locking key and some previously-suppressed lock contention issues.
This commit is contained in:
parent
3e02c574e7
commit
eaa2060d84
|
@ -120,7 +120,7 @@ public class MessagePersister implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
try (final Timer.Context ignored = persistQueueTimer.time()) {
|
try (final Timer.Context ignored = persistQueueTimer.time()) {
|
||||||
messagesCache.lockQueueForPersistence(queue);
|
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int messageCount = 0;
|
int messageCount = 0;
|
||||||
|
@ -135,7 +135,7 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
queueSizeHistogram.update(messageCount);
|
queueSizeHistogram.update(messageCount);
|
||||||
} finally {
|
} finally {
|
||||||
messagesCache.unlockQueueForPersistence(queue);
|
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package org.whispersystems.textsecuregcm.storage;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import io.dropwizard.lifecycle.Managed;
|
import io.dropwizard.lifecycle.Managed;
|
||||||
|
import io.lettuce.core.ScoredValue;
|
||||||
import io.lettuce.core.ScriptOutputType;
|
import io.lettuce.core.ScriptOutputType;
|
||||||
import io.lettuce.core.cluster.SlotHash;
|
import io.lettuce.core.cluster.SlotHash;
|
||||||
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
|
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
|
||||||
|
@ -248,29 +249,18 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) {
|
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final long destinationDevice, final int limit) {
|
||||||
return getMessagesTimer.record(() -> {
|
return getMessagesTimer.record(() -> {
|
||||||
final List<byte[]> queueItems = (List<byte[]>)getItemsScript.executeBinary(List.of(getMessageQueueKey(accountUuid, destinationDevice),
|
final List<ScoredValue<byte[]>> scoredMessages = redisCluster.withBinaryCluster(connection -> connection.sync().zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
|
||||||
getPersistInProgressKey(accountUuid, destinationDevice)),
|
final List<MessageProtos.Envelope> envelopes = new ArrayList<>(scoredMessages.size());
|
||||||
List.of(String.valueOf(limit).getBytes(StandardCharsets.UTF_8)));
|
|
||||||
|
|
||||||
final List<MessageProtos.Envelope> envelopes;
|
for (final ScoredValue<byte[]> scoredMessage : scoredMessages) {
|
||||||
|
try {
|
||||||
if (queueItems.size() % 2 == 0) {
|
envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue()));
|
||||||
envelopes = new ArrayList<>(queueItems.size() / 2);
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
logger.warn("Failed to parse envelope", e);
|
||||||
for (int i = 0; i < queueItems.size(); i += 2) {
|
|
||||||
try {
|
|
||||||
envelopes.add(MessageProtos.Envelope.parseFrom(queueItems.get(i)));
|
|
||||||
} catch (InvalidProtocolBufferException e) {
|
|
||||||
logger.warn("Failed to parse envelope", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
logger.error("\"Get messages\" operation returned a list with a non-even number of elements.");
|
|
||||||
envelopes = Collections.emptyList();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return envelopes;
|
return envelopes;
|
||||||
|
@ -332,12 +322,12 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
String.valueOf(limit)));
|
String.valueOf(limit)));
|
||||||
}
|
}
|
||||||
|
|
||||||
void lockQueueForPersistence(final String queue) {
|
void lockQueueForPersistence(final UUID accountUuid, final long deviceId) {
|
||||||
redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(queue), 30, LOCK_VALUE));
|
redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
|
||||||
}
|
}
|
||||||
|
|
||||||
void unlockQueueForPersistence(final String queue) {
|
void unlockQueueForPersistence(final UUID accountUuid, final long deviceId) {
|
||||||
redisCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(queue)));
|
redisCluster.useBinaryCluster(connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||||
|
@ -459,11 +449,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) {
|
private static byte[] getPersistInProgressKey(final UUID accountUuid, final long deviceId) {
|
||||||
return getPersistInProgressKey(accountUuid + "::" + deviceId);
|
return ("user_queue_persisting::{" + accountUuid + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||||
}
|
|
||||||
|
|
||||||
private static byte[] getPersistInProgressKey(final String queueName) {
|
|
||||||
return ("user_queue_persisting::{" + queueName + "}").getBytes(StandardCharsets.UTF_8);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static UUID getAccountUuidFromQueueName(final String queueName) {
|
static UUID getAccountUuidFromQueueName(final String queueName) {
|
||||||
|
|
|
@ -325,8 +325,8 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
|
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
|
||||||
|
|
||||||
messagesCache.lockQueueForPersistence(MessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID));
|
messagesCache.lockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID);
|
||||||
messagesCache.unlockQueueForPersistence(MessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID));
|
messagesCache.unlockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID);
|
||||||
|
|
||||||
synchronized (notified) {
|
synchronized (notified) {
|
||||||
while (!notified.get()) {
|
while (!notified.get()) {
|
||||||
|
|
Loading…
Reference in New Issue