FaultTolerantHttpClient: used managed ScheduledExecutorService for retries

This commit is contained in:
Chris Eager 2023-06-23 10:48:38 -05:00 committed by Jon Chambers
parent 8e48ac4ede
commit b852d6681d
13 changed files with 346 additions and 252 deletions

View File

@ -311,6 +311,7 @@ public final class Operations {
return FaultTolerantHttpClient.newBuilder() return FaultTolerantHttpClient.newBuilder()
.withName("integration-test") .withName("integration-test")
.withExecutor(Executors.newFixedThreadPool(16)) .withExecutor(Executors.newFixedThreadPool(16))
.withRetryExecutor(Executors.newSingleThreadScheduledExecutor())
.withCircuitBreaker(new CircuitBreakerConfiguration()) .withCircuitBreaker(new CircuitBreakerConfiguration())
.withTrustedServerCertificates(CONFIG.rootCert()) .withTrustedServerCertificates(CONFIG.rootCert())
.build(); .build();

View File

@ -379,6 +379,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build(); .executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle() ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build(); .executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "storageServiceRetry-%d")).threads(1).build();
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService( Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
ExecutorServiceMetrics.monitor(Metrics.globalRegistry, ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
@ -411,6 +415,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.maxThreads(2) .maxThreads(2)
.minThreads(2) .minThreads(2)
.build(); .build();
ScheduledExecutorService subscriptionProcessorRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "subscriptionProcessorRetry-%d")).threads(1).build();
final AdminEventLogger adminEventLogger = new GoogleCloudAdminEventLogger( final AdminEventLogger adminEventLogger = new GoogleCloudAdminEventLogger(
LoggingOptions.newBuilder().setProjectId(config.getAdminEventLoggingConfiguration().projectId()) LoggingOptions.newBuilder().setProjectId(config.getAdminEventLoggingConfiguration().projectId())
@ -426,9 +432,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getStripe().idempotencyKeyGenerator().value(), config.getStripe().boostDescription(), config.getStripe() config.getStripe().idempotencyKeyGenerator().value(), config.getStripe().boostDescription(), config.getStripe()
.supportedCurrencies()); .supportedCurrencies());
BraintreeManager braintreeManager = new BraintreeManager(config.getBraintree().merchantId(), BraintreeManager braintreeManager = new BraintreeManager(config.getBraintree().merchantId(),
config.getBraintree().publicKey(), config.getBraintree().privateKey().value(), config.getBraintree().environment(), config.getBraintree().publicKey(), config.getBraintree().privateKey().value(),
config.getBraintree().environment(),
config.getBraintree().supportedCurrencies(), config.getBraintree().merchantAccounts(), config.getBraintree().supportedCurrencies(), config.getBraintree().merchantAccounts(),
config.getBraintree().graphqlUrl(), config.getBraintree().circuitBreaker(), subscriptionProcessorExecutor); config.getBraintree().graphqlUrl(), config.getBraintree().circuitBreaker(), subscriptionProcessorExecutor,
subscriptionProcessorRetryExecutor);
ExternalServiceCredentialsGenerator directoryV2CredentialsGenerator = DirectoryV2Controller.credentialsGenerator( ExternalServiceCredentialsGenerator directoryV2CredentialsGenerator = DirectoryV2Controller.credentialsGenerator(
config.getDirectoryV2Configuration().getDirectoryV2ClientConfiguration()); config.getDirectoryV2Configuration().getDirectoryV2ClientConfiguration());
@ -461,11 +469,12 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getRegistrationServiceConfiguration().registrationCaCertificate(), config.getRegistrationServiceConfiguration().registrationCaCertificate(),
registrationCallbackExecutor); registrationCallbackExecutor);
SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator, SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator,
secureValueRecoveryServiceExecutor, config.getSecureBackupServiceConfiguration()); secureValueRecoveryServiceExecutor, secureValueRecoveryServiceRetryExecutor,
config.getSecureBackupServiceConfiguration());
SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(svr2CredentialsGenerator, SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(svr2CredentialsGenerator,
secureValueRecoveryServiceExecutor, config.getSvr2Configuration()); secureValueRecoveryServiceExecutor, secureValueRecoveryServiceRetryExecutor, config.getSvr2Configuration());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator, SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, config.getSecureStorageServiceConfiguration()); storageServiceExecutor, storageServiceRetryExecutor, config.getSecureStorageServiceConfiguration());
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, recurringJobExecutor, ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, recurringJobExecutor,
keyspaceNotificationDispatchExecutor); keyspaceNotificationDispatchExecutor);
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts); StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);

View File

@ -17,7 +17,6 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.glassfish.jersey.SslConfigurator; import org.glassfish.jersey.SslConfigurator;
@ -28,10 +27,10 @@ import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
public class FaultTolerantHttpClient { public class FaultTolerantHttpClient {
private final HttpClient httpClient; private final HttpClient httpClient;
private final ScheduledExecutorService retryExecutor; private final ScheduledExecutorService retryExecutor;
private final Retry retry; private final Retry retry;
private final CircuitBreaker breaker; private final CircuitBreaker breaker;
public static final String SECURITY_PROTOCOL_TLS_1_2 = "TLSv1.2"; public static final String SECURITY_PROTOCOL_TLS_1_2 = "TLSv1.2";
public static final String SECURITY_PROTOCOL_TLS_1_3 = "TLSv1.3"; public static final String SECURITY_PROTOCOL_TLS_1_3 = "TLSv1.3";
@ -40,15 +39,20 @@ public class FaultTolerantHttpClient {
return new Builder(); return new Builder();
} }
private FaultTolerantHttpClient(String name, HttpClient httpClient, RetryConfiguration retryConfiguration, CircuitBreakerConfiguration circuitBreakerConfiguration) { private FaultTolerantHttpClient(String name, HttpClient httpClient, ScheduledExecutorService retryExecutor,
this.httpClient = httpClient; RetryConfiguration retryConfiguration, CircuitBreakerConfiguration circuitBreakerConfiguration) {
this.retryExecutor = Executors.newSingleThreadScheduledExecutor(); this.httpClient = httpClient;
this.breaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); this.retryExecutor = retryExecutor;
this.breaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
CircuitBreakerUtil.registerMetrics(breaker, FaultTolerantHttpClient.class); CircuitBreakerUtil.registerMetrics(breaker, FaultTolerantHttpClient.class);
if (retryConfiguration != null) { if (retryConfiguration != null) {
RetryConfig retryConfig = retryConfiguration.<HttpResponse>toRetryConfigBuilder().retryOnResult(o -> o.statusCode() >= 500).build(); if (this.retryExecutor == null) {
throw new IllegalArgumentException("retryExecutor must be specified with retryConfiguration");
}
RetryConfig retryConfig = retryConfiguration.<HttpResponse>toRetryConfigBuilder()
.retryOnResult(o -> o.statusCode() >= 500).build();
this.retry = Retry.of(name + "-retry", retryConfig); this.retry = Retry.of(name + "-retry", retryConfig);
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantHttpClient.class); CircuitBreakerUtil.registerMetrics(retry, FaultTolerantHttpClient.class);
} else { } else {
@ -76,19 +80,20 @@ public class FaultTolerantHttpClient {
public static class Builder { public static class Builder {
private HttpClient.Version version = HttpClient.Version.HTTP_2;
private HttpClient.Redirect redirect = HttpClient.Redirect.NEVER;
private Duration connectTimeout = Duration.ofSeconds(10);
private HttpClient.Version version = HttpClient.Version.HTTP_2; private String name;
private HttpClient.Redirect redirect = HttpClient.Redirect.NEVER; private Executor executor;
private Duration connectTimeout = Duration.ofSeconds(10); private ScheduledExecutorService retryExecutor;
private KeyStore trustStore;
private String name; private String securityProtocol = SECURITY_PROTOCOL_TLS_1_2;
private Executor executor; private RetryConfiguration retryConfiguration;
private KeyStore trustStore;
private String securityProtocol = SECURITY_PROTOCOL_TLS_1_2;
private RetryConfiguration retryConfiguration;
private CircuitBreakerConfiguration circuitBreakerConfiguration; private CircuitBreakerConfiguration circuitBreakerConfiguration;
private Builder() {} private Builder() {
}
public Builder withName(String name) { public Builder withName(String name) {
this.name = name; this.name = name;
@ -120,6 +125,11 @@ public class FaultTolerantHttpClient {
return this; return this;
} }
public Builder withRetryExecutor(ScheduledExecutorService retryExecutor) {
this.retryExecutor = retryExecutor;
return this;
}
public Builder withCircuitBreaker(CircuitBreakerConfiguration circuitBreakerConfiguration) { public Builder withCircuitBreaker(CircuitBreakerConfiguration circuitBreakerConfiguration) {
this.circuitBreakerConfiguration = circuitBreakerConfiguration; this.circuitBreakerConfiguration = circuitBreakerConfiguration;
return this; return this;
@ -141,10 +151,10 @@ public class FaultTolerantHttpClient {
} }
final HttpClient.Builder builder = HttpClient.newBuilder() final HttpClient.Builder builder = HttpClient.newBuilder()
.connectTimeout(connectTimeout) .connectTimeout(connectTimeout)
.followRedirects(redirect) .followRedirects(redirect)
.version(version) .version(version)
.executor(executor); .executor(executor);
final SslConfigurator sslConfigurator = SslConfigurator.newInstance().securityProtocol(securityProtocol); final SslConfigurator sslConfigurator = SslConfigurator.newInstance().securityProtocol(securityProtocol);
@ -154,9 +164,9 @@ public class FaultTolerantHttpClient {
builder.sslContext(sslConfigurator.createSSLContext()); builder.sslContext(sslConfigurator.createSSLContext());
return new FaultTolerantHttpClient(name, builder.build(), retryConfiguration, circuitBreakerConfiguration); return new FaultTolerantHttpClient(name, builder.build(), retryExecutor, retryConfiguration,
circuitBreakerConfiguration);
} }
} }
} }

View File

@ -18,6 +18,7 @@ import java.time.Duration;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator;
import org.whispersystems.textsecuregcm.configuration.SecureBackupServiceConfiguration; import org.whispersystems.textsecuregcm.configuration.SecureBackupServiceConfiguration;
@ -29,37 +30,41 @@ import org.whispersystems.textsecuregcm.util.HttpUtils;
*/ */
public class SecureBackupClient { public class SecureBackupClient {
private final ExternalServiceCredentialsGenerator secureBackupCredentialsGenerator; private final ExternalServiceCredentialsGenerator secureBackupCredentialsGenerator;
private final URI deleteUri; private final URI deleteUri;
private final FaultTolerantHttpClient httpClient; private final FaultTolerantHttpClient httpClient;
@VisibleForTesting @VisibleForTesting
static final String DELETE_PATH = "/v1/backup"; static final String DELETE_PATH = "/v1/backup";
public SecureBackupClient(final ExternalServiceCredentialsGenerator secureBackupCredentialsGenerator, final Executor executor, final SecureBackupServiceConfiguration configuration) throws CertificateException { public SecureBackupClient(final ExternalServiceCredentialsGenerator secureBackupCredentialsGenerator,
this.secureBackupCredentialsGenerator = secureBackupCredentialsGenerator; final Executor executor, final
this.deleteUri = URI.create(configuration.getUri()).resolve(DELETE_PATH); ScheduledExecutorService retryExecutor, final SecureBackupServiceConfiguration configuration)
this.httpClient = FaultTolerantHttpClient.newBuilder() throws CertificateException {
.withCircuitBreaker(configuration.getCircuitBreakerConfiguration()) this.secureBackupCredentialsGenerator = secureBackupCredentialsGenerator;
.withRetry(configuration.getRetryConfiguration()) this.deleteUri = URI.create(configuration.getUri()).resolve(DELETE_PATH);
.withVersion(HttpClient.Version.HTTP_1_1) this.httpClient = FaultTolerantHttpClient.newBuilder()
.withConnectTimeout(Duration.ofSeconds(10)) .withCircuitBreaker(configuration.getCircuitBreakerConfiguration())
.withRedirect(HttpClient.Redirect.NEVER) .withRetry(configuration.getRetryConfiguration())
.withExecutor(executor) .withRetryExecutor(retryExecutor)
.withName("secure-backup") .withVersion(HttpClient.Version.HTTP_1_1)
.withSecurityProtocol(FaultTolerantHttpClient.SECURITY_PROTOCOL_TLS_1_2) .withConnectTimeout(Duration.ofSeconds(10))
.withTrustedServerCertificates(configuration.getBackupCaCertificates().toArray(new String[0])) .withRedirect(HttpClient.Redirect.NEVER)
.build(); .withExecutor(executor)
} .withName("secure-backup")
.withSecurityProtocol(FaultTolerantHttpClient.SECURITY_PROTOCOL_TLS_1_2)
.withTrustedServerCertificates(configuration.getBackupCaCertificates().toArray(new String[0]))
.build();
}
public CompletableFuture<Void> deleteBackups(final UUID accountUuid) { public CompletableFuture<Void> deleteBackups(final UUID accountUuid) {
final ExternalServiceCredentials credentials = secureBackupCredentialsGenerator.generateForUuid(accountUuid); final ExternalServiceCredentials credentials = secureBackupCredentialsGenerator.generateForUuid(accountUuid);
final HttpRequest request = HttpRequest.newBuilder() final HttpRequest request = HttpRequest.newBuilder()
.uri(deleteUri) .uri(deleteUri)
.DELETE() .DELETE()
.header(HttpHeaders.AUTHORIZATION, basicAuthHeader(credentials)) .header(HttpHeaders.AUTHORIZATION, basicAuthHeader(credentials))
.build(); .build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).thenApply(response -> { return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).thenApply(response -> {
if (HttpUtils.isSuccessfulResponse(response.statusCode())) { if (HttpUtils.isSuccessfulResponse(response.statusCode())) {

View File

@ -18,6 +18,7 @@ import java.time.Duration;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator;
import org.whispersystems.textsecuregcm.configuration.SecureStorageServiceConfiguration; import org.whispersystems.textsecuregcm.configuration.SecureStorageServiceConfiguration;
@ -29,37 +30,41 @@ import org.whispersystems.textsecuregcm.util.HttpUtils;
*/ */
public class SecureStorageClient { public class SecureStorageClient {
private final ExternalServiceCredentialsGenerator storageServiceCredentialsGenerator; private final ExternalServiceCredentialsGenerator storageServiceCredentialsGenerator;
private final URI deleteUri; private final URI deleteUri;
private final FaultTolerantHttpClient httpClient; private final FaultTolerantHttpClient httpClient;
@VisibleForTesting @VisibleForTesting
static final String DELETE_PATH = "/v1/storage"; static final String DELETE_PATH = "/v1/storage";
public SecureStorageClient(final ExternalServiceCredentialsGenerator storageServiceCredentialsGenerator, final Executor executor, final SecureStorageServiceConfiguration configuration) throws CertificateException { public SecureStorageClient(final ExternalServiceCredentialsGenerator storageServiceCredentialsGenerator,
this.storageServiceCredentialsGenerator = storageServiceCredentialsGenerator; final Executor executor, final
this.deleteUri = URI.create(configuration.uri()).resolve(DELETE_PATH); ScheduledExecutorService retryExecutor, final SecureStorageServiceConfiguration configuration)
this.httpClient = FaultTolerantHttpClient.newBuilder() throws CertificateException {
.withCircuitBreaker(configuration.circuitBreaker()) this.storageServiceCredentialsGenerator = storageServiceCredentialsGenerator;
.withRetry(configuration.retry()) this.deleteUri = URI.create(configuration.uri()).resolve(DELETE_PATH);
.withVersion(HttpClient.Version.HTTP_1_1) this.httpClient = FaultTolerantHttpClient.newBuilder()
.withConnectTimeout(Duration.ofSeconds(10)) .withCircuitBreaker(configuration.circuitBreaker())
.withRedirect(HttpClient.Redirect.NEVER) .withRetry(configuration.retry())
.withExecutor(executor) .withRetryExecutor(retryExecutor)
.withName("secure-storage") .withVersion(HttpClient.Version.HTTP_1_1)
.withSecurityProtocol(FaultTolerantHttpClient.SECURITY_PROTOCOL_TLS_1_3) .withConnectTimeout(Duration.ofSeconds(10))
.withTrustedServerCertificates(configuration.storageCaCertificates().toArray(new String[0])) .withRedirect(HttpClient.Redirect.NEVER)
.build(); .withExecutor(executor)
} .withName("secure-storage")
.withSecurityProtocol(FaultTolerantHttpClient.SECURITY_PROTOCOL_TLS_1_3)
.withTrustedServerCertificates(configuration.storageCaCertificates().toArray(new String[0]))
.build();
}
public CompletableFuture<Void> deleteStoredData(final UUID accountUuid) { public CompletableFuture<Void> deleteStoredData(final UUID accountUuid) {
final ExternalServiceCredentials credentials = storageServiceCredentialsGenerator.generateForUuid(accountUuid); final ExternalServiceCredentials credentials = storageServiceCredentialsGenerator.generateForUuid(accountUuid);
final HttpRequest request = HttpRequest.newBuilder() final HttpRequest request = HttpRequest.newBuilder()
.uri(deleteUri) .uri(deleteUri)
.DELETE() .DELETE()
.header(HttpHeaders.AUTHORIZATION, basicAuthHeader(credentials)) .header(HttpHeaders.AUTHORIZATION, basicAuthHeader(credentials))
.build(); .build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).thenApply(response -> { return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).thenApply(response -> {
if (HttpUtils.isSuccessfulResponse(response.statusCode())) { if (HttpUtils.isSuccessfulResponse(response.statusCode())) {

View File

@ -18,6 +18,7 @@ import java.time.Duration;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator;
import org.whispersystems.textsecuregcm.configuration.SecureValueRecovery2Configuration; import org.whispersystems.textsecuregcm.configuration.SecureValueRecovery2Configuration;
@ -37,13 +38,15 @@ public class SecureValueRecovery2Client {
static final String DELETE_PATH = "/v1/delete"; static final String DELETE_PATH = "/v1/delete";
public SecureValueRecovery2Client(final ExternalServiceCredentialsGenerator secureValueRecoveryCredentialsGenerator, public SecureValueRecovery2Client(final ExternalServiceCredentialsGenerator secureValueRecoveryCredentialsGenerator,
final Executor executor, final SecureValueRecovery2Configuration configuration) final Executor executor, final ScheduledExecutorService retryExecutor,
final SecureValueRecovery2Configuration configuration)
throws CertificateException { throws CertificateException {
this.secureValueRecoveryCredentialsGenerator = secureValueRecoveryCredentialsGenerator; this.secureValueRecoveryCredentialsGenerator = secureValueRecoveryCredentialsGenerator;
this.deleteUri = URI.create(configuration.uri()).resolve(DELETE_PATH); this.deleteUri = URI.create(configuration.uri()).resolve(DELETE_PATH);
this.httpClient = FaultTolerantHttpClient.newBuilder() this.httpClient = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(configuration.circuitBreaker()) .withCircuitBreaker(configuration.circuitBreaker())
.withRetry(configuration.retry()) .withRetry(configuration.retry())
.withRetryExecutor(retryExecutor)
.withVersion(HttpClient.Version.HTTP_1_1) .withVersion(HttpClient.Version.HTTP_1_1)
.withConnectTimeout(Duration.ofSeconds(10)) .withConnectTimeout(Duration.ofSeconds(10))
.withRedirect(HttpClient.Redirect.NEVER) .withRedirect(HttpClient.Redirect.NEVER)

View File

@ -32,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.ws.rs.ClientErrorException; import javax.ws.rs.ClientErrorException;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
@ -61,7 +62,8 @@ public class BraintreeManager implements SubscriptionProcessorManager {
final Map<String, String> currenciesToMerchantAccounts, final Map<String, String> currenciesToMerchantAccounts,
final String graphqlUri, final String graphqlUri,
final CircuitBreakerConfiguration circuitBreakerConfiguration, final CircuitBreakerConfiguration circuitBreakerConfiguration,
final Executor executor) { final Executor executor,
final ScheduledExecutorService retryExecutor) {
this(new BraintreeGateway(braintreeEnvironment, braintreeMerchantId, braintreePublicKey, this(new BraintreeGateway(braintreeEnvironment, braintreeMerchantId, braintreePublicKey,
braintreePrivateKey), braintreePrivateKey),
@ -71,6 +73,7 @@ public class BraintreeManager implements SubscriptionProcessorManager {
.withName("braintree-graphql") .withName("braintree-graphql")
.withCircuitBreaker(circuitBreakerConfiguration) .withCircuitBreaker(circuitBreakerConfiguration)
.withExecutor(executor) .withExecutor(executor)
.withRetryExecutor(retryExecutor)
.build(), graphqlUri, braintreePublicKey, braintreePrivateKey), .build(), graphqlUri, braintreePublicKey, braintreePrivateKey),
executor); executor);
} }

View File

@ -18,6 +18,7 @@ import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser; import net.sourceforge.argparse4j.inf.Subparser;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
@ -110,6 +111,10 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build(); .executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle() ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build(); .executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "storageServiceRetry-%d")).threads(1).build();
ExternalServiceCredentialsGenerator backupCredentialsGenerator = SecureBackupController.credentialsGenerator( ExternalServiceCredentialsGenerator backupCredentialsGenerator = SecureBackupController.credentialsGenerator(
configuration.getSecureBackupServiceConfiguration()); configuration.getSecureBackupServiceConfiguration());
@ -174,12 +179,14 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
configuration.getClientPresenceClusterConfiguration(), redisClusterClientResources); configuration.getClientPresenceClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters", FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters",
configuration.getRateLimitersCluster(), redisClusterClientResources); configuration.getRateLimitersCluster(), redisClusterClientResources);
SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator, secureValueRecoveryExecutor, SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator,
configuration.getSecureBackupServiceConfiguration()); secureValueRecoveryExecutor,
secureValueRecoveryServiceRetryExecutor, configuration.getSecureBackupServiceConfiguration());
SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client( SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(
secureValueRecoveryCredentialsGenerator, secureValueRecoveryExecutor, configuration.getSvr2Configuration()); secureValueRecoveryCredentialsGenerator, secureValueRecoveryExecutor, secureValueRecoveryServiceRetryExecutor,
configuration.getSvr2Configuration());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator, SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, configuration.getSecureStorageServiceConfiguration()); storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration());
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster,
Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor); Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,

View File

@ -94,6 +94,11 @@ record CommandDependencies(
ExecutorService storageServiceExecutor = environment.lifecycle() ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(name, "storageService-%d")).maxThreads(8).minThreads(8).build(); .executorService(name(name, "storageService-%d")).maxThreads(8).minThreads(8).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "storageServiceRetry-%d")).threads(1).build();
ExternalServiceCredentialsGenerator backupCredentialsGenerator = SecureBackupController.credentialsGenerator( ExternalServiceCredentialsGenerator backupCredentialsGenerator = SecureBackupController.credentialsGenerator(
configuration.getSecureBackupServiceConfiguration()); configuration.getSecureBackupServiceConfiguration());
ExternalServiceCredentialsGenerator storageCredentialsGenerator = SecureStorageController.credentialsGenerator( ExternalServiceCredentialsGenerator storageCredentialsGenerator = SecureStorageController.credentialsGenerator(
@ -159,13 +164,14 @@ record CommandDependencies(
FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters", FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters",
configuration.getRateLimitersCluster(), redisClusterClientResources); configuration.getRateLimitersCluster(), redisClusterClientResources);
SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator, SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator,
secureValueRecoveryServiceExecutor, secureValueRecoveryServiceExecutor, secureValueRecoveryServiceRetryExecutor,
configuration.getSecureBackupServiceConfiguration()); configuration.getSecureBackupServiceConfiguration());
SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client( SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(
secureValueRecoveryCredentialsGenerator, secureValueRecoveryServiceExecutor, secureValueRecoveryCredentialsGenerator, secureValueRecoveryServiceExecutor,
secureValueRecoveryServiceRetryExecutor,
configuration.getSvr2Configuration()); configuration.getSvr2Configuration());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator, SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, configuration.getSecureStorageServiceConfiguration()); storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration());
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster,
recurringJobExecutor, keyspaceNotificationDispatchExecutor); recurringJobExecutor, keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,

View File

@ -21,6 +21,7 @@ import java.util.UUID;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -36,6 +37,7 @@ class SecureBackupClientTest {
private UUID accountUuid; private UUID accountUuid;
private ExternalServiceCredentialsGenerator credentialsGenerator; private ExternalServiceCredentialsGenerator credentialsGenerator;
private ExecutorService httpExecutor; private ExecutorService httpExecutor;
private ScheduledExecutorService retryExecutor;
private SecureBackupClient secureStorageClient; private SecureBackupClient secureStorageClient;
@ -46,68 +48,71 @@ class SecureBackupClientTest {
@BeforeEach @BeforeEach
void setUp() throws CertificateException { void setUp() throws CertificateException {
accountUuid = UUID.randomUUID(); accountUuid = UUID.randomUUID();
credentialsGenerator = mock(ExternalServiceCredentialsGenerator.class); credentialsGenerator = mock(ExternalServiceCredentialsGenerator.class);
httpExecutor = Executors.newSingleThreadExecutor(); httpExecutor = Executors.newSingleThreadExecutor();
retryExecutor = Executors.newSingleThreadScheduledExecutor();
final SecureBackupServiceConfiguration config = new SecureBackupServiceConfiguration(); final SecureBackupServiceConfiguration config = new SecureBackupServiceConfiguration();
config.setUri("http://localhost:" + wireMock.getPort()); config.setUri("http://localhost:" + wireMock.getPort());
// This is a randomly-generated, throwaway certificate that's not actually connected to anything // This is a randomly-generated, throwaway certificate that's not actually connected to anything
config.setBackupCaCertificates(List.of(""" config.setBackupCaCertificates(
-----BEGIN CERTIFICATE----- List.of("""
MIICZDCCAc2gAwIBAgIBADANBgkqhkiG9w0BAQ0FADBPMQswCQYDVQQGEwJ1czEL -----BEGIN CERTIFICATE-----
MAkGA1UECAwCVVMxHjAcBgNVBAoMFVNpZ25hbCBNZXNzZW5nZXIsIExMQzETMBEG MIICZDCCAc2gAwIBAgIBADANBgkqhkiG9w0BAQ0FADBPMQswCQYDVQQGEwJ1czEL
A1UEAwwKc2lnbmFsLm9yZzAeFw0yMDEyMjMyMjQ3NTlaFw0zMDEyMjEyMjQ3NTla MAkGA1UECAwCVVMxHjAcBgNVBAoMFVNpZ25hbCBNZXNzZW5nZXIsIExMQzETMBEG
ME8xCzAJBgNVBAYTAnVzMQswCQYDVQQIDAJVUzEeMBwGA1UECgwVU2lnbmFsIE1l A1UEAwwKc2lnbmFsLm9yZzAeFw0yMDEyMjMyMjQ3NTlaFw0zMDEyMjEyMjQ3NTla
c3NlbmdlciwgTExDMRMwEQYDVQQDDApzaWduYWwub3JnMIGfMA0GCSqGSIb3DQEB ME8xCzAJBgNVBAYTAnVzMQswCQYDVQQIDAJVUzEeMBwGA1UECgwVU2lnbmFsIE1l
AQUAA4GNADCBiQKBgQCfSLcZNHYqbxSsgWp4JvbPRHjQTrlsrKrgD2q7f/OY6O3Y c3NlbmdlciwgTExDMRMwEQYDVQQDDApzaWduYWwub3JnMIGfMA0GCSqGSIb3DQEB
/X0QNcNSOJpliN8rmzwslfsrXHO3q1diGRw4xHogUJZ/7NQrHiP/zhN0VTDh49pD AQUAA4GNADCBiQKBgQCfSLcZNHYqbxSsgWp4JvbPRHjQTrlsrKrgD2q7f/OY6O3Y
ZpjXVyUbayLS/6qM5arKxBspzEFBb5v8cF6bPr76SO/rpGXiI0j6yJKX6fRiKwID /X0QNcNSOJpliN8rmzwslfsrXHO3q1diGRw4xHogUJZ/7NQrHiP/zhN0VTDh49pD
AQABo1AwTjAdBgNVHQ4EFgQU6Jrs/Fmj0z4dA3wvdq/WqA4P49IwHwYDVR0jBBgw ZpjXVyUbayLS/6qM5arKxBspzEFBb5v8cF6bPr76SO/rpGXiI0j6yJKX6fRiKwID
FoAU6Jrs/Fmj0z4dA3wvdq/WqA4P49IwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0B AQABo1AwTjAdBgNVHQ4EFgQU6Jrs/Fmj0z4dA3wvdq/WqA4P49IwHwYDVR0jBBgw
AQ0FAAOBgQB+5d5+NtzLILfrc9QmJdIO1YeDP64JmFwTER0kEUouRsb9UwknVWZa FoAU6Jrs/Fmj0z4dA3wvdq/WqA4P49IwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0B
y7MTM4NoBV1k0zb5LAk89SIDPr/maW5AsLtEomzjnEiomjoMBUdNe3YCgQReoLnr AQ0FAAOBgQB+5d5+NtzLILfrc9QmJdIO1YeDP64JmFwTER0kEUouRsb9UwknVWZa
R/QaUNbrCjTGYfBsjGbIzmkWPUyTec2ZdRyJ8JiVl386+6CZkxnndQ== y7MTM4NoBV1k0zb5LAk89SIDPr/maW5AsLtEomzjnEiomjoMBUdNe3YCgQReoLnr
-----END CERTIFICATE----- R/QaUNbrCjTGYfBsjGbIzmkWPUyTec2ZdRyJ8JiVl386+6CZkxnndQ==
""", -----END CERTIFICATE-----
""" """, """
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
MIIEpDCCAowCCQC43PUTWSADVjANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls MIIEpDCCAowCCQC43PUTWSADVjANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls
b2NhbGhvc3QwHhcNMjIxMDE3MjA0NTM0WhcNMjMxMDE3MjA0NTM0WjAUMRIwEAYD b2NhbGhvc3QwHhcNMjIxMDE3MjA0NTM0WhcNMjMxMDE3MjA0NTM0WjAUMRIwEAYD
VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDV VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDV
x1cdEd2ffQTlTXWRiCHGcrlYf4RJnctt9sw/BuHWTLXBu5LhyJSGn5LRszO/NCXK x1cdEd2ffQTlTXWRiCHGcrlYf4RJnctt9sw/BuHWTLXBu5LhyJSGn5LRszO/NCXK
Z/cmGR7pLj366RtiwL+Qo3nhvDCK7T9xZeNIusM6XMcMK9D/DGCYPqtjQz8NXd9V Z/cmGR7pLj366RtiwL+Qo3nhvDCK7T9xZeNIusM6XMcMK9D/DGCYPqtjQz8NXd9V
ajBBe6nwTDTa+oqX8Mt89foWNkg5Il/lY62u9Dr18LRZ2W9zzYi3Q9/K0CbIX6pM ajBBe6nwTDTa+oqX8Mt89foWNkg5Il/lY62u9Dr18LRZ2W9zzYi3Q9/K0CbIX6pM
yVlPIO5rITOR2IsbeyqsO9jufgX5lP4ZKLLBAP1b7usjC4YdvWacjQg/rK5aay1x yVlPIO5rITOR2IsbeyqsO9jufgX5lP4ZKLLBAP1b7usjC4YdvWacjQg/rK5aay1x
jC2HCDgo/4N30QVXzSA9nFfSe6AE/xkStK4819JqOkY5JsJCbef1P3hOOdSLEjbp jC2HCDgo/4N30QVXzSA9nFfSe6AE/xkStK4819JqOkY5JsJCbef1P3hOOdSLEjbp
xq3MjOs6G6dOgteaAGs10vx7dHxDWETTIiD7BIZ9zRYgOF5bkCaIUO+JfySE1MHD xq3MjOs6G6dOgteaAGs10vx7dHxDWETTIiD7BIZ9zRYgOF5bkCaIUO+JfySE1MHD
KBAFLoRuvmRev5Ln5R0MCHpUMSmMNgJqz+RWZV3g/gpYbuWiHgJOwL1393eK50Bg KBAFLoRuvmRev5Ln5R0MCHpUMSmMNgJqz+RWZV3g/gpYbuWiHgJOwL1393eK50Bg
W7SXQ8EjJj2yXZSH+1gPzN0DRoJZiaBoTPnCL2qUgvwFpW1PJsM5FDyUJFUoK5kK W7SXQ8EjJj2yXZSH+1gPzN0DRoJZiaBoTPnCL2qUgvwFpW1PJsM5FDyUJFUoK5kK
HLBBSKAPt6ZlSrUe2nBgJv7EF1GK+fTU08LXgW33OpLceGPa0zTShkukQUMtUtZ8 HLBBSKAPt6ZlSrUe2nBgJv7EF1GK+fTU08LXgW33OpLceGPa0zTShkukQUMtUtZ8
GqhO12ohMzEupIu5Xurthq4VVUrzHUdj1ZZRMhAbfLU36sd03MMyL/xBqTN6dzCa GqhO12ohMzEupIu5Xurthq4VVUrzHUdj1ZZRMhAbfLU36sd03MMyL/xBqTN6dzCa
GDGIPGpYjAllZ5xMRt2kZdv+Kr6oo3u2nLUIsqI7KQIDAQABMA0GCSqGSIb3DQEB GDGIPGpYjAllZ5xMRt2kZdv+Kr6oo3u2nLUIsqI7KQIDAQABMA0GCSqGSIb3DQEB
CwUAA4ICAQCB5s43YF35ssf5YONW5iAaifGpi1o0866xfeOybtohFGvQ7V2W34i9 CwUAA4ICAQCB5s43YF35ssf5YONW5iAaifGpi1o0866xfeOybtohFGvQ7V2W34i9
TYBCt8+0hgatMcvZ08f0vqig1i7nrvYcE1hnhL7JNkU8qm0s9ytHZt6j62nB0kd/ TYBCt8+0hgatMcvZ08f0vqig1i7nrvYcE1hnhL7JNkU8qm0s9ytHZt6j62nB0kd/
uqE2hOEQalTf/2TGPV0CCgiqLyd8lEUQvQeA38wktwUeZpVnErlzHeMR2CvV3K8R uqE2hOEQalTf/2TGPV0CCgiqLyd8lEUQvQeA38wktwUeZpVnErlzHeMR2CvV3K8R
u4vV6SnBcf+TAt56RKYZkPyvZj5llQPo14Glyoo8qZES7Ky1SHmM0GL+baPRBjRW u4vV6SnBcf+TAt56RKYZkPyvZj5llQPo14Glyoo8qZES7Ky1SHmM0GL+baPRBjRW
3KgSt98Wyu4yr9qu21JpnbAnLhBfzfSKjSeCRgFElUE1GIaFGRZ7ypA74dUKeLnb 3KgSt98Wyu4yr9qu21JpnbAnLhBfzfSKjSeCRgFElUE1GIaFGRZ7ypA74dUKeLnb
/VUWrszmUhGaEjV9dpI6x6B/kSpQMtIQqBaKRY2ALUeEujS/rURi4iMDwSU+GkSH /VUWrszmUhGaEjV9dpI6x6B/kSpQMtIQqBaKRY2ALUeEujS/rURi4iMDwSU+GkSH
cyEvZKS97OA/dWeXfLXdo4beDBRG93bI4rQnDg5+VdlBOkQSLueb8x6/VThMoC5d cyEvZKS97OA/dWeXfLXdo4beDBRG93bI4rQnDg5+VdlBOkQSLueb8x6/VThMoC5d
vZiotFQHseljQAdTkNa6tBu6c4XDYPCKB3CfkMYOlCfTS7Acn5G6dxTPKBtLGBnL vZiotFQHseljQAdTkNa6tBu6c4XDYPCKB3CfkMYOlCfTS7Acn5G6dxTPKBtLGBnL
nQfYyzuwYkN09+2PVzt6auBHr3To7uoclkxX+hxyvPIwIZ0N6b4tQR1FCAkvg29Q nQfYyzuwYkN09+2PVzt6auBHr3To7uoclkxX+hxyvPIwIZ0N6b4tQR1FCAkvg29Q
WIOjZOKGW690ESKCKOnFjUHVO0HpuWnT81URTuY62FXsYdVc2wE4v0E04mEbqQ0P WIOjZOKGW690ESKCKOnFjUHVO0HpuWnT81URTuY62FXsYdVc2wE4v0E04mEbqQ0P
lY6ZKNA81Lm3YADYtObmK1IUrOPo9BeIaPy0UM08SmN880Vunqa91Q== lY6ZKNA81Lm3YADYtObmK1IUrOPo9BeIaPy0UM08SmN880Vunqa91Q==
-----END CERTIFICATE----- -----END CERTIFICATE-----
""")); """));
secureStorageClient = new SecureBackupClient(credentialsGenerator, httpExecutor, config); secureStorageClient = new SecureBackupClient(credentialsGenerator, httpExecutor, retryExecutor, config);
} }
@AfterEach @AfterEach
void tearDown() throws InterruptedException { void tearDown() throws InterruptedException {
httpExecutor.shutdown(); httpExecutor.shutdown();
httpExecutor.awaitTermination(1, TimeUnit.SECONDS); httpExecutor.awaitTermination(1, TimeUnit.SECONDS);
retryExecutor.shutdown();
retryExecutor.awaitTermination(1, TimeUnit.SECONDS);
} }
@Test @Test

View File

@ -22,6 +22,7 @@ import java.util.UUID;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -36,111 +37,120 @@ import org.whispersystems.textsecuregcm.configuration.SecureStorageServiceConfig
class SecureStorageClientTest { class SecureStorageClientTest {
private UUID accountUuid; private UUID accountUuid;
private ExternalServiceCredentialsGenerator credentialsGenerator; private ExternalServiceCredentialsGenerator credentialsGenerator;
private ExecutorService httpExecutor; private ExecutorService httpExecutor;
private ScheduledExecutorService retryExecutor;
private SecureStorageClient secureStorageClient; private SecureStorageClient secureStorageClient;
@RegisterExtension @RegisterExtension
private final WireMockExtension wireMock = WireMockExtension.newInstance() private final WireMockExtension wireMock = WireMockExtension.newInstance()
.options(wireMockConfig().dynamicPort().dynamicHttpsPort()) .options(wireMockConfig().dynamicPort().dynamicHttpsPort())
.build(); .build();
@BeforeEach @BeforeEach
void setUp() throws CertificateException { void setUp() throws CertificateException {
accountUuid = UUID.randomUUID(); accountUuid = UUID.randomUUID();
credentialsGenerator = mock(ExternalServiceCredentialsGenerator.class); credentialsGenerator = mock(ExternalServiceCredentialsGenerator.class);
httpExecutor = Executors.newSingleThreadExecutor(); httpExecutor = Executors.newSingleThreadExecutor();
retryExecutor = Executors.newSingleThreadScheduledExecutor();
final SecureStorageServiceConfiguration config = new SecureStorageServiceConfiguration( final SecureStorageServiceConfiguration config = new SecureStorageServiceConfiguration(
randomSecretBytes(32), randomSecretBytes(32),
"http://localhost:" + wireMock.getPort(), "http://localhost:" + wireMock.getPort(),
List.of(""" List.of("""
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
MIICZDCCAc2gAwIBAgIBADANBgkqhkiG9w0BAQ0FADBPMQswCQYDVQQGEwJ1czEL MIICZDCCAc2gAwIBAgIBADANBgkqhkiG9w0BAQ0FADBPMQswCQYDVQQGEwJ1czEL
MAkGA1UECAwCVVMxHjAcBgNVBAoMFVNpZ25hbCBNZXNzZW5nZXIsIExMQzETMBEG MAkGA1UECAwCVVMxHjAcBgNVBAoMFVNpZ25hbCBNZXNzZW5nZXIsIExMQzETMBEG
A1UEAwwKc2lnbmFsLm9yZzAeFw0yMDEyMjMyMjQ3NTlaFw0zMDEyMjEyMjQ3NTla A1UEAwwKc2lnbmFsLm9yZzAeFw0yMDEyMjMyMjQ3NTlaFw0zMDEyMjEyMjQ3NTla
ME8xCzAJBgNVBAYTAnVzMQswCQYDVQQIDAJVUzEeMBwGA1UECgwVU2lnbmFsIE1l ME8xCzAJBgNVBAYTAnVzMQswCQYDVQQIDAJVUzEeMBwGA1UECgwVU2lnbmFsIE1l
c3NlbmdlciwgTExDMRMwEQYDVQQDDApzaWduYWwub3JnMIGfMA0GCSqGSIb3DQEB c3NlbmdlciwgTExDMRMwEQYDVQQDDApzaWduYWwub3JnMIGfMA0GCSqGSIb3DQEB
AQUAA4GNADCBiQKBgQCfSLcZNHYqbxSsgWp4JvbPRHjQTrlsrKrgD2q7f/OY6O3Y AQUAA4GNADCBiQKBgQCfSLcZNHYqbxSsgWp4JvbPRHjQTrlsrKrgD2q7f/OY6O3Y
/X0QNcNSOJpliN8rmzwslfsrXHO3q1diGRw4xHogUJZ/7NQrHiP/zhN0VTDh49pD /X0QNcNSOJpliN8rmzwslfsrXHO3q1diGRw4xHogUJZ/7NQrHiP/zhN0VTDh49pD
ZpjXVyUbayLS/6qM5arKxBspzEFBb5v8cF6bPr76SO/rpGXiI0j6yJKX6fRiKwID ZpjXVyUbayLS/6qM5arKxBspzEFBb5v8cF6bPr76SO/rpGXiI0j6yJKX6fRiKwID
AQABo1AwTjAdBgNVHQ4EFgQU6Jrs/Fmj0z4dA3wvdq/WqA4P49IwHwYDVR0jBBgw AQABo1AwTjAdBgNVHQ4EFgQU6Jrs/Fmj0z4dA3wvdq/WqA4P49IwHwYDVR0jBBgw
FoAU6Jrs/Fmj0z4dA3wvdq/WqA4P49IwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0B FoAU6Jrs/Fmj0z4dA3wvdq/WqA4P49IwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0B
AQ0FAAOBgQB+5d5+NtzLILfrc9QmJdIO1YeDP64JmFwTER0kEUouRsb9UwknVWZa AQ0FAAOBgQB+5d5+NtzLILfrc9QmJdIO1YeDP64JmFwTER0kEUouRsb9UwknVWZa
y7MTM4NoBV1k0zb5LAk89SIDPr/maW5AsLtEomzjnEiomjoMBUdNe3YCgQReoLnr y7MTM4NoBV1k0zb5LAk89SIDPr/maW5AsLtEomzjnEiomjoMBUdNe3YCgQReoLnr
R/QaUNbrCjTGYfBsjGbIzmkWPUyTec2ZdRyJ8JiVl386+6CZkxnndQ== R/QaUNbrCjTGYfBsjGbIzmkWPUyTec2ZdRyJ8JiVl386+6CZkxnndQ==
-----END CERTIFICATE----- -----END CERTIFICATE-----
""", """, """
""" -----BEGIN CERTIFICATE-----
-----BEGIN CERTIFICATE----- MIIEpDCCAowCCQC43PUTWSADVjANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls
MIIEpDCCAowCCQC43PUTWSADVjANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls b2NhbGhvc3QwHhcNMjIxMDE3MjA0NTM0WhcNMjMxMDE3MjA0NTM0WjAUMRIwEAYD
b2NhbGhvc3QwHhcNMjIxMDE3MjA0NTM0WhcNMjMxMDE3MjA0NTM0WjAUMRIwEAYD VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDV
VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDV x1cdEd2ffQTlTXWRiCHGcrlYf4RJnctt9sw/BuHWTLXBu5LhyJSGn5LRszO/NCXK
x1cdEd2ffQTlTXWRiCHGcrlYf4RJnctt9sw/BuHWTLXBu5LhyJSGn5LRszO/NCXK Z/cmGR7pLj366RtiwL+Qo3nhvDCK7T9xZeNIusM6XMcMK9D/DGCYPqtjQz8NXd9V
Z/cmGR7pLj366RtiwL+Qo3nhvDCK7T9xZeNIusM6XMcMK9D/DGCYPqtjQz8NXd9V ajBBe6nwTDTa+oqX8Mt89foWNkg5Il/lY62u9Dr18LRZ2W9zzYi3Q9/K0CbIX6pM
ajBBe6nwTDTa+oqX8Mt89foWNkg5Il/lY62u9Dr18LRZ2W9zzYi3Q9/K0CbIX6pM yVlPIO5rITOR2IsbeyqsO9jufgX5lP4ZKLLBAP1b7usjC4YdvWacjQg/rK5aay1x
yVlPIO5rITOR2IsbeyqsO9jufgX5lP4ZKLLBAP1b7usjC4YdvWacjQg/rK5aay1x jC2HCDgo/4N30QVXzSA9nFfSe6AE/xkStK4819JqOkY5JsJCbef1P3hOOdSLEjbp
jC2HCDgo/4N30QVXzSA9nFfSe6AE/xkStK4819JqOkY5JsJCbef1P3hOOdSLEjbp xq3MjOs6G6dOgteaAGs10vx7dHxDWETTIiD7BIZ9zRYgOF5bkCaIUO+JfySE1MHD
xq3MjOs6G6dOgteaAGs10vx7dHxDWETTIiD7BIZ9zRYgOF5bkCaIUO+JfySE1MHD KBAFLoRuvmRev5Ln5R0MCHpUMSmMNgJqz+RWZV3g/gpYbuWiHgJOwL1393eK50Bg
KBAFLoRuvmRev5Ln5R0MCHpUMSmMNgJqz+RWZV3g/gpYbuWiHgJOwL1393eK50Bg W7SXQ8EjJj2yXZSH+1gPzN0DRoJZiaBoTPnCL2qUgvwFpW1PJsM5FDyUJFUoK5kK
W7SXQ8EjJj2yXZSH+1gPzN0DRoJZiaBoTPnCL2qUgvwFpW1PJsM5FDyUJFUoK5kK HLBBSKAPt6ZlSrUe2nBgJv7EF1GK+fTU08LXgW33OpLceGPa0zTShkukQUMtUtZ8
HLBBSKAPt6ZlSrUe2nBgJv7EF1GK+fTU08LXgW33OpLceGPa0zTShkukQUMtUtZ8 GqhO12ohMzEupIu5Xurthq4VVUrzHUdj1ZZRMhAbfLU36sd03MMyL/xBqTN6dzCa
GqhO12ohMzEupIu5Xurthq4VVUrzHUdj1ZZRMhAbfLU36sd03MMyL/xBqTN6dzCa GDGIPGpYjAllZ5xMRt2kZdv+Kr6oo3u2nLUIsqI7KQIDAQABMA0GCSqGSIb3DQEB
GDGIPGpYjAllZ5xMRt2kZdv+Kr6oo3u2nLUIsqI7KQIDAQABMA0GCSqGSIb3DQEB CwUAA4ICAQCB5s43YF35ssf5YONW5iAaifGpi1o0866xfeOybtohFGvQ7V2W34i9
CwUAA4ICAQCB5s43YF35ssf5YONW5iAaifGpi1o0866xfeOybtohFGvQ7V2W34i9 TYBCt8+0hgatMcvZ08f0vqig1i7nrvYcE1hnhL7JNkU8qm0s9ytHZt6j62nB0kd/
TYBCt8+0hgatMcvZ08f0vqig1i7nrvYcE1hnhL7JNkU8qm0s9ytHZt6j62nB0kd/ uqE2hOEQalTf/2TGPV0CCgiqLyd8lEUQvQeA38wktwUeZpVnErlzHeMR2CvV3K8R
uqE2hOEQalTf/2TGPV0CCgiqLyd8lEUQvQeA38wktwUeZpVnErlzHeMR2CvV3K8R u4vV6SnBcf+TAt56RKYZkPyvZj5llQPo14Glyoo8qZES7Ky1SHmM0GL+baPRBjRW
u4vV6SnBcf+TAt56RKYZkPyvZj5llQPo14Glyoo8qZES7Ky1SHmM0GL+baPRBjRW 3KgSt98Wyu4yr9qu21JpnbAnLhBfzfSKjSeCRgFElUE1GIaFGRZ7ypA74dUKeLnb
3KgSt98Wyu4yr9qu21JpnbAnLhBfzfSKjSeCRgFElUE1GIaFGRZ7ypA74dUKeLnb /VUWrszmUhGaEjV9dpI6x6B/kSpQMtIQqBaKRY2ALUeEujS/rURi4iMDwSU+GkSH
/VUWrszmUhGaEjV9dpI6x6B/kSpQMtIQqBaKRY2ALUeEujS/rURi4iMDwSU+GkSH cyEvZKS97OA/dWeXfLXdo4beDBRG93bI4rQnDg5+VdlBOkQSLueb8x6/VThMoC5d
cyEvZKS97OA/dWeXfLXdo4beDBRG93bI4rQnDg5+VdlBOkQSLueb8x6/VThMoC5d vZiotFQHseljQAdTkNa6tBu6c4XDYPCKB3CfkMYOlCfTS7Acn5G6dxTPKBtLGBnL
vZiotFQHseljQAdTkNa6tBu6c4XDYPCKB3CfkMYOlCfTS7Acn5G6dxTPKBtLGBnL nQfYyzuwYkN09+2PVzt6auBHr3To7uoclkxX+hxyvPIwIZ0N6b4tQR1FCAkvg29Q
nQfYyzuwYkN09+2PVzt6auBHr3To7uoclkxX+hxyvPIwIZ0N6b4tQR1FCAkvg29Q WIOjZOKGW690ESKCKOnFjUHVO0HpuWnT81URTuY62FXsYdVc2wE4v0E04mEbqQ0P
WIOjZOKGW690ESKCKOnFjUHVO0HpuWnT81URTuY62FXsYdVc2wE4v0E04mEbqQ0P lY6ZKNA81Lm3YADYtObmK1IUrOPo9BeIaPy0UM08SmN880Vunqa91Q==
lY6ZKNA81Lm3YADYtObmK1IUrOPo9BeIaPy0UM08SmN880Vunqa91Q== -----END CERTIFICATE-----
-----END CERTIFICATE----- """),
"""), new CircuitBreakerConfiguration(),
new CircuitBreakerConfiguration(), new RetryConfiguration());
new RetryConfiguration());
secureStorageClient = new SecureStorageClient(credentialsGenerator, httpExecutor, config); secureStorageClient = new SecureStorageClient(credentialsGenerator, httpExecutor, retryExecutor, config);
} }
@AfterEach @AfterEach
void tearDown() throws InterruptedException { void tearDown() throws InterruptedException {
httpExecutor.shutdown();
httpExecutor.awaitTermination(1, TimeUnit.SECONDS);
}
@Test httpExecutor.shutdown();
void deleteStoredData() { httpExecutor.awaitTermination(1, TimeUnit.SECONDS);
final String username = RandomStringUtils.randomAlphabetic(16); retryExecutor.shutdown();
final String password = RandomStringUtils.randomAlphanumeric(32); retryExecutor.awaitTermination(1, TimeUnit.SECONDS);
}
when(credentialsGenerator.generateForUuid(accountUuid)).thenReturn(new ExternalServiceCredentials(username, password)); @Test
void deleteStoredData() {
wireMock.stubFor(delete(urlEqualTo(SecureStorageClient.DELETE_PATH)) final String username = RandomStringUtils.randomAlphabetic(16);
.withBasicAuth(username, password) final String password = RandomStringUtils.randomAlphanumeric(32);
.willReturn(aResponse().withStatus(202)));
// We're happy as long as this doesn't throw an exception when(credentialsGenerator.generateForUuid(accountUuid)).thenReturn(
secureStorageClient.deleteStoredData(accountUuid).join(); new ExternalServiceCredentials(username, password));
}
@Test wireMock.stubFor(delete(urlEqualTo(SecureStorageClient.DELETE_PATH))
void deleteStoredDataFailure() { .withBasicAuth(username, password)
final String username = RandomStringUtils.randomAlphabetic(16); .willReturn(aResponse().withStatus(202)));
final String password = RandomStringUtils.randomAlphanumeric(32);
when(credentialsGenerator.generateForUuid(accountUuid)).thenReturn(new ExternalServiceCredentials(username, password)); // We're happy as long as this doesn't throw an exception
secureStorageClient.deleteStoredData(accountUuid).join();
}
wireMock.stubFor(delete(urlEqualTo(SecureStorageClient.DELETE_PATH)) @Test
.withBasicAuth(username, password) void deleteStoredDataFailure() {
.willReturn(aResponse().withStatus(400)));
final CompletionException completionException = assertThrows(CompletionException.class, () -> secureStorageClient.deleteStoredData(accountUuid).join()); final String username = RandomStringUtils.randomAlphabetic(16);
assertTrue(completionException.getCause() instanceof SecureStorageException); final String password = RandomStringUtils.randomAlphanumeric(32);
}
when(credentialsGenerator.generateForUuid(accountUuid)).thenReturn(
new ExternalServiceCredentials(username, password));
wireMock.stubFor(delete(urlEqualTo(SecureStorageClient.DELETE_PATH))
.withBasicAuth(username, password)
.willReturn(aResponse().withStatus(400)));
final CompletionException completionException = assertThrows(CompletionException.class,
() -> secureStorageClient.deleteStoredData(accountUuid).join());
assertTrue(completionException.getCause() instanceof SecureStorageException);
}
} }

View File

@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -38,6 +39,7 @@ class SecureValueRecovery2ClientTest {
private UUID accountUuid; private UUID accountUuid;
private ExternalServiceCredentialsGenerator credentialsGenerator; private ExternalServiceCredentialsGenerator credentialsGenerator;
private ExecutorService httpExecutor; private ExecutorService httpExecutor;
private ScheduledExecutorService retryExecutor;
private SecureValueRecovery2Client secureValueRecovery2Client; private SecureValueRecovery2Client secureValueRecovery2Client;
@ -51,6 +53,7 @@ class SecureValueRecovery2ClientTest {
accountUuid = UUID.randomUUID(); accountUuid = UUID.randomUUID();
credentialsGenerator = mock(ExternalServiceCredentialsGenerator.class); credentialsGenerator = mock(ExternalServiceCredentialsGenerator.class);
httpExecutor = Executors.newSingleThreadExecutor(); httpExecutor = Executors.newSingleThreadExecutor();
retryExecutor = Executors.newSingleThreadScheduledExecutor();
final SecureValueRecovery2Configuration config = new SecureValueRecovery2Configuration( final SecureValueRecovery2Configuration config = new SecureValueRecovery2Configuration(
"http://localhost:" + wireMock.getPort(), "http://localhost:" + wireMock.getPort(),
@ -104,13 +107,16 @@ class SecureValueRecovery2ClientTest {
"""), """),
null, null); null, null);
secureValueRecovery2Client = new SecureValueRecovery2Client(credentialsGenerator, httpExecutor, config); secureValueRecovery2Client = new SecureValueRecovery2Client(credentialsGenerator, httpExecutor, retryExecutor,
config);
} }
@AfterEach @AfterEach
void tearDown() throws InterruptedException { void tearDown() throws InterruptedException {
httpExecutor.shutdown(); httpExecutor.shutdown();
httpExecutor.awaitTermination(1, TimeUnit.SECONDS); httpExecutor.awaitTermination(1, TimeUnit.SECONDS);
retryExecutor.shutdown();
retryExecutor.awaitTermination(1, TimeUnit.SECONDS);
} }
@Test @Test

View File

@ -20,7 +20,12 @@ import java.net.http.HttpClient;
import java.net.http.HttpRequest; import java.net.http.HttpRequest;
import java.net.http.HttpResponse; import java.net.http.HttpResponse;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
@ -34,21 +39,38 @@ class FaultTolerantHttpClientTest {
.options(wireMockConfig().dynamicPort().dynamicHttpsPort()) .options(wireMockConfig().dynamicPort().dynamicHttpsPort())
.build(); .build();
private ExecutorService httpExecutor;
private ScheduledExecutorService retryExecutor;
@BeforeEach
void setUp() {
httpExecutor = Executors.newSingleThreadExecutor();
retryExecutor = Executors.newSingleThreadScheduledExecutor();
}
@AfterEach
void tearDown() throws InterruptedException {
httpExecutor.shutdown();
httpExecutor.awaitTermination(1, TimeUnit.SECONDS);
retryExecutor.shutdown();
retryExecutor.awaitTermination(1, TimeUnit.SECONDS);
}
@Test @Test
void testSimpleGet() { void testSimpleGet() {
wireMock.stubFor(get(urlEqualTo("/ping")) wireMock.stubFor(get(urlEqualTo("/ping"))
.willReturn(aResponse() .willReturn(aResponse()
.withHeader("Content-Type", "text/plain") .withHeader("Content-Type", "text/plain")
.withBody("Pong!"))); .withBody("Pong!")));
FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder() FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(new CircuitBreakerConfiguration()) .withCircuitBreaker(new CircuitBreakerConfiguration())
.withRetry(new RetryConfiguration()) .withRetry(new RetryConfiguration())
.withExecutor(Executors.newSingleThreadExecutor()) .withExecutor(httpExecutor)
.withName("test") .withRetryExecutor(retryExecutor)
.withVersion(HttpClient.Version.HTTP_2) .withName("test")
.build(); .withVersion(HttpClient.Version.HTTP_2)
.build();
HttpRequest request = HttpRequest.newBuilder() HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + wireMock.getPort() + "/ping")) .uri(URI.create("http://localhost:" + wireMock.getPort() + "/ping"))
@ -72,12 +94,13 @@ class FaultTolerantHttpClientTest {
.withBody("Pong!"))); .withBody("Pong!")));
FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder() FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(new CircuitBreakerConfiguration()) .withCircuitBreaker(new CircuitBreakerConfiguration())
.withRetry(new RetryConfiguration()) .withRetry(new RetryConfiguration())
.withExecutor(Executors.newSingleThreadExecutor()) .withExecutor(httpExecutor)
.withName("test") .withRetryExecutor(retryExecutor)
.withVersion(HttpClient.Version.HTTP_2) .withName("test")
.build(); .withVersion(HttpClient.Version.HTTP_2)
.build();
HttpRequest request = HttpRequest.newBuilder() HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + wireMock.getPort() + "/failure")) .uri(URI.create("http://localhost:" + wireMock.getPort() + "/failure"))
@ -104,7 +127,8 @@ class FaultTolerantHttpClientTest {
FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder() FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(circuitBreakerConfiguration) .withCircuitBreaker(circuitBreakerConfiguration)
.withRetry(new RetryConfiguration()) .withRetry(new RetryConfiguration())
.withExecutor(Executors.newSingleThreadExecutor()) .withRetryExecutor(retryExecutor)
.withExecutor(httpExecutor)
.withName("test") .withName("test")
.withVersion(HttpClient.Version.HTTP_2) .withVersion(HttpClient.Version.HTTP_2)
.build(); .build();