diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index a73f6c642..54c186de2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -333,7 +333,7 @@ public class WhisperServerService extends Application maybeRemovedMessage = messagesCache.remove(destination, destinationUuid, deviceId, id); - removeByIdExperiment.compareSupplierResult(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id)); + delete(destination, destinationUuid, deviceId, messageGuid); } public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java index 270f837c0..eb88d6143 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java @@ -26,7 +26,7 @@ import static com.codahale.metrics.MetricRegistry.name; public class RedisClusterMessagePersister implements Managed { private final RedisClusterMessagesCache messagesCache; - private final Messages messagesDatabase; + private final MessagesManager messagesManager; private final PubSubManager pubSubManager; private final PushSender pushSender; private final AccountsManager accountsManager; @@ -51,9 +51,9 @@ public class RedisClusterMessagePersister implements Managed { private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagePersister.class); - public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final Messages messagesDatabase, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) { + public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) { this.messagesCache = messagesCache; - this.messagesDatabase = messagesDatabase; + this.messagesManager = messagesManager; this.pubSubManager = pubSubManager; this.pushSender = pushSender; this.accountsManager = accountsManager; @@ -143,11 +143,7 @@ public class RedisClusterMessagePersister implements Managed { messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); for (final MessageProtos.Envelope message : messages) { - final UUID uuid = UUID.fromString(message.getServerGuid()); - - messagesDatabase.store(uuid, message, accountNumber, deviceId); - messagesCache.remove(accountNumber, accountUuid, deviceId, uuid); - + messagesManager.persistMessage(accountNumber, accountUuid, message, UUID.fromString(message.getServerGuid()), deviceId); messageCount++; } } while (!messages.isEmpty()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java index 25bd30ac3..f0b1b7d3f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -48,6 +49,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { public void setUp() throws Exception { super.setUp(); + final MessagesManager messagesManager = mock(MessagesManager.class); final FeatureFlagsManager featureFlagsManager = mock(FeatureFlagsManager.class); when(featureFlagsManager.isFeatureFlagActive(RedisClusterMessagePersister.ENABLE_PERSISTENCE_FLAG)).thenReturn(true); @@ -62,7 +64,20 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { notificationExecutorService = Executors.newSingleThreadExecutor(); messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService); - messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, pubSubManager, mock(PushSender.class), accountsManager, featureFlagsManager, PERSIST_DELAY); + messagePersister = new RedisClusterMessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, featureFlagsManager, PERSIST_DELAY); + + doAnswer(invocation -> { + final String destination = invocation.getArgument(0, String.class); + final UUID destinationUuid = invocation.getArgument(1, UUID.class); + final MessageProtos.Envelope message = invocation.getArgument(2, MessageProtos.Envelope.class); + final UUID messageGuid = invocation.getArgument(3, UUID.class); + final long deviceId = invocation.getArgument(4, Long.class); + + messagesDatabase.store(messageGuid, message, destination, deviceId); + messagesCache.remove(destination, destinationUuid, deviceId, messageGuid); + + return null; + }).when(messagesManager).persistMessage(anyString(), any(UUID.class), any(MessageProtos.Envelope.class), any(UUID.class), anyLong()); } @Override