Move batch identity checks off the common fork join pool
This commit is contained in:
parent
cc8dda28cc
commit
da49db5b9e
|
@ -11,7 +11,6 @@ import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
|||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy;
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
|
@ -24,8 +23,6 @@ import io.dropwizard.auth.PolymorphicAuthDynamicFeature;
|
|||
import io.dropwizard.auth.PolymorphicAuthValueFactoryProvider;
|
||||
import io.dropwizard.auth.basic.BasicCredentialAuthFilter;
|
||||
import io.dropwizard.auth.basic.BasicCredentials;
|
||||
import io.dropwizard.db.PooledDataSourceFactory;
|
||||
import io.dropwizard.jdbi3.JdbiFactory;
|
||||
import io.dropwizard.setup.Bootstrap;
|
||||
import io.dropwizard.setup.Environment;
|
||||
import io.lettuce.core.resource.ClientResources;
|
||||
|
@ -55,7 +52,6 @@ import javax.servlet.FilterRegistration;
|
|||
import javax.servlet.ServletRegistration;
|
||||
import org.eclipse.jetty.servlets.CrossOriginFilter;
|
||||
import org.glassfish.jersey.server.ServerProperties;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.signal.i18n.HeaderControlledResourceBundleLookup;
|
||||
import org.signal.libsignal.zkgroup.ServerSecretParams;
|
||||
import org.signal.libsignal.zkgroup.auth.ServerZkAuthOperations;
|
||||
|
@ -178,7 +174,6 @@ import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler;
|
|||
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
|
||||
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Keys;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagePersister;
|
||||
|
@ -383,6 +378,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService backupServiceExecutor = environment.lifecycle().executorService(name(getClass(), "backupService-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService storageServiceExecutor = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
|
||||
|
||||
// TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet
|
||||
ExecutorService batchIdentityCheckExecutor = environment.lifecycle().executorService(name(getClass(), "batchIdentityCheck-%d")).minThreads(32).maxThreads(32).build();
|
||||
ExecutorService multiRecipientMessageExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "multiRecipientMessage-%d")).minThreads(64).maxThreads(64).build();
|
||||
ExecutorService stripeExecutor = environment.lifecycle().executorService(name(getClass(), "stripe-%d")).
|
||||
|
@ -636,7 +634,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, deletedAccountsManager,
|
||||
messagesManager, apnFallbackManager, reportMessageManager, multiRecipientMessageExecutor),
|
||||
new PaymentsController(currencyManager, paymentsCredentialsGenerator),
|
||||
new ProfileController(clock, rateLimiters, accountsManager, profilesManager, dynamicConfigurationManager, profileBadgeConverter, config.getBadges(), cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, config.getCdnConfiguration().getBucket(), zkProfileOperations),
|
||||
new ProfileController(clock, rateLimiters, accountsManager, profilesManager, dynamicConfigurationManager, profileBadgeConverter, config.getBadges(), cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, config.getCdnConfiguration().getBucket(), zkProfileOperations, batchIdentityCheckExecutor),
|
||||
new ProvisioningController(rateLimiters, provisioningManager),
|
||||
new RemoteConfigController(remoteConfigsManager, config.getRemoteConfigConfiguration().getAuthorizedTokens(), config.getRemoteConfigConfiguration().getGlobalConfig()),
|
||||
new SecureBackupController(backupCredentialsGenerator),
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.controllers;
|
|||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||
|
||||
import com.codahale.metrics.annotation.Timed;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.dropwizard.auth.Auth;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
|
@ -33,6 +34,7 @@ import java.util.Map.Entry;
|
|||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.validation.Valid;
|
||||
|
@ -125,6 +127,8 @@ public class ProfileController {
|
|||
private final S3Client s3client;
|
||||
private final String bucket;
|
||||
|
||||
private final Executor batchIdentityCheckExecutor;
|
||||
|
||||
private static final String PROFILE_KEY_CREDENTIAL_TYPE = "profileKey";
|
||||
private static final String PNI_CREDENTIAL_TYPE = "pni";
|
||||
|
||||
|
@ -143,7 +147,8 @@ public class ProfileController {
|
|||
PostPolicyGenerator policyGenerator,
|
||||
PolicySigner policySigner,
|
||||
String bucket,
|
||||
ServerZkProfileOperations zkProfileOperations) {
|
||||
ServerZkProfileOperations zkProfileOperations,
|
||||
Executor batchIdentityCheckExecutor) {
|
||||
this.clock = clock;
|
||||
this.rateLimiters = rateLimiters;
|
||||
this.accountsManager = accountsManager;
|
||||
|
@ -157,6 +162,7 @@ public class ProfileController {
|
|||
this.s3client = s3client;
|
||||
this.policyGenerator = policyGenerator;
|
||||
this.policySigner = policySigner;
|
||||
this.batchIdentityCheckExecutor = Preconditions.checkNotNull(batchIdentityCheckExecutor);
|
||||
}
|
||||
|
||||
@Timed
|
||||
|
@ -354,11 +360,11 @@ public class ProfileController {
|
|||
for (final BatchIdentityCheckRequest.Element element : request.elements()) {
|
||||
checkFingerprintAndAdd(element, responseElements, sha256);
|
||||
}
|
||||
});
|
||||
}, batchIdentityCheckExecutor);
|
||||
}
|
||||
|
||||
return Tuple.of(futures, responseElements);
|
||||
}).thenComposeAsync(tuple2 -> CompletableFuture.allOf(tuple2._1).thenApply((ignored) -> new BatchIdentityCheckResponse(tuple2._2)));
|
||||
}).thenCompose(tuple2 -> CompletableFuture.allOf(tuple2._1).thenApply((ignored) -> new BatchIdentityCheckResponse(tuple2._2)));
|
||||
}
|
||||
|
||||
private void checkFingerprintAndAdd(BatchIdentityCheckRequest.Element element,
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.ws.rs.client.Entity;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
@ -148,7 +149,8 @@ class ProfileControllerTest {
|
|||
postPolicyGenerator,
|
||||
policySigner,
|
||||
"profilesBucket",
|
||||
zkProfileOperations))
|
||||
zkProfileOperations,
|
||||
Executors.newSingleThreadExecutor()))
|
||||
.build();
|
||||
|
||||
@BeforeEach
|
||||
|
|
Loading…
Reference in New Issue