Remove `MessagePersister` from WhisperServerService environment

Persistence is now exclusively done by a separate command.
This commit is contained in:
Chris Eager 2023-06-23 12:22:08 -05:00 committed by Chris Eager
parent b81a0e99d4
commit c93af9e31e
7 changed files with 16 additions and 74 deletions

View File

@ -163,7 +163,6 @@ import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager; import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager;
import org.whispersystems.textsecuregcm.storage.KeysManager; import org.whispersystems.textsecuregcm.storage.KeysManager;
import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MessagesManager;
@ -547,9 +546,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
RateLimitChallengeManager rateLimitChallengeManager = new RateLimitChallengeManager(pushChallengeManager, RateLimitChallengeManager rateLimitChallengeManager = new RateLimitChallengeManager(pushChallengeManager,
captchaChecker, rateLimiters); captchaChecker, rateLimiters);
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()),
Optional.empty());
ChangeNumberManager changeNumberManager = new ChangeNumberManager(messageSender, accountsManager); ChangeNumberManager changeNumberManager = new ChangeNumberManager(messageSender, accountsManager);
HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build(); HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build();
@ -562,7 +558,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(apnPushNotificationScheduler); environment.lifecycle().manage(apnPushNotificationScheduler);
environment.lifecycle().manage(provisioningManager); environment.lifecycle().manage(provisioningManager);
environment.lifecycle().manage(messagesCache); environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(messagePersister);
environment.lifecycle().manage(clientPresenceManager); environment.lifecycle().manage(clientPresenceManager);
environment.lifecycle().manage(currencyManager); environment.lifecycle().manage(currencyManager);
environment.lifecycle().manage(registrationServiceClient); environment.lifecycle().manage(registrationServiceClient);

View File

@ -10,16 +10,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class DynamicMessagePersisterConfiguration { public class DynamicMessagePersisterConfiguration {
@JsonProperty @JsonProperty
private boolean serverPersistenceEnabled = true; private boolean persistenceEnabled = true;
@JsonProperty public boolean isPersistenceEnabled() {
private boolean dedicatedProcessEnabled = false; return persistenceEnabled;
public boolean isServerPersistenceEnabled() {
return serverPersistenceEnabled;
}
public boolean isDedicatedProcessEnabled() {
return dedicatedProcessEnabled;
} }
} }

View File

@ -22,7 +22,6 @@ import java.util.UUID;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
@ -52,8 +51,6 @@ public class MessagePersister implements Managed {
private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis(); private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis();
private static final int DEFAULT_WORKER_THREAD_COUNT = 4;
private static final int CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT = 3; private static final int CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT = 3;
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
@ -62,19 +59,19 @@ public class MessagePersister implements Managed {
final AccountsManager accountsManager, final AccountsManager accountsManager,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager, final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final Duration persistDelay, final Duration persistDelay,
final Optional<Integer> dedicatedProcessWorkerThreadCount) { final int dedicatedProcessWorkerThreadCount) {
this.messagesCache = messagesCache; this.messagesCache = messagesCache;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.accountsManager = accountsManager; this.accountsManager = accountsManager;
this.persistDelay = persistDelay; this.persistDelay = persistDelay;
this.workerThreads = dedicatedProcessWorkerThreadCount.map(Thread[]::new) this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount];
.orElseGet(() -> new Thread[DEFAULT_WORKER_THREAD_COUNT]); this.dedicatedProcess = true;
this.dedicatedProcess = dedicatedProcessWorkerThreadCount.isPresent();
for (int i = 0; i < workerThreads.length; i++) { for (int i = 0; i < workerThreads.length; i++) {
workerThreads[i] = new Thread(() -> { workerThreads[i] = new Thread(() -> {
while (running) { while (running) {
if (enabled(dynamicConfigurationManager)) { if (dynamicConfigurationManager.getConfiguration().getMessagePersisterConfiguration()
.isPersistenceEnabled()) {
try { try {
final int queuesPersisted = persistNextQueues(Instant.now()); final int queuesPersisted = persistNextQueues(Instant.now());
queueCountHistogram.update(queuesPersisted); queueCountHistogram.update(queuesPersisted);
@ -94,17 +91,6 @@ public class MessagePersister implements Managed {
} }
} }
@VisibleForTesting
boolean enabled(final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
final DynamicMessagePersisterConfiguration messagePersisterConfiguration = dynamicConfigurationManager.getConfiguration()
.getMessagePersisterConfiguration();
if (dedicatedProcess) {
return messagePersisterConfiguration.isDedicatedProcessEnabled();
}
return messagePersisterConfiguration.isServerPersistenceEnabled();
}
@VisibleForTesting @VisibleForTesting
Duration getPersistDelay() { Duration getPersistDelay() {
return persistDelay; return persistDelay;

View File

@ -9,7 +9,6 @@ import io.dropwizard.Application;
import io.dropwizard.cli.ServerCommand; import io.dropwizard.cli.ServerCommand;
import io.dropwizard.setup.Environment; import io.dropwizard.setup.Environment;
import java.time.Duration; import java.time.Duration;
import java.util.Optional;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser; import net.sourceforge.argparse4j.inf.Subparser;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
@ -65,7 +64,7 @@ public class MessagePersisterServiceCommand extends ServerCommand<WhisperServerC
deps.accountsManager(), deps.accountsManager(),
dynamicConfigurationManager, dynamicConfigurationManager,
Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()), Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()),
Optional.of(namespace.getInt(WORKER_COUNT))); namespace.getInt(WORKER_COUNT));
environment.lifecycle().manage(deps.messagesCache()); environment.lifecycle().manage(deps.messagesCache());
environment.lifecycle().manage(messagePersister); environment.lifecycle().manage(messagePersister);

View File

@ -359,14 +359,13 @@ class DynamicConfigurationTest {
final DynamicConfiguration emptyConfig = final DynamicConfiguration emptyConfig =
DynamicConfigurationManager.parseConfiguration(emptyConfigYaml, DynamicConfiguration.class).orElseThrow(); DynamicConfigurationManager.parseConfiguration(emptyConfigYaml, DynamicConfiguration.class).orElseThrow();
assertTrue(emptyConfig.getMessagePersisterConfiguration().isServerPersistenceEnabled()); assertTrue(emptyConfig.getMessagePersisterConfiguration().isPersistenceEnabled());
assertFalse(emptyConfig.getMessagePersisterConfiguration().isDedicatedProcessEnabled());
} }
{ {
final String messagePersisterEnabledYaml = REQUIRED_CONFIG.concat(""" final String messagePersisterEnabledYaml = REQUIRED_CONFIG.concat("""
messagePersister: messagePersister:
serverPersistenceEnabled: true persistenceEnabled: true
dedicatedProcessEnabled: true dedicatedProcessEnabled: true
"""); """);
@ -374,22 +373,20 @@ class DynamicConfigurationTest {
DynamicConfigurationManager.parseConfiguration(messagePersisterEnabledYaml, DynamicConfiguration.class) DynamicConfigurationManager.parseConfiguration(messagePersisterEnabledYaml, DynamicConfiguration.class)
.orElseThrow(); .orElseThrow();
assertTrue(config.getMessagePersisterConfiguration().isServerPersistenceEnabled()); assertTrue(config.getMessagePersisterConfiguration().isPersistenceEnabled());
assertTrue(config.getMessagePersisterConfiguration().isDedicatedProcessEnabled());
} }
{ {
final String messagePersisterDisabledYaml = REQUIRED_CONFIG.concat(""" final String messagePersisterDisabledYaml = REQUIRED_CONFIG.concat("""
messagePersister: messagePersister:
serverPersistenceEnabled: false persistenceEnabled: false
"""); """);
final DynamicConfiguration config = final DynamicConfiguration config =
DynamicConfigurationManager.parseConfiguration(messagePersisterDisabledYaml, DynamicConfiguration.class) DynamicConfigurationManager.parseConfiguration(messagePersisterDisabledYaml, DynamicConfiguration.class)
.orElseThrow(); .orElseThrow();
assertFalse(config.getMessagePersisterConfiguration().isServerPersistenceEnabled()); assertFalse(config.getMessagePersisterConfiguration().isPersistenceEnabled());
assertFalse(config.getMessagePersisterConfiguration().isDedicatedProcessEnabled());
} }
} }

View File

@ -83,7 +83,7 @@ class MessagePersisterIntegrationTest {
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
messageDeletionExecutorService); messageDeletionExecutorService);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY, Optional.empty()); dynamicConfigurationManager, PERSIST_DELAY, 1);
account = mock(Account.class); account = mock(Account.class);

View File

@ -37,12 +37,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
@ -91,7 +88,7 @@ class MessagePersisterTest {
REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler,
sharedExecutorService, Clock.systemUTC()); sharedExecutorService, Clock.systemUTC());
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY, Optional.empty()); dynamicConfigurationManager, PERSIST_DELAY, 1);
doAnswer(invocation -> { doAnswer(invocation -> {
final UUID destinationUuid = invocation.getArgument(0); final UUID destinationUuid = invocation.getArgument(0);
@ -229,31 +226,6 @@ class MessagePersisterTest {
() -> messagePersister.persistQueue(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID))); () -> messagePersister.persistQueue(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID)));
} }
@ParameterizedTest
@CsvSource({
"true, true, false, false",
"true, false, true, true",
"false, true, false, true",
"false, false, true, false",
})
void testEnabled(final boolean dedicatedProcess, final boolean serverPersistenceEnabled,
final boolean dedicatedProcessEnabled, final boolean expectEnabled) {
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = mock(
DynamicConfigurationManager.class);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
final DynamicMessagePersisterConfiguration dynamicMessagePersisterConfiguration = mock(
DynamicMessagePersisterConfiguration.class);
when(dynamicConfiguration.getMessagePersisterConfiguration()).thenReturn(dynamicMessagePersisterConfiguration);
when(dynamicMessagePersisterConfiguration.isDedicatedProcessEnabled()).thenReturn(dedicatedProcessEnabled);
when(dynamicMessagePersisterConfiguration.isServerPersistenceEnabled()).thenReturn(serverPersistenceEnabled);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY, dedicatedProcess ? Optional.of(4) : Optional.empty());
assertEquals(expectEnabled, messagePersister.enabled(dynamicConfigurationManager));
}
@SuppressWarnings("SameParameterValue") @SuppressWarnings("SameParameterValue")
private static String generateRandomQueueNameForSlot(final int slot) { private static String generateRandomQueueNameForSlot(final int slot) {
final UUID uuid = UUID.randomUUID(); final UUID uuid = UUID.randomUUID();