From 6043c1a4e8518716cd87ad7ac4dceae8287aca2f Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 10 May 2023 15:32:23 -0500 Subject: [PATCH] Add `ScheduledApnPushNotificationSenderServiceCommand` --- .../textsecuregcm/WhisperServerService.java | 15 ++-- .../dynamic/DynamicConfiguration.java | 9 +++ ...edApnNotificationSendingConfiguration.java | 11 +++ .../push/ApnPushNotificationScheduler.java | 49 +++++++++--- ...nPushNotificationSenderServiceCommand.java | 75 +++++++++++++++++++ .../ApnPushNotificationSchedulerTest.java | 60 ++++++++++++++- 6 files changed, 201 insertions(+), 18 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicScheduledApnNotificationSendingConfiguration.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/ScheduledApnPushNotificationSenderServiceCommand.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index d1a1b5c9d..58f47ce93 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -41,6 +41,7 @@ import java.time.Duration; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Optional; import java.util.ServiceLoader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -219,6 +220,7 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; +import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand; import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask; import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand; @@ -266,6 +268,7 @@ public class WhisperServerService extends Application dynamicConfigurationManager; - private static final int WORKER_THREAD_COUNT = 4; + private static final int DEFAULT_WORKER_THREAD_COUNT = 4; @VisibleForTesting static final Duration BACKGROUND_NOTIFICATION_PERIOD = Duration.ofMinutes(20); @@ -102,6 +105,18 @@ public class ApnPushNotificationScheduler implements Managed { } private long processNextSlot() { + if (dedicatedProcess) { + if (!dynamicConfigurationManager.getConfiguration().getScheduledApnNotificationSendingConfiguration() + .enabledForDedicatedProcess()) { + return 0; + } + } else { + if (!dynamicConfigurationManager.getConfiguration().getScheduledApnNotificationSendingConfiguration() + .enabledForServer()) { + return 0; + } + } + final int slot = (int) (pushSchedulingCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT); @@ -166,32 +181,44 @@ public class ApnPushNotificationScheduler implements Managed { } public ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster, - APNSender apnSender, - AccountsManager accountsManager) throws IOException { + APNSender apnSender, AccountsManager accountsManager, final Optional dedicatedProcessWorkerThreadCount, + DynamicConfigurationManager dynamicConfigurationManager) throws IOException { - this(pushSchedulingCluster, apnSender, accountsManager, Clock.systemUTC()); + this(pushSchedulingCluster, apnSender, accountsManager, Clock.systemUTC(), dedicatedProcessWorkerThreadCount, + dynamicConfigurationManager); } @VisibleForTesting ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster, APNSender apnSender, AccountsManager accountsManager, - Clock clock) throws IOException { + Clock clock, + Optional dedicatedProcessThreadCount, + DynamicConfigurationManager dynamicConfigurationManager) throws IOException { this.apnSender = apnSender; this.accountsManager = accountsManager; this.pushSchedulingCluster = pushSchedulingCluster; this.clock = clock; - this.getPendingVoipDestinationsScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/get.lua", ScriptOutputType.MULTI); - this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua", ScriptOutputType.VALUE); - this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER); + this.getPendingVoipDestinationsScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/get.lua", + ScriptOutputType.MULTI); + this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua", + ScriptOutputType.VALUE); + this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua", + ScriptOutputType.INTEGER); - this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE); + this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, + "lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE); + + this.workerThreads = dedicatedProcessThreadCount.map(Thread[]::new) + .orElseGet(() -> new Thread[DEFAULT_WORKER_THREAD_COUNT]); for (int i = 0; i < this.workerThreads.length; i++) { this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i); } + this.dedicatedProcess = dedicatedProcessThreadCount.isPresent(); + this.dynamicConfigurationManager = dynamicConfigurationManager; } /** diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScheduledApnPushNotificationSenderServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScheduledApnPushNotificationSenderServiceCommand.java new file mode 100644 index 000000000..0ee699e60 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ScheduledApnPushNotificationSenderServiceCommand.java @@ -0,0 +1,75 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.workers; + +import static com.codahale.metrics.MetricRegistry.name; + +import io.dropwizard.Application; +import io.dropwizard.cli.EnvironmentCommand; +import io.dropwizard.setup.Environment; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.push.APNSender; +import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; + +public class ScheduledApnPushNotificationSenderServiceCommand extends EnvironmentCommand { + + private static final String WORKER_COUNT = "workers"; + + public ScheduledApnPushNotificationSenderServiceCommand() { + super(new Application<>() { + @Override + public void run(WhisperServerConfiguration configuration, Environment environment) { + + } + }, "scheduled-apn-push-notification-sender-service", + "Starts a persistent service to send scheduled APNs push notifications"); + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + subparser.addArgument("--workers") + .type(Integer.class) + .dest(WORKER_COUNT) + .required(true) + .help("The number of worker threads"); + } + + @Override + protected void run(Environment environment, Namespace namespace, WhisperServerConfiguration configuration) + throws Exception { + + final CommandDependencies deps = CommandDependencies.build("scheduled-apn-sender", environment, configuration); + + final FaultTolerantRedisCluster pushSchedulerCluster = new FaultTolerantRedisCluster("push_scheduler", + configuration.getPushSchedulerCluster(), deps.redisClusterClientResources()); + + final ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")) + .maxThreads(1).minThreads(1).build(); + + final DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager<>( + configuration.getAppConfig().getApplication(), + configuration.getAppConfig().getEnvironment(), + configuration.getAppConfig().getConfigurationName(), + DynamicConfiguration.class); + + final APNSender apnSender = new APNSender(apnSenderExecutor, configuration.getApnConfiguration()); + final ApnPushNotificationScheduler apnPushNotificationScheduler = new ApnPushNotificationScheduler( + pushSchedulerCluster, apnSender, deps.accountsManager(), Optional.of(namespace.getInt(WORKER_COUNT)), + dynamicConfigurationManager); + + environment.lifecycle().manage(apnSender); + environment.lifecycle().manage(apnPushNotificationScheduler); + } + +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java index 906bbed3d..31511a4d6 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java @@ -9,13 +9,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import io.lettuce.core.cluster.SlotHash; -import java.time.Clock; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; @@ -26,11 +27,17 @@ import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicScheduledApnNotificationSendingConfiguration; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.TestClock; @@ -74,7 +81,17 @@ class ApnPushNotificationSchedulerTest { apnSender = mock(APNSender.class); clock = TestClock.now(); - apnPushNotificationScheduler = new ApnPushNotificationScheduler(REDIS_CLUSTER_EXTENSION.getRedisCluster(), apnSender, accountsManager, clock); + final DynamicConfigurationManager dynamicConfigurationManager = mock( + DynamicConfigurationManager.class); + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + final DynamicScheduledApnNotificationSendingConfiguration scheduledApnNotificationSendingConfiguration = new DynamicScheduledApnNotificationSendingConfiguration( + true, true); + when(dynamicConfiguration.getScheduledApnNotificationSendingConfiguration()).thenReturn( + scheduledApnNotificationSendingConfiguration); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + + apnPushNotificationScheduler = new ApnPushNotificationScheduler(REDIS_CLUSTER_EXTENSION.getRedisCluster(), + apnSender, accountsManager, clock, Optional.empty(), dynamicConfigurationManager); } @Test @@ -216,4 +233,43 @@ class ApnPushNotificationSchedulerTest { verify(apnSender, never()).sendNotification(any()); } + + @ParameterizedTest + @CsvSource({ + "true, false, true, true", + "true, true, false, false", + "false, true, false, true", + "false, false, true, false" + }) + void testDedicatedProcessDynamicConfiguration(final boolean dedicatedProcess, final boolean enabledForServer, + final boolean enabledForDedicatedProcess, final boolean expectActivity) throws Exception { + + final FaultTolerantRedisCluster redisCluster = mock(FaultTolerantRedisCluster.class); + when(redisCluster.withCluster(any())).thenReturn(0L); + + final AccountsManager accountsManager = mock(AccountsManager.class); + + final DynamicConfigurationManager dynamicConfigurationManager = mock( + DynamicConfigurationManager.class); + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + final DynamicScheduledApnNotificationSendingConfiguration scheduledApnNotificationSendingConfiguration = new DynamicScheduledApnNotificationSendingConfiguration( + enabledForServer, enabledForDedicatedProcess); + when(dynamicConfiguration.getScheduledApnNotificationSendingConfiguration()).thenReturn( + scheduledApnNotificationSendingConfiguration); + + apnPushNotificationScheduler = new ApnPushNotificationScheduler(redisCluster, apnSender, + accountsManager, dedicatedProcess ? Optional.of(4) : Optional.empty(), dynamicConfigurationManager); + + apnPushNotificationScheduler.start(); + apnPushNotificationScheduler.stop(); + + if (expectActivity) { + verify(redisCluster, atLeastOnce()).withCluster(any()); + } else { + verifyNoInteractions(redisCluster); + verifyNoInteractions(accountsManager); + verifyNoInteractions(apnSender); + } + } }