Always have 0 `ApnPushNotificationScheduler` worker threads in front-end service
This commit is contained in:
parent
f8fefe2e5e
commit
b81a0e99d4
|
@ -490,7 +490,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
APNSender apnSender = new APNSender(apnSenderExecutor, config.getApnConfiguration());
|
APNSender apnSender = new APNSender(apnSenderExecutor, config.getApnConfiguration());
|
||||||
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, config.getFcmConfiguration().credentials().value());
|
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, config.getFcmConfiguration().credentials().value());
|
||||||
ApnPushNotificationScheduler apnPushNotificationScheduler = new ApnPushNotificationScheduler(pushSchedulerCluster,
|
ApnPushNotificationScheduler apnPushNotificationScheduler = new ApnPushNotificationScheduler(pushSchedulerCluster,
|
||||||
apnSender, accountsManager, Optional.empty(), dynamicConfigurationManager);
|
apnSender, accountsManager, 0);
|
||||||
PushNotificationManager pushNotificationManager = new PushNotificationManager(accountsManager, apnSender, fcmSender,
|
PushNotificationManager pushNotificationManager = new PushNotificationManager(accountsManager, apnSender, fcmSender,
|
||||||
apnPushNotificationScheduler, pushLatencyManager);
|
apnPushNotificationScheduler, pushLatencyManager);
|
||||||
RateLimiters rateLimiters = RateLimiters.createAndValidate(config.getLimitsConfiguration(),
|
RateLimiters rateLimiters = RateLimiters.createAndValidate(config.getLimitsConfiguration(),
|
||||||
|
|
|
@ -47,11 +47,6 @@ public class DynamicConfiguration {
|
||||||
@Valid
|
@Valid
|
||||||
private DynamicTurnConfiguration turn = new DynamicTurnConfiguration();
|
private DynamicTurnConfiguration turn = new DynamicTurnConfiguration();
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
@Valid
|
|
||||||
DynamicScheduledApnNotificationSendingConfiguration scheduledApnNotificationSending = new DynamicScheduledApnNotificationSendingConfiguration(
|
|
||||||
true, false);
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
@Valid
|
@Valid
|
||||||
DynamicMessagePersisterConfiguration messagePersister = new DynamicMessagePersisterConfiguration();
|
DynamicMessagePersisterConfiguration messagePersister = new DynamicMessagePersisterConfiguration();
|
||||||
|
@ -94,10 +89,6 @@ public class DynamicConfiguration {
|
||||||
return turn;
|
return turn;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DynamicScheduledApnNotificationSendingConfiguration getScheduledApnNotificationSendingConfiguration() {
|
|
||||||
return scheduledApnNotificationSending;
|
|
||||||
}
|
|
||||||
|
|
||||||
public DynamicMessagePersisterConfiguration getMessagePersisterConfiguration() {
|
public DynamicMessagePersisterConfiguration getMessagePersisterConfiguration() {
|
||||||
return messagePersister;
|
return messagePersister;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2023 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.configuration.dynamic;
|
|
||||||
|
|
||||||
public record DynamicScheduledApnNotificationSendingConfiguration(boolean enabledForServer,
|
|
||||||
boolean enabledForDedicatedProcess) {
|
|
||||||
|
|
||||||
}
|
|
|
@ -33,13 +33,11 @@ import java.util.function.Consumer;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Pair;
|
import org.whispersystems.textsecuregcm.util.Pair;
|
||||||
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
||||||
import org.whispersystems.textsecuregcm.util.Util;
|
import org.whispersystems.textsecuregcm.util.Util;
|
||||||
|
@ -75,10 +73,6 @@ public class ApnPushNotificationScheduler implements Managed {
|
||||||
private final ClusterLuaScript scheduleBackgroundNotificationScript;
|
private final ClusterLuaScript scheduleBackgroundNotificationScript;
|
||||||
|
|
||||||
private final Thread[] workerThreads;
|
private final Thread[] workerThreads;
|
||||||
private final boolean dedicatedProcess;
|
|
||||||
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
|
|
||||||
|
|
||||||
private static final int DEFAULT_WORKER_THREAD_COUNT = 4;
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static final Duration BACKGROUND_NOTIFICATION_PERIOD = Duration.ofMinutes(20);
|
static final Duration BACKGROUND_NOTIFICATION_PERIOD = Duration.ofMinutes(20);
|
||||||
|
@ -105,18 +99,6 @@ public class ApnPushNotificationScheduler implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
private long processNextSlot() {
|
private long processNextSlot() {
|
||||||
if (dedicatedProcess) {
|
|
||||||
if (!dynamicConfigurationManager.getConfiguration().getScheduledApnNotificationSendingConfiguration()
|
|
||||||
.enabledForDedicatedProcess()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (!dynamicConfigurationManager.getConfiguration().getScheduledApnNotificationSendingConfiguration()
|
|
||||||
.enabledForServer()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final int slot = (int) (pushSchedulingCluster.withCluster(connection ->
|
final int slot = (int) (pushSchedulingCluster.withCluster(connection ->
|
||||||
connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT);
|
connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT);
|
||||||
|
|
||||||
|
@ -181,11 +163,10 @@ public class ApnPushNotificationScheduler implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster,
|
public ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster,
|
||||||
APNSender apnSender, AccountsManager accountsManager, final Optional<Integer> dedicatedProcessWorkerThreadCount,
|
APNSender apnSender, AccountsManager accountsManager, final int dedicatedProcessWorkerThreadCount)
|
||||||
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
this(pushSchedulingCluster, apnSender, accountsManager, Clock.systemUTC(), dedicatedProcessWorkerThreadCount,
|
this(pushSchedulingCluster, apnSender, accountsManager, Clock.systemUTC(), dedicatedProcessWorkerThreadCount);
|
||||||
dynamicConfigurationManager);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -193,8 +174,7 @@ public class ApnPushNotificationScheduler implements Managed {
|
||||||
APNSender apnSender,
|
APNSender apnSender,
|
||||||
AccountsManager accountsManager,
|
AccountsManager accountsManager,
|
||||||
Clock clock,
|
Clock clock,
|
||||||
Optional<Integer> dedicatedProcessThreadCount,
|
int dedicatedProcessThreadCount) throws IOException {
|
||||||
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) throws IOException {
|
|
||||||
|
|
||||||
this.apnSender = apnSender;
|
this.apnSender = apnSender;
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
|
@ -211,14 +191,11 @@ public class ApnPushNotificationScheduler implements Managed {
|
||||||
this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster,
|
this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster,
|
||||||
"lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE);
|
"lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE);
|
||||||
|
|
||||||
this.workerThreads = dedicatedProcessThreadCount.map(Thread[]::new)
|
this.workerThreads = new Thread[dedicatedProcessThreadCount];
|
||||||
.orElseGet(() -> new Thread[DEFAULT_WORKER_THREAD_COUNT]);
|
|
||||||
|
|
||||||
for (int i = 0; i < this.workerThreads.length; i++) {
|
for (int i = 0; i < this.workerThreads.length; i++) {
|
||||||
this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i);
|
this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i);
|
||||||
}
|
}
|
||||||
this.dedicatedProcess = dedicatedProcessThreadCount.isPresent();
|
|
||||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -10,17 +10,14 @@ import static com.codahale.metrics.MetricRegistry.name;
|
||||||
import io.dropwizard.Application;
|
import io.dropwizard.Application;
|
||||||
import io.dropwizard.cli.ServerCommand;
|
import io.dropwizard.cli.ServerCommand;
|
||||||
import io.dropwizard.setup.Environment;
|
import io.dropwizard.setup.Environment;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import net.sourceforge.argparse4j.inf.Namespace;
|
import net.sourceforge.argparse4j.inf.Namespace;
|
||||||
import net.sourceforge.argparse4j.inf.Subparser;
|
import net.sourceforge.argparse4j.inf.Subparser;
|
||||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
|
||||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||||
import org.whispersystems.textsecuregcm.push.APNSender;
|
import org.whispersystems.textsecuregcm.push.APNSender;
|
||||||
import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler;
|
import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler;
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
|
||||||
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
|
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
|
||||||
|
|
||||||
public class ScheduledApnPushNotificationSenderServiceCommand extends ServerCommand<WhisperServerConfiguration> {
|
public class ScheduledApnPushNotificationSenderServiceCommand extends ServerCommand<WhisperServerConfiguration> {
|
||||||
|
@ -63,18 +60,9 @@ public class ScheduledApnPushNotificationSenderServiceCommand extends ServerComm
|
||||||
final ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))
|
final ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))
|
||||||
.maxThreads(1).minThreads(1).build();
|
.maxThreads(1).minThreads(1).build();
|
||||||
|
|
||||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
|
|
||||||
configuration.getAppConfig().getApplication(),
|
|
||||||
configuration.getAppConfig().getEnvironment(),
|
|
||||||
configuration.getAppConfig().getConfigurationName(),
|
|
||||||
DynamicConfiguration.class);
|
|
||||||
|
|
||||||
dynamicConfigurationManager.start();
|
|
||||||
|
|
||||||
final APNSender apnSender = new APNSender(apnSenderExecutor, configuration.getApnConfiguration());
|
final APNSender apnSender = new APNSender(apnSenderExecutor, configuration.getApnConfiguration());
|
||||||
final ApnPushNotificationScheduler apnPushNotificationScheduler = new ApnPushNotificationScheduler(
|
final ApnPushNotificationScheduler apnPushNotificationScheduler = new ApnPushNotificationScheduler(
|
||||||
pushSchedulerCluster, apnSender, deps.accountsManager(), Optional.of(namespace.getInt(WORKER_COUNT)),
|
pushSchedulerCluster, apnSender, deps.accountsManager(), namespace.getInt(WORKER_COUNT));
|
||||||
dynamicConfigurationManager);
|
|
||||||
|
|
||||||
environment.lifecycle().manage(apnSender);
|
environment.lifecycle().manage(apnSender);
|
||||||
environment.lifecycle().manage(apnPushNotificationScheduler);
|
environment.lifecycle().manage(apnPushNotificationScheduler);
|
||||||
|
|
|
@ -31,13 +31,11 @@ import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.CsvSource;
|
import org.junit.jupiter.params.provider.CsvSource;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicScheduledApnNotificationSendingConfiguration;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Pair;
|
import org.whispersystems.textsecuregcm.util.Pair;
|
||||||
import org.whispersystems.textsecuregcm.util.TestClock;
|
import org.whispersystems.textsecuregcm.util.TestClock;
|
||||||
|
|
||||||
|
@ -81,17 +79,8 @@ class ApnPushNotificationSchedulerTest {
|
||||||
apnSender = mock(APNSender.class);
|
apnSender = mock(APNSender.class);
|
||||||
clock = TestClock.now();
|
clock = TestClock.now();
|
||||||
|
|
||||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = mock(
|
|
||||||
DynamicConfigurationManager.class);
|
|
||||||
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
|
|
||||||
final DynamicScheduledApnNotificationSendingConfiguration scheduledApnNotificationSendingConfiguration = new DynamicScheduledApnNotificationSendingConfiguration(
|
|
||||||
true, true);
|
|
||||||
when(dynamicConfiguration.getScheduledApnNotificationSendingConfiguration()).thenReturn(
|
|
||||||
scheduledApnNotificationSendingConfiguration);
|
|
||||||
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
|
|
||||||
|
|
||||||
apnPushNotificationScheduler = new ApnPushNotificationScheduler(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
|
apnPushNotificationScheduler = new ApnPushNotificationScheduler(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
|
||||||
apnSender, accountsManager, clock, Optional.empty(), dynamicConfigurationManager);
|
apnSender, accountsManager, clock, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -236,30 +225,21 @@ class ApnPushNotificationSchedulerTest {
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@CsvSource({
|
@CsvSource({
|
||||||
"true, false, true, true",
|
"1, true",
|
||||||
"true, true, false, false",
|
"0, false",
|
||||||
"false, true, false, true",
|
|
||||||
"false, false, true, false"
|
|
||||||
})
|
})
|
||||||
void testDedicatedProcessDynamicConfiguration(final boolean dedicatedProcess, final boolean enabledForServer,
|
void testDedicatedProcessDynamicConfiguration(final int dedicatedThreadCount, final boolean expectActivity)
|
||||||
final boolean enabledForDedicatedProcess, final boolean expectActivity) throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
final FaultTolerantRedisCluster redisCluster = mock(FaultTolerantRedisCluster.class);
|
final FaultTolerantRedisCluster redisCluster = mock(FaultTolerantRedisCluster.class);
|
||||||
when(redisCluster.withCluster(any())).thenReturn(0L);
|
when(redisCluster.withCluster(any())).thenReturn(0L);
|
||||||
|
|
||||||
final AccountsManager accountsManager = mock(AccountsManager.class);
|
final AccountsManager accountsManager = mock(AccountsManager.class);
|
||||||
|
|
||||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = mock(
|
|
||||||
DynamicConfigurationManager.class);
|
|
||||||
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
|
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
|
||||||
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
|
|
||||||
final DynamicScheduledApnNotificationSendingConfiguration scheduledApnNotificationSendingConfiguration = new DynamicScheduledApnNotificationSendingConfiguration(
|
|
||||||
enabledForServer, enabledForDedicatedProcess);
|
|
||||||
when(dynamicConfiguration.getScheduledApnNotificationSendingConfiguration()).thenReturn(
|
|
||||||
scheduledApnNotificationSendingConfiguration);
|
|
||||||
|
|
||||||
apnPushNotificationScheduler = new ApnPushNotificationScheduler(redisCluster, apnSender,
|
apnPushNotificationScheduler = new ApnPushNotificationScheduler(redisCluster, apnSender,
|
||||||
accountsManager, dedicatedProcess ? Optional.of(4) : Optional.empty(), dynamicConfigurationManager);
|
accountsManager, dedicatedThreadCount);
|
||||||
|
|
||||||
apnPushNotificationScheduler.start();
|
apnPushNotificationScheduler.start();
|
||||||
apnPushNotificationScheduler.stop();
|
apnPushNotificationScheduler.stop();
|
||||||
|
|
Loading…
Reference in New Issue