Stop queue persistence attempt if items aren’t removed from cache
This commit is contained in:
parent
4cfcdb0c96
commit
41a113e22c
|
@ -0,0 +1,13 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2022 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
|
public class MessagePersistenceException extends Exception {
|
||||||
|
|
||||||
|
public MessagePersistenceException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -53,6 +53,8 @@ public class MessagePersister implements Managed {
|
||||||
private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER";
|
private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER";
|
||||||
private static final int WORKER_THREAD_COUNT = 4;
|
private static final int WORKER_THREAD_COUNT = 4;
|
||||||
|
|
||||||
|
private static final int CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT = 3;
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
||||||
|
|
||||||
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager,
|
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager,
|
||||||
|
@ -150,7 +152,7 @@ public class MessagePersister implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void persistQueue(final UUID accountUuid, final long deviceId) {
|
void persistQueue(final UUID accountUuid, final long deviceId) throws MessagePersistenceException {
|
||||||
final Optional<Account> maybeAccount = accountsManager.getByAccountIdentifier(accountUuid);
|
final Optional<Account> maybeAccount = accountsManager.getByAccountIdentifier(accountUuid);
|
||||||
|
|
||||||
if (maybeAccount.isEmpty()) {
|
if (maybeAccount.isEmpty()) {
|
||||||
|
@ -165,12 +167,23 @@ public class MessagePersister implements Managed {
|
||||||
int messageCount = 0;
|
int messageCount = 0;
|
||||||
List<MessageProtos.Envelope> messages;
|
List<MessageProtos.Envelope> messages;
|
||||||
|
|
||||||
|
int consecutiveEmptyCacheRemovals = 0;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
||||||
|
|
||||||
messagesManager.persistMessages(accountUuid, deviceId, messages);
|
int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, deviceId, messages);
|
||||||
messageCount += messages.size();
|
messageCount += messages.size();
|
||||||
|
|
||||||
|
if (messagesRemovedFromCache == 0) {
|
||||||
|
consecutiveEmptyCacheRemovals += 1;
|
||||||
|
} else {
|
||||||
|
consecutiveEmptyCacheRemovals = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (consecutiveEmptyCacheRemovals > CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT) {
|
||||||
|
throw new MessagePersistenceException("persistence failure loop detected");
|
||||||
|
}
|
||||||
|
|
||||||
} while (!messages.isEmpty());
|
} while (!messages.isEmpty());
|
||||||
|
|
||||||
|
|
|
@ -104,7 +104,10 @@ public class MessagesManager {
|
||||||
return removed;
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void persistMessages(
|
/**
|
||||||
|
* @return the number of messages successfully removed from the cache.
|
||||||
|
*/
|
||||||
|
public int persistMessages(
|
||||||
final UUID destinationUuid,
|
final UUID destinationUuid,
|
||||||
final long destinationDeviceId,
|
final long destinationDeviceId,
|
||||||
final List<Envelope> messages) {
|
final List<Envelope> messages) {
|
||||||
|
@ -114,10 +117,14 @@ public class MessagesManager {
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDeviceId);
|
messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDeviceId);
|
||||||
messagesCache.remove(destinationUuid, destinationDeviceId,
|
|
||||||
messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
|
final List<UUID> messageGuids = messages.stream().map(message -> UUID.fromString(message.getServerGuid()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
int messagesRemovedFromCache = messagesCache.remove(destinationUuid, destinationDeviceId, messageGuids).size();
|
||||||
|
|
||||||
persistMessageMeter.mark(nonEphemeralMessages.size());
|
persistMessageMeter.mark(nonEphemeralMessages.size());
|
||||||
|
|
||||||
|
return messagesRemovedFromCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addMessageAvailabilityListener(
|
public void addMessageAvailabilityListener(
|
||||||
|
|
|
@ -6,7 +6,10 @@
|
||||||
package org.whispersystems.textsecuregcm.storage;
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.atLeastOnce;
|
import static org.mockito.Mockito.atLeastOnce;
|
||||||
|
@ -48,6 +51,7 @@ class MessagePersisterTest {
|
||||||
private MessagesDynamoDb messagesDynamoDb;
|
private MessagesDynamoDb messagesDynamoDb;
|
||||||
private MessagePersister messagePersister;
|
private MessagePersister messagePersister;
|
||||||
private AccountsManager accountsManager;
|
private AccountsManager accountsManager;
|
||||||
|
private MessagesManager messagesManager;
|
||||||
|
|
||||||
private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID();
|
private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID();
|
||||||
private static final String DESTINATION_ACCOUNT_NUMBER = "+18005551234";
|
private static final String DESTINATION_ACCOUNT_NUMBER = "+18005551234";
|
||||||
|
@ -58,7 +62,7 @@ class MessagePersisterTest {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() throws Exception {
|
void setUp() throws Exception {
|
||||||
|
|
||||||
final MessagesManager messagesManager = mock(MessagesManager.class);
|
messagesManager = mock(MessagesManager.class);
|
||||||
final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
|
final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
|
||||||
|
|
||||||
messagesDynamoDb = mock(MessagesDynamoDb.class);
|
messagesDynamoDb = mock(MessagesDynamoDb.class);
|
||||||
|
@ -190,6 +194,24 @@ class MessagePersisterTest {
|
||||||
Instant.now().plus(messagePersister.getPersistDelay()), 1));
|
Instant.now().plus(messagePersister.getPersistDelay()), 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testPersistQueueRetryLoop() {
|
||||||
|
final String queueName = new String(
|
||||||
|
MessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
|
||||||
|
final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
|
||||||
|
final Instant now = Instant.now();
|
||||||
|
|
||||||
|
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, messageCount, now);
|
||||||
|
setNextSlotToPersist(SlotHash.getSlot(queueName));
|
||||||
|
|
||||||
|
// returning `0` indicates something not working correctly
|
||||||
|
when(messagesManager.persistMessages(any(UUID.class), anyLong(), anyList())).thenReturn(0);
|
||||||
|
|
||||||
|
assertTimeoutPreemptively(Duration.ofSeconds(1), () ->
|
||||||
|
assertThrows(MessagePersistenceException.class,
|
||||||
|
() -> messagePersister.persistQueue(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID)));
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("SameParameterValue")
|
@SuppressWarnings("SameParameterValue")
|
||||||
private static String generateRandomQueueNameForSlot(final int slot) {
|
private static String generateRandomQueueNameForSlot(final int slot) {
|
||||||
final UUID uuid = UUID.randomUUID();
|
final UUID uuid = UUID.randomUUID();
|
||||||
|
|
Loading…
Reference in New Issue