Return queues to the "to persist" list if something goes wrong during persistence.
This commit is contained in:
parent
a97e0982e3
commit
57d2ef8740
|
@ -94,7 +94,15 @@ public class MessagePersister implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final String queue : queuesToPersist) {
|
for (final String queue : queuesToPersist) {
|
||||||
persistQueue(queue);
|
final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue);
|
||||||
|
final long deviceId = MessagesCache.getDeviceIdFromQueueName(queue);
|
||||||
|
|
||||||
|
try {
|
||||||
|
persistQueue(accountUuid, deviceId);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId);
|
||||||
|
messagesCache.addQueueToPersist(accountUuid, deviceId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
queuesPersisted += queuesToPersist.size();
|
queuesPersisted += queuesToPersist.size();
|
||||||
|
@ -104,10 +112,7 @@ public class MessagePersister implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void persistQueue(final String queue) {
|
void persistQueue(final UUID accountUuid, final long deviceId) {
|
||||||
final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue);
|
|
||||||
final long deviceId = MessagesCache.getDeviceIdFromQueueName(queue);
|
|
||||||
|
|
||||||
final Optional<Account> maybeAccount = accountsManager.get(accountUuid);
|
final Optional<Account> maybeAccount = accountsManager.get(accountUuid);
|
||||||
|
|
||||||
final String accountNumber;
|
final String accountNumber;
|
||||||
|
|
|
@ -5,6 +5,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import io.dropwizard.lifecycle.Managed;
|
import io.dropwizard.lifecycle.Managed;
|
||||||
import io.lettuce.core.ScoredValue;
|
import io.lettuce.core.ScoredValue;
|
||||||
import io.lettuce.core.ScriptOutputType;
|
import io.lettuce.core.ScriptOutputType;
|
||||||
|
import io.lettuce.core.ZAddArgs;
|
||||||
import io.lettuce.core.cluster.SlotHash;
|
import io.lettuce.core.cluster.SlotHash;
|
||||||
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
|
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
|
||||||
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
|
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
|
||||||
|
@ -322,6 +323,10 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
String.valueOf(limit)));
|
String.valueOf(limit)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addQueueToPersist(final UUID accountUuid, final long deviceId) {
|
||||||
|
redisCluster.useBinaryCluster(connection -> connection.sync().zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(), getMessageQueueKey(accountUuid, deviceId)));
|
||||||
|
}
|
||||||
|
|
||||||
void lockQueueForPersistence(final UUID accountUuid, final long deviceId) {
|
void lockQueueForPersistence(final UUID accountUuid, final long deviceId) {
|
||||||
redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
|
redisCluster.useBinaryCluster(connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||||
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
|
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
|
||||||
|
|
||||||
|
@ -165,6 +166,25 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
assertEquals(queueCount * messagesPerQueue, messagesCaptor.getAllValues().stream().mapToInt(List::size).sum());
|
assertEquals(queueCount * messagesPerQueue, messagesCaptor.getAllValues().stream().mapToInt(List::size).sum());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPersistQueueRetry() {
|
||||||
|
final String queueName = new String(MessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
|
||||||
|
final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
|
||||||
|
final Instant now = Instant.now();
|
||||||
|
|
||||||
|
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, messageCount, now);
|
||||||
|
setNextSlotToPersist(SlotHash.getSlot(queueName));
|
||||||
|
|
||||||
|
doAnswer((Answer<Void>)invocation -> {
|
||||||
|
throw new RuntimeException("OH NO.");
|
||||||
|
}).when(messagesDatabase).store(any(), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID));
|
||||||
|
|
||||||
|
messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay()));
|
||||||
|
|
||||||
|
assertEquals(List.of(queueName),
|
||||||
|
messagesCache.getQueuesToPersist(SlotHash.getSlot(queueName), Instant.now().plus(messagePersister.getPersistDelay()), 1));
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("SameParameterValue")
|
@SuppressWarnings("SameParameterValue")
|
||||||
private static String generateRandomQueueNameForSlot(final int slot) {
|
private static String generateRandomQueueNameForSlot(final int slot) {
|
||||||
final UUID uuid = UUID.randomUUID();
|
final UUID uuid = UUID.randomUUID();
|
||||||
|
|
Loading…
Reference in New Issue