From 41a113e22c346fee707c6f7d4f0020f389043e4f Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 27 Jul 2022 13:35:05 -0500 Subject: [PATCH] =?UTF-8?q?Stop=20queue=20persistence=20attempt=20if=20ite?= =?UTF-8?q?ms=20aren=E2=80=99t=20removed=20from=20cache?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/MessagePersistenceException.java | 13 ++++++++++ .../storage/MessagePersister.java | 17 +++++++++++-- .../storage/MessagesManager.java | 13 +++++++--- .../storage/MessagePersisterTest.java | 24 ++++++++++++++++++- 4 files changed, 61 insertions(+), 6 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersistenceException.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersistenceException.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersistenceException.java new file mode 100644 index 000000000..3e96a8fc4 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersistenceException.java @@ -0,0 +1,13 @@ +/* + * Copyright 2022 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +public class MessagePersistenceException extends Exception { + + public MessagePersistenceException(String message) { + super(message); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index b6c5fcdaf..642f395c1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -53,6 +53,8 @@ public class MessagePersister implements Managed { private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER"; private static final int WORKER_THREAD_COUNT = 4; + private static final int CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT = 3; + private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, @@ -150,7 +152,7 @@ public class MessagePersister implements Managed { } @VisibleForTesting - void persistQueue(final UUID accountUuid, final long deviceId) { + void persistQueue(final UUID accountUuid, final long deviceId) throws MessagePersistenceException { final Optional maybeAccount = accountsManager.getByAccountIdentifier(accountUuid); if (maybeAccount.isEmpty()) { @@ -165,12 +167,23 @@ public class MessagePersister implements Managed { int messageCount = 0; List messages; + int consecutiveEmptyCacheRemovals = 0; + do { messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); - messagesManager.persistMessages(accountUuid, deviceId, messages); + int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, deviceId, messages); messageCount += messages.size(); + if (messagesRemovedFromCache == 0) { + consecutiveEmptyCacheRemovals += 1; + } else { + consecutiveEmptyCacheRemovals = 0; + } + + if (consecutiveEmptyCacheRemovals > CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT) { + throw new MessagePersistenceException("persistence failure loop detected"); + } } while (!messages.isEmpty()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index a6767714c..a955f63da 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -104,7 +104,10 @@ public class MessagesManager { return removed; } - public void persistMessages( + /** + * @return the number of messages successfully removed from the cache. + */ + public int persistMessages( final UUID destinationUuid, final long destinationDeviceId, final List messages) { @@ -114,10 +117,14 @@ public class MessagesManager { .collect(Collectors.toList()); messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDeviceId); - messagesCache.remove(destinationUuid, destinationDeviceId, - messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList())); + + final List messageGuids = messages.stream().map(message -> UUID.fromString(message.getServerGuid())) + .collect(Collectors.toList()); + int messagesRemovedFromCache = messagesCache.remove(destinationUuid, destinationDeviceId, messageGuids).size(); persistMessageMeter.mark(nonEphemeralMessages.size()); + + return messagesRemovedFromCache; } public void addMessageAvailabilityListener( diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 2bf11dcbc..bf0b36bc0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -6,7 +6,10 @@ package org.whispersystems.textsecuregcm.storage; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; @@ -48,6 +51,7 @@ class MessagePersisterTest { private MessagesDynamoDb messagesDynamoDb; private MessagePersister messagePersister; private AccountsManager accountsManager; + private MessagesManager messagesManager; private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID(); private static final String DESTINATION_ACCOUNT_NUMBER = "+18005551234"; @@ -58,7 +62,7 @@ class MessagePersisterTest { @BeforeEach void setUp() throws Exception { - final MessagesManager messagesManager = mock(MessagesManager.class); + messagesManager = mock(MessagesManager.class); final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); messagesDynamoDb = mock(MessagesDynamoDb.class); @@ -190,6 +194,24 @@ class MessagePersisterTest { Instant.now().plus(messagePersister.getPersistDelay()), 1)); } + @Test + void testPersistQueueRetryLoop() { + final String queueName = new String( + MessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8); + final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; + final Instant now = Instant.now(); + + insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, messageCount, now); + setNextSlotToPersist(SlotHash.getSlot(queueName)); + + // returning `0` indicates something not working correctly + when(messagesManager.persistMessages(any(UUID.class), anyLong(), anyList())).thenReturn(0); + + assertTimeoutPreemptively(Duration.ofSeconds(1), () -> + assertThrows(MessagePersistenceException.class, + () -> messagePersister.persistQueue(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID))); + } + @SuppressWarnings("SameParameterValue") private static String generateRandomQueueNameForSlot(final int slot) { final UUID uuid = UUID.randomUUID();