Manage some unmanaged executors

This commit is contained in:
Ravi Khadiwala 2024-05-03 14:09:55 -05:00 committed by ravi-signal
parent fc097db2a0
commit 7aff81547a
8 changed files with 54 additions and 13 deletions

View File

@ -43,6 +43,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
@ -463,7 +464,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.scheduledExecutorService(name(getClass(), "storageServiceRetry-%d")).threads(1).build(); .scheduledExecutorService(name(getClass(), "storageServiceRetry-%d")).threads(1).build();
ScheduledExecutorService hcaptchaRetryExecutor = environment.lifecycle() ScheduledExecutorService hcaptchaRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "hCaptchaRetry-%d")).threads(1).build(); .scheduledExecutorService(name(getClass(), "hCaptchaRetry-%d")).threads(1).build();
ScheduledExecutorService remoteStorageExecutor = environment.lifecycle() ScheduledExecutorService remoteStorageRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "remoteStorageRetry-%d")).threads(1).build(); .scheduledExecutorService(name(getClass(), "remoteStorageRetry-%d")).threads(1).build();
ScheduledExecutorService registrationIdentityTokenRefreshExecutor = environment.lifecycle() ScheduledExecutorService registrationIdentityTokenRefreshExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "registrationIdentityTokenRefresh-%d")).threads(1).build(); .scheduledExecutorService(name(getClass(), "registrationIdentityTokenRefresh-%d")).threads(1).build();
@ -512,6 +513,23 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.minThreads(8) .minThreads(8)
.maxThreads(8) .maxThreads(8)
.build(); .build();
// unbounded executor (same as cachedThreadPool)
ExecutorService hcaptchaHttpExecutor = environment.lifecycle()
.executorService(name(getClass(), "hcaptcha-%d"))
.minThreads(0)
.maxThreads(Integer.MAX_VALUE)
.workQueue(new SynchronousQueue<>())
.keepAliveTime(io.dropwizard.util.Duration.seconds(60L))
.build();
// unbounded executor (same as cachedThreadPool)
ExecutorService remoteStorageHttpExecutor = environment.lifecycle()
.executorService(name(getClass(), "remoteStorage-%d"))
.minThreads(0)
.maxThreads(Integer.MAX_VALUE)
.workQueue(new SynchronousQueue<>())
.keepAliveTime(io.dropwizard.util.Duration.seconds(60L))
.build();
ScheduledExecutorService subscriptionProcessorRetryExecutor = environment.lifecycle() ScheduledExecutorService subscriptionProcessorRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "subscriptionProcessorRetry-%d")).threads(1).build(); .scheduledExecutorService(name(getClass(), "subscriptionProcessorRetry-%d")).threads(1).build();
@ -613,7 +631,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getMessageByteLimitCardinalityEstimator().period()); config.getMessageByteLimitCardinalityEstimator().period());
HCaptchaClient hCaptchaClient = config.getHCaptchaConfiguration() HCaptchaClient hCaptchaClient = config.getHCaptchaConfiguration()
.build(hcaptchaRetryExecutor, dynamicConfigurationManager); .build(hcaptchaRetryExecutor, hcaptchaHttpExecutor, dynamicConfigurationManager);
HttpClient shortCodeRetrieverHttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2) HttpClient shortCodeRetrieverHttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(10)).build(); .connectTimeout(Duration.ofSeconds(10)).build();
ShortCodeExpander shortCodeRetriever = new ShortCodeExpander(shortCodeRetrieverHttpClient, config.getShortCodeRetrieverConfiguration().baseUrl()); ShortCodeExpander shortCodeRetriever = new ShortCodeExpander(shortCodeRetrieverHttpClient, config.getShortCodeRetrieverConfiguration().baseUrl());
@ -697,13 +715,17 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
dynamoDbAsyncClient, dynamoDbAsyncClient,
config.getDynamoDbTables().getBackups().getTableName(), config.getDynamoDbTables().getBackups().getTableName(),
clock); clock);
final Cdn3RemoteStorageManager cdn3RemoteStorageManager = new Cdn3RemoteStorageManager(
remoteStorageHttpExecutor,
remoteStorageRetryExecutor,
config.getCdn3StorageManagerConfiguration());
BackupManager backupManager = new BackupManager( BackupManager backupManager = new BackupManager(
backupsDb, backupsDb,
backupsGenericZkSecretParams, backupsGenericZkSecretParams,
rateLimiters, rateLimiters,
tusAttachmentGenerator, tusAttachmentGenerator,
cdn3BackupCredentialGenerator, cdn3BackupCredentialGenerator,
new Cdn3RemoteStorageManager(remoteStorageExecutor, config.getCdn3StorageManagerConfiguration()), cdn3RemoteStorageManager,
clock); clock);
final DynamicConfigTurnRouter configTurnRouter = new DynamicConfigTurnRouter(dynamicConfigurationManager); final DynamicConfigTurnRouter configTurnRouter = new DynamicConfigTurnRouter(dynamicConfigurationManager);

View File

@ -18,6 +18,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -55,6 +56,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
private static final String STATUS_TAG_NAME = "status"; private static final String STATUS_TAG_NAME = "status";
public Cdn3RemoteStorageManager( public Cdn3RemoteStorageManager(
final ExecutorService httpExecutor,
final ScheduledExecutorService retryExecutor, final ScheduledExecutorService retryExecutor,
final Cdn3StorageManagerConfiguration configuration) { final Cdn3StorageManagerConfiguration configuration) {
@ -67,7 +69,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
this.storageManagerHttpClient = FaultTolerantHttpClient.newBuilder() this.storageManagerHttpClient = FaultTolerantHttpClient.newBuilder()
.withName("cdn3-storage-manager") .withName("cdn3-storage-manager")
.withCircuitBreaker(configuration.circuitBreaker()) .withCircuitBreaker(configuration.circuitBreaker())
.withExecutor(Executors.newCachedThreadPool()) .withExecutor(httpExecutor)
.withRetryExecutor(retryExecutor) .withRetryExecutor(retryExecutor)
.withRetry(configuration.retry()) .withRetry(configuration.retry())
.withConnectTimeout(Duration.ofSeconds(10)) .withConnectTimeout(Duration.ofSeconds(10))

View File

@ -60,6 +60,7 @@ public class HCaptchaClient implements CaptchaClient {
public HCaptchaClient( public HCaptchaClient(
final String apiKey, final String apiKey,
final ScheduledExecutorService retryExecutor, final ScheduledExecutorService retryExecutor,
final ExecutorService httpExecutor,
final CircuitBreakerConfiguration circuitBreakerConfiguration, final CircuitBreakerConfiguration circuitBreakerConfiguration,
final RetryConfiguration retryConfiguration, final RetryConfiguration retryConfiguration,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) { final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
@ -67,7 +68,7 @@ public class HCaptchaClient implements CaptchaClient {
FaultTolerantHttpClient.newBuilder() FaultTolerantHttpClient.newBuilder()
.withName("hcaptcha") .withName("hcaptcha")
.withCircuitBreaker(circuitBreakerConfiguration) .withCircuitBreaker(circuitBreakerConfiguration)
.withExecutor(Executors.newCachedThreadPool()) .withExecutor(httpExecutor)
.withRetryExecutor(retryExecutor) .withRetryExecutor(retryExecutor)
.withRetry(retryConfiguration) .withRetry(retryConfiguration)
.withRetryOnException(ex -> ex instanceof IOException) .withRetryOnException(ex -> ex instanceof IOException)

View File

@ -10,11 +10,12 @@ import io.dropwizard.jackson.Discoverable;
import org.whispersystems.textsecuregcm.captcha.HCaptchaClient; import org.whispersystems.textsecuregcm.captcha.HCaptchaClient;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HCaptchaConfiguration.class) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HCaptchaConfiguration.class)
public interface HCaptchaClientFactory extends Discoverable { public interface HCaptchaClientFactory extends Discoverable {
HCaptchaClient build(ScheduledExecutorService retryExecutor, HCaptchaClient build(ScheduledExecutorService retryExecutor, ExecutorService httpExecutor,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager); DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager);
} }

View File

@ -12,6 +12,7 @@ import org.whispersystems.textsecuregcm.captcha.HCaptchaClient;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.secrets.SecretString; import org.whispersystems.textsecuregcm.configuration.secrets.SecretString;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@JsonTypeName("default") @JsonTypeName("default")
@ -43,11 +44,14 @@ public class HCaptchaConfiguration implements HCaptchaClientFactory {
} }
@Override @Override
public HCaptchaClient build(final ScheduledExecutorService retryExecutor, public HCaptchaClient build(
final ScheduledExecutorService retryExecutor,
final ExecutorService httpExecutor,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) { final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
return new HCaptchaClient( return new HCaptchaClient(
apiKey.value(), apiKey.value(),
retryExecutor, retryExecutor,
httpExecutor,
circuitBreaker, circuitBreaker,
retry, retry,
dynamicConfigurationManager); dynamicConfigurationManager);

View File

@ -15,6 +15,7 @@ import java.security.cert.CertificateException;
import java.time.Clock; import java.time.Clock;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import org.signal.libsignal.zkgroup.GenericServerSecretParams; import org.signal.libsignal.zkgroup.GenericServerSecretParams;
import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.InvalidInputException;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
@ -111,10 +112,14 @@ record CommandDependencies(
.executorService(name(name, "accountLock-%d")).minThreads(8).maxThreads(8).build(); .executorService(name(name, "accountLock-%d")).minThreads(8).maxThreads(8).build();
ExecutorService clientPresenceExecutor = environment.lifecycle() ExecutorService clientPresenceExecutor = environment.lifecycle()
.executorService(name(name, "clientPresence-%d")).minThreads(8).maxThreads(8).build(); .executorService(name(name, "clientPresence-%d")).minThreads(8).maxThreads(8).build();
ExecutorService remoteStorageHttpExecutor = environment.lifecycle()
.executorService(name(name, "remoteStorage-%d"))
.minThreads(0).maxThreads(Integer.MAX_VALUE).workQueue(new SynchronousQueue<>())
.keepAliveTime(io.dropwizard.util.Duration.seconds(60L)).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle() ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "secureValueRecoveryServiceRetry-%d")).threads(1).build(); .scheduledExecutorService(name(name, "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService remoteStorageExecutor = environment.lifecycle() ScheduledExecutorService remoteStorageRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "remoteStorageRetry-%d")).threads(1).build(); .scheduledExecutorService(name(name, "remoteStorageRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle() ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "storageServiceRetry-%d")).threads(1).build(); .scheduledExecutorService(name(name, "storageServiceRetry-%d")).threads(1).build();
@ -211,7 +216,10 @@ record CommandDependencies(
rateLimiters, rateLimiters,
new TusAttachmentGenerator(configuration.getTus()), new TusAttachmentGenerator(configuration.getTus()),
new Cdn3BackupCredentialGenerator(configuration.getTus()), new Cdn3BackupCredentialGenerator(configuration.getTus()),
new Cdn3RemoteStorageManager(remoteStorageExecutor, configuration.getCdn3StorageManagerConfiguration()), new Cdn3RemoteStorageManager(
remoteStorageHttpExecutor,
remoteStorageRetryExecutor,
configuration.getCdn3StorageManagerConfiguration()),
clock); clock);
environment.lifecycle().manage(messagesCache); environment.lifecycle().manage(messagesCache);

View File

@ -51,6 +51,7 @@ public class Cdn3RemoteStorageManagerTest {
@BeforeEach @BeforeEach
public void init() { public void init() {
remoteStorageManager = new Cdn3RemoteStorageManager( remoteStorageManager = new Cdn3RemoteStorageManager(
Executors.newCachedThreadPool(),
Executors.newSingleThreadScheduledExecutor(), Executors.newSingleThreadScheduledExecutor(),
new Cdn3StorageManagerConfiguration( new Cdn3StorageManagerConfiguration(
wireMock.url("storage-manager/"), wireMock.url("storage-manager/"),

View File

@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.whispersystems.textsecuregcm.captcha.Action; import org.whispersystems.textsecuregcm.captcha.Action;
import org.whispersystems.textsecuregcm.captcha.AssessmentResult; import org.whispersystems.textsecuregcm.captcha.AssessmentResult;
@ -19,10 +20,11 @@ import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
public class StubHCaptchaClientFactory implements HCaptchaClientFactory { public class StubHCaptchaClientFactory implements HCaptchaClientFactory {
@Override @Override
public HCaptchaClient build(final ScheduledExecutorService retryExecutor, public HCaptchaClient build(final ScheduledExecutorService retryExecutor, ExecutorService httpExecutor,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) { final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
return new StubHCaptchaClient(retryExecutor, new CircuitBreakerConfiguration(), dynamicConfigurationManager); return new StubHCaptchaClient(retryExecutor, httpExecutor, new CircuitBreakerConfiguration(),
dynamicConfigurationManager);
} }
/** /**
@ -30,10 +32,10 @@ public class StubHCaptchaClientFactory implements HCaptchaClientFactory {
*/ */
private static class StubHCaptchaClient extends HCaptchaClient { private static class StubHCaptchaClient extends HCaptchaClient {
public StubHCaptchaClient(final ScheduledExecutorService retryExecutor, public StubHCaptchaClient(final ScheduledExecutorService retryExecutor, ExecutorService httpExecutor,
final CircuitBreakerConfiguration circuitBreakerConfiguration, final CircuitBreakerConfiguration circuitBreakerConfiguration,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) { final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
super(null, retryExecutor, circuitBreakerConfiguration, null, dynamicConfigurationManager); super(null, retryExecutor, httpExecutor, circuitBreakerConfiguration, null, dynamicConfigurationManager);
} }
@Override @Override