Add `MessagePersisterServiceCommand`

This commit is contained in:
Chris Eager 2023-05-10 16:49:56 -05:00 committed by Chris Eager
parent 859fbe9ab1
commit 3e53884979
8 changed files with 152 additions and 30 deletions

View File

@ -220,6 +220,7 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand;
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
@ -269,6 +270,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new UnlinkDeviceCommand());
bootstrap.addCommand(new CrawlAccountsCommand());
bootstrap.addCommand(new ScheduledApnPushNotificationSenderServiceCommand());
bootstrap.addCommand(new MessagePersisterServiceCommand());
}
@Override
@ -574,15 +576,20 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getRecaptchaConfiguration().getProjectPath(),
config.getRecaptchaConfiguration().getCredentialConfigurationJson(),
dynamicConfigurationManager);
HttpClient hcaptchaHttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build();
HCaptchaClient hCaptchaClient = new HCaptchaClient(config.getHCaptchaConfiguration().apiKey().value(), hcaptchaHttpClient, dynamicConfigurationManager);
HttpClient hcaptchaHttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(10)).build();
HCaptchaClient hCaptchaClient = new HCaptchaClient(config.getHCaptchaConfiguration().apiKey().value(), hcaptchaHttpClient,
dynamicConfigurationManager);
CaptchaChecker captchaChecker = new CaptchaChecker(List.of(recaptchaClient, hCaptchaClient));
PushChallengeManager pushChallengeManager = new PushChallengeManager(pushNotificationManager, pushChallengeDynamoDb);
PushChallengeManager pushChallengeManager = new PushChallengeManager(pushNotificationManager,
pushChallengeDynamoDb);
RateLimitChallengeManager rateLimitChallengeManager = new RateLimitChallengeManager(pushChallengeManager,
captchaChecker, rateLimiters);
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()),
Optional.empty());
ChangeNumberManager changeNumberManager = new ChangeNumberManager(messageSender, accountsManager);
AccountDatabaseCrawlerCache accountCleanerAccountDatabaseCrawlerCache =

View File

@ -10,9 +10,16 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class DynamicMessagePersisterConfiguration {
@JsonProperty
private boolean persistenceEnabled = true;
private boolean serverPersistenceEnabled = true;
public boolean isPersistenceEnabled() {
return persistenceEnabled;
@JsonProperty
private boolean dedicatedProcessEnabled = false;
public boolean isServerPersistenceEnabled() {
return serverPersistenceEnabled;
}
public boolean isDedicatedProcessEnabled() {
return dedicatedProcessEnabled;
}
}

View File

@ -22,6 +22,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
@ -34,7 +35,8 @@ public class MessagePersister implements Managed {
private final Duration persistDelay;
private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT];
private final boolean dedicatedProcess;
private final Thread[] workerThreads;
private volatile boolean running;
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
@ -50,7 +52,7 @@ public class MessagePersister implements Managed {
private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis();
private static final int WORKER_THREAD_COUNT = 4;
private static final int DEFAULT_WORKER_THREAD_COUNT = 4;
private static final int CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT = 3;
@ -59,17 +61,20 @@ public class MessagePersister implements Managed {
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager,
final AccountsManager accountsManager,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final Duration persistDelay) {
final Duration persistDelay,
final Optional<Integer> dedicatedProcessWorkerThreadCount) {
this.messagesCache = messagesCache;
this.messagesManager = messagesManager;
this.accountsManager = accountsManager;
this.persistDelay = persistDelay;
this.workerThreads = dedicatedProcessWorkerThreadCount.map(Thread[]::new)
.orElseGet(() -> new Thread[DEFAULT_WORKER_THREAD_COUNT]);
this.dedicatedProcess = dedicatedProcessWorkerThreadCount.isPresent();
for (int i = 0; i < workerThreads.length; i++) {
workerThreads[i] = new Thread(() -> {
while (running) {
if (dynamicConfigurationManager.getConfiguration().getMessagePersisterConfiguration()
.isPersistenceEnabled()) {
if (enabled(dynamicConfigurationManager)) {
try {
final int queuesPersisted = persistNextQueues(Instant.now());
queueCountHistogram.update(queuesPersisted);
@ -89,6 +94,17 @@ public class MessagePersister implements Managed {
}
}
@VisibleForTesting
boolean enabled(final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
final DynamicMessagePersisterConfiguration messagePersisterConfiguration = dynamicConfigurationManager.getConfiguration()
.getMessagePersisterConfiguration();
if (dedicatedProcess) {
return messagePersisterConfiguration.isDedicatedProcessEnabled();
}
return messagePersisterConfiguration.isServerPersistenceEnabled();
}
@VisibleForTesting
Duration getPersistDelay() {
return persistDelay;

View File

@ -62,6 +62,7 @@ record CommandDependencies(
AccountsManager accountsManager,
ProfilesManager profilesManager,
ReportMessageManager reportMessageManager,
MessagesCache messagesCache,
MessagesManager messagesManager,
DeletedAccountsManager deletedAccountsManager,
StoredVerificationCodeManager pendingAccountsManager,
@ -204,6 +205,7 @@ record CommandDependencies(
accountsManager,
profilesManager,
reportMessageManager,
messagesCache,
messagesManager,
deletedAccountsManager,
pendingAccountsManager,

View File

@ -0,0 +1,66 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.setup.Environment;
import java.time.Duration;
import java.util.Optional;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.MessagePersister;
public class MessagePersisterServiceCommand extends EnvironmentCommand<WhisperServerConfiguration> {
private static final String WORKER_COUNT = "workers";
public MessagePersisterServiceCommand() {
super(new Application<>() {
@Override
public void run(WhisperServerConfiguration configuration, Environment environment) {
}
}, "message-persister-service",
"Starts a persistent service to persist undelivered messages from Redis to Dynamo DB");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--workers")
.type(Integer.class)
.dest(WORKER_COUNT)
.required(true)
.help("The number of worker threads");
}
@Override
protected void run(Environment environment, Namespace namespace, WhisperServerConfiguration configuration)
throws Exception {
final CommandDependencies deps = CommandDependencies.build("message-persister-service", environment, configuration);
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(),
configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(),
DynamicConfiguration.class);
MessagePersister messagePersister = new MessagePersister(deps.messagesCache(), deps.messagesManager(),
deps.accountsManager(),
dynamicConfigurationManager,
Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()),
Optional.of(namespace.getInt(WORKER_COUNT)));
environment.lifecycle().manage(deps.messagesCache());
environment.lifecycle().manage(messagePersister);
}
}

View File

@ -360,33 +360,37 @@ class DynamicConfigurationTest {
final DynamicConfiguration emptyConfig =
DynamicConfigurationManager.parseConfiguration(emptyConfigYaml, DynamicConfiguration.class).orElseThrow();
assertTrue(emptyConfig.getMessagePersisterConfiguration().isPersistenceEnabled());
assertTrue(emptyConfig.getMessagePersisterConfiguration().isServerPersistenceEnabled());
assertFalse(emptyConfig.getMessagePersisterConfiguration().isDedicatedProcessEnabled());
}
{
final String messagePersisterEnabledYaml = REQUIRED_CONFIG.concat("""
messagePersister:
persistenceEnabled: true
serverPersistenceEnabled: true
dedicatedProcessEnabled: true
""");
final DynamicConfiguration config =
DynamicConfigurationManager.parseConfiguration(messagePersisterEnabledYaml, DynamicConfiguration.class)
.orElseThrow();
assertTrue(config.getMessagePersisterConfiguration().isPersistenceEnabled());
assertTrue(config.getMessagePersisterConfiguration().isServerPersistenceEnabled());
assertTrue(config.getMessagePersisterConfiguration().isDedicatedProcessEnabled());
}
{
final String messagePersisterDisabledYaml = REQUIRED_CONFIG.concat("""
messagePersister:
persistenceEnabled: false
serverPersistenceEnabled: false
""");
final DynamicConfiguration config =
DynamicConfigurationManager.parseConfiguration(messagePersisterDisabledYaml, DynamicConfiguration.class)
.orElseThrow();
assertFalse(config.getMessagePersisterConfiguration().isPersistenceEnabled());
assertFalse(config.getMessagePersisterConfiguration().isServerPersistenceEnabled());
assertFalse(config.getMessagePersisterConfiguration().isDedicatedProcessEnabled());
}
}

View File

@ -14,7 +14,6 @@ import static org.mockito.Mockito.when;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.lettuce.core.cluster.SlotHash;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
@ -24,7 +23,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.RandomStringUtils;
@ -52,7 +50,6 @@ class MessagePersisterIntegrationTest {
private ExecutorService notificationExecutorService;
private Scheduler messageDeliveryScheduler;
private ExecutorService messageDeletionExecutorService;
private ScheduledExecutorService resubscribeRetryExecutorService;
private MessagesCache messagesCache;
private MessagesManager messagesManager;
private MessagePersister messagePersister;
@ -80,14 +77,13 @@ class MessagePersisterIntegrationTest {
final AccountsManager accountsManager = mock(AccountsManager.class);
notificationExecutorService = Executors.newSingleThreadExecutor();
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService,
messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC());
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
messageDeletionExecutorService);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY);
dynamicConfigurationManager, PERSIST_DELAY, Optional.empty());
account = mock(Account.class);
@ -182,12 +178,6 @@ class MessagePersisterIntegrationTest {
});
}
private static long extractServerTimestamp(byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
bb.getLong();
return bb.getLong();
}
private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final long serverTimestamp) {
return MessageProtos.Envelope.newBuilder()
.setTimestamp(serverTimestamp * 2) // client timestamp may not be accurate

View File

@ -37,9 +37,12 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import reactor.core.scheduler.Scheduler;
@ -69,7 +72,8 @@ class MessagePersisterTest {
void setUp() throws Exception {
messagesManager = mock(MessagesManager.class);
final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = mock(
DynamicConfigurationManager.class);
messagesDynamoDb = mock(MessagesDynamoDb.class);
accountsManager = mock(AccountsManager.class);
@ -87,7 +91,7 @@ class MessagePersisterTest {
REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler,
sharedExecutorService, Clock.systemUTC());
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY);
dynamicConfigurationManager, PERSIST_DELAY, Optional.empty());
doAnswer(invocation -> {
final UUID destinationUuid = invocation.getArgument(0);
@ -225,6 +229,31 @@ class MessagePersisterTest {
() -> messagePersister.persistQueue(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID)));
}
@ParameterizedTest
@CsvSource({
"true, true, false, false",
"true, false, true, true",
"false, true, false, true",
"false, false, true, false",
})
void testEnabled(final boolean dedicatedProcess, final boolean serverPersistenceEnabled,
final boolean dedicatedProcessEnabled, final boolean expectEnabled) {
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = mock(
DynamicConfigurationManager.class);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
final DynamicMessagePersisterConfiguration dynamicMessagePersisterConfiguration = mock(
DynamicMessagePersisterConfiguration.class);
when(dynamicConfiguration.getMessagePersisterConfiguration()).thenReturn(dynamicMessagePersisterConfiguration);
when(dynamicMessagePersisterConfiguration.isDedicatedProcessEnabled()).thenReturn(dedicatedProcessEnabled);
when(dynamicMessagePersisterConfiguration.isServerPersistenceEnabled()).thenReturn(serverPersistenceEnabled);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY, dedicatedProcess ? Optional.of(4) : Optional.empty());
assertEquals(expectEnabled, messagePersister.enabled(dynamicConfigurationManager));
}
@SuppressWarnings("SameParameterValue")
private static String generateRandomQueueNameForSlot(final int slot) {
final UUID uuid = UUID.randomUUID();
@ -263,4 +292,5 @@ class MessagePersisterTest {
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(
connection -> connection.sync().set(MessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(nextSlot - 1)));
}
}