diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 58f47ce93..08ef99b52 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -220,6 +220,7 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; +import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand; import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand; import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask; @@ -269,6 +270,7 @@ public class WhisperServerService extends Application dynamicConfigurationManager, - final Duration persistDelay) { + final Duration persistDelay, + final Optional dedicatedProcessWorkerThreadCount) { this.messagesCache = messagesCache; this.messagesManager = messagesManager; this.accountsManager = accountsManager; this.persistDelay = persistDelay; + this.workerThreads = dedicatedProcessWorkerThreadCount.map(Thread[]::new) + .orElseGet(() -> new Thread[DEFAULT_WORKER_THREAD_COUNT]); + this.dedicatedProcess = dedicatedProcessWorkerThreadCount.isPresent(); for (int i = 0; i < workerThreads.length; i++) { workerThreads[i] = new Thread(() -> { while (running) { - if (dynamicConfigurationManager.getConfiguration().getMessagePersisterConfiguration() - .isPersistenceEnabled()) { + if (enabled(dynamicConfigurationManager)) { try { final int queuesPersisted = persistNextQueues(Instant.now()); queueCountHistogram.update(queuesPersisted); @@ -89,6 +94,17 @@ public class MessagePersister implements Managed { } } + @VisibleForTesting + boolean enabled(final DynamicConfigurationManager dynamicConfigurationManager) { + final DynamicMessagePersisterConfiguration messagePersisterConfiguration = dynamicConfigurationManager.getConfiguration() + .getMessagePersisterConfiguration(); + if (dedicatedProcess) { + return messagePersisterConfiguration.isDedicatedProcessEnabled(); + } + + return messagePersisterConfiguration.isServerPersistenceEnabled(); + } + @VisibleForTesting Duration getPersistDelay() { return persistDelay; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 0ccfdacae..00b7c991d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -62,6 +62,7 @@ record CommandDependencies( AccountsManager accountsManager, ProfilesManager profilesManager, ReportMessageManager reportMessageManager, + MessagesCache messagesCache, MessagesManager messagesManager, DeletedAccountsManager deletedAccountsManager, StoredVerificationCodeManager pendingAccountsManager, @@ -204,6 +205,7 @@ record CommandDependencies( accountsManager, profilesManager, reportMessageManager, + messagesCache, messagesManager, deletedAccountsManager, pendingAccountsManager, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java new file mode 100644 index 000000000..ebbbd7045 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java @@ -0,0 +1,66 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.workers; + +import io.dropwizard.Application; +import io.dropwizard.cli.EnvironmentCommand; +import io.dropwizard.setup.Environment; +import java.time.Duration; +import java.util.Optional; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; +import org.whispersystems.textsecuregcm.storage.MessagePersister; + +public class MessagePersisterServiceCommand extends EnvironmentCommand { + + private static final String WORKER_COUNT = "workers"; + + public MessagePersisterServiceCommand() { + super(new Application<>() { + @Override + public void run(WhisperServerConfiguration configuration, Environment environment) { + + } + }, "message-persister-service", + "Starts a persistent service to persist undelivered messages from Redis to Dynamo DB"); + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + subparser.addArgument("--workers") + .type(Integer.class) + .dest(WORKER_COUNT) + .required(true) + .help("The number of worker threads"); + } + + @Override + protected void run(Environment environment, Namespace namespace, WhisperServerConfiguration configuration) + throws Exception { + + final CommandDependencies deps = CommandDependencies.build("message-persister-service", environment, configuration); + + final DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager<>( + configuration.getAppConfig().getApplication(), + configuration.getAppConfig().getEnvironment(), + configuration.getAppConfig().getConfigurationName(), + DynamicConfiguration.class); + + MessagePersister messagePersister = new MessagePersister(deps.messagesCache(), deps.messagesManager(), + deps.accountsManager(), + dynamicConfigurationManager, + Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()), + Optional.of(namespace.getInt(WORKER_COUNT))); + + environment.lifecycle().manage(deps.messagesCache()); + environment.lifecycle().manage(messagePersister); + } + +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfigurationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfigurationTest.java index d8146cb10..de6916a22 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfigurationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfigurationTest.java @@ -360,33 +360,37 @@ class DynamicConfigurationTest { final DynamicConfiguration emptyConfig = DynamicConfigurationManager.parseConfiguration(emptyConfigYaml, DynamicConfiguration.class).orElseThrow(); - assertTrue(emptyConfig.getMessagePersisterConfiguration().isPersistenceEnabled()); + assertTrue(emptyConfig.getMessagePersisterConfiguration().isServerPersistenceEnabled()); + assertFalse(emptyConfig.getMessagePersisterConfiguration().isDedicatedProcessEnabled()); } { final String messagePersisterEnabledYaml = REQUIRED_CONFIG.concat(""" messagePersister: - persistenceEnabled: true + serverPersistenceEnabled: true + dedicatedProcessEnabled: true """); final DynamicConfiguration config = DynamicConfigurationManager.parseConfiguration(messagePersisterEnabledYaml, DynamicConfiguration.class) .orElseThrow(); - assertTrue(config.getMessagePersisterConfiguration().isPersistenceEnabled()); + assertTrue(config.getMessagePersisterConfiguration().isServerPersistenceEnabled()); + assertTrue(config.getMessagePersisterConfiguration().isDedicatedProcessEnabled()); } { final String messagePersisterDisabledYaml = REQUIRED_CONFIG.concat(""" messagePersister: - persistenceEnabled: false + serverPersistenceEnabled: false """); final DynamicConfiguration config = DynamicConfigurationManager.parseConfiguration(messagePersisterDisabledYaml, DynamicConfiguration.class) .orElseThrow(); - assertFalse(config.getMessagePersisterConfiguration().isPersistenceEnabled()); + assertFalse(config.getMessagePersisterConfiguration().isServerPersistenceEnabled()); + assertFalse(config.getMessagePersisterConfiguration().isDedicatedProcessEnabled()); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index e46418ee9..65308d886 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -14,7 +14,6 @@ import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import io.lettuce.core.cluster.SlotHash; -import java.nio.ByteBuffer; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -24,7 +23,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.RandomStringUtils; @@ -52,7 +50,6 @@ class MessagePersisterIntegrationTest { private ExecutorService notificationExecutorService; private Scheduler messageDeliveryScheduler; private ExecutorService messageDeletionExecutorService; - private ScheduledExecutorService resubscribeRetryExecutorService; private MessagesCache messagesCache; private MessagesManager messagesManager; private MessagePersister messagePersister; @@ -80,14 +77,13 @@ class MessagePersisterIntegrationTest { final AccountsManager accountsManager = mock(AccountsManager.class); notificationExecutorService = Executors.newSingleThreadExecutor(); - resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor(); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService, messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC()); messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), messageDeletionExecutorService); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, - dynamicConfigurationManager, PERSIST_DELAY); + dynamicConfigurationManager, PERSIST_DELAY, Optional.empty()); account = mock(Account.class); @@ -182,12 +178,6 @@ class MessagePersisterIntegrationTest { }); } - private static long extractServerTimestamp(byte[] bytes) { - ByteBuffer bb = ByteBuffer.wrap(bytes); - bb.getLong(); - return bb.getLong(); - } - private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final long serverTimestamp) { return MessageProtos.Envelope.newBuilder() .setTimestamp(serverTimestamp * 2) // client timestamp may not be accurate diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 3e8ef0ea1..51d0af241 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -37,9 +37,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import reactor.core.scheduler.Scheduler; @@ -69,7 +72,8 @@ class MessagePersisterTest { void setUp() throws Exception { messagesManager = mock(MessagesManager.class); - final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); + final DynamicConfigurationManager dynamicConfigurationManager = mock( + DynamicConfigurationManager.class); messagesDynamoDb = mock(MessagesDynamoDb.class); accountsManager = mock(AccountsManager.class); @@ -87,7 +91,7 @@ class MessagePersisterTest { REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, - dynamicConfigurationManager, PERSIST_DELAY); + dynamicConfigurationManager, PERSIST_DELAY, Optional.empty()); doAnswer(invocation -> { final UUID destinationUuid = invocation.getArgument(0); @@ -225,6 +229,31 @@ class MessagePersisterTest { () -> messagePersister.persistQueue(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID))); } + @ParameterizedTest + @CsvSource({ + "true, true, false, false", + "true, false, true, true", + "false, true, false, true", + "false, false, true, false", + }) + void testEnabled(final boolean dedicatedProcess, final boolean serverPersistenceEnabled, + final boolean dedicatedProcessEnabled, final boolean expectEnabled) { + final DynamicConfigurationManager dynamicConfigurationManager = mock( + DynamicConfigurationManager.class); + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + + final DynamicMessagePersisterConfiguration dynamicMessagePersisterConfiguration = mock( + DynamicMessagePersisterConfiguration.class); + when(dynamicConfiguration.getMessagePersisterConfiguration()).thenReturn(dynamicMessagePersisterConfiguration); + when(dynamicMessagePersisterConfiguration.isDedicatedProcessEnabled()).thenReturn(dedicatedProcessEnabled); + when(dynamicMessagePersisterConfiguration.isServerPersistenceEnabled()).thenReturn(serverPersistenceEnabled); + + messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, + dynamicConfigurationManager, PERSIST_DELAY, dedicatedProcess ? Optional.of(4) : Optional.empty()); + assertEquals(expectEnabled, messagePersister.enabled(dynamicConfigurationManager)); + } + @SuppressWarnings("SameParameterValue") private static String generateRandomQueueNameForSlot(final int slot) { final UUID uuid = UUID.randomUUID(); @@ -263,4 +292,5 @@ class MessagePersisterTest { REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster( connection -> connection.sync().set(MessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(nextSlot - 1))); } + }