Use container-managed executors for APN/GCM senders.
This commit is contained in:
parent
2f7bb3499d
commit
96d3a69479
|
@ -299,6 +299,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
|
|
||||||
ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(2).build();
|
ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(2).build();
|
||||||
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build();
|
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build();
|
||||||
|
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build();
|
||||||
|
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
|
||||||
|
|
||||||
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor);
|
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor);
|
||||||
DirectoryManager directory = new DirectoryManager(directoryClient);
|
DirectoryManager directory = new DirectoryManager(directoryClient);
|
||||||
|
@ -316,8 +318,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||||
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
|
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
|
||||||
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
||||||
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
|
APNSender apnSender = new APNSender(apnSenderExecutor, accountsManager, config.getApnConfiguration());
|
||||||
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
|
GCMSender gcmSender = new GCMSender(gcmSenderExecutor, accountsManager, config.getGcmConfiguration().getApiKey());
|
||||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
|
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
|
||||||
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
|
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
|
||||||
|
|
||||||
|
|
|
@ -54,17 +54,18 @@ public class APNSender implements Managed {
|
||||||
private static final Meter unregisteredEventStale = metricRegistry.meter(name(APNSender.class, "unregistered_event_stale"));
|
private static final Meter unregisteredEventStale = metricRegistry.meter(name(APNSender.class, "unregistered_event_stale"));
|
||||||
private static final Meter unregisteredEventFresh = metricRegistry.meter(name(APNSender.class, "unregistered_event_fresh"));
|
private static final Meter unregisteredEventFresh = metricRegistry.meter(name(APNSender.class, "unregistered_event_fresh"));
|
||||||
|
|
||||||
private ExecutorService executor;
|
|
||||||
private ApnFallbackManager fallbackManager;
|
private ApnFallbackManager fallbackManager;
|
||||||
|
|
||||||
|
private final ExecutorService executor;
|
||||||
private final AccountsManager accountsManager;
|
private final AccountsManager accountsManager;
|
||||||
private final String bundleId;
|
private final String bundleId;
|
||||||
private final boolean sandbox;
|
private final boolean sandbox;
|
||||||
private final RetryingApnsClient apnsClient;
|
private final RetryingApnsClient apnsClient;
|
||||||
|
|
||||||
public APNSender(AccountsManager accountsManager, ApnConfiguration configuration)
|
public APNSender(ExecutorService executor, AccountsManager accountsManager, ApnConfiguration configuration)
|
||||||
throws IOException, NoSuchAlgorithmException, InvalidKeyException
|
throws IOException, NoSuchAlgorithmException, InvalidKeyException
|
||||||
{
|
{
|
||||||
|
this.executor = executor;
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
this.bundleId = configuration.getBundleId();
|
this.bundleId = configuration.getBundleId();
|
||||||
this.sandbox = configuration.isSandboxEnabled();
|
this.sandbox = configuration.isSandboxEnabled();
|
||||||
|
@ -120,12 +121,10 @@ public class APNSender implements Managed {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
this.executor = Executors.newSingleThreadExecutor();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
this.executor.shutdown();
|
|
||||||
this.apnsClient.disconnect();
|
this.apnsClient.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,13 +22,11 @@ import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
import io.dropwizard.lifecycle.Managed;
|
|
||||||
|
|
||||||
public class GCMSender implements Managed {
|
public class GCMSender {
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(GCMSender.class);
|
private final Logger logger = LoggerFactory.getLogger(GCMSender.class);
|
||||||
|
|
||||||
|
@ -46,17 +44,16 @@ public class GCMSender implements Managed {
|
||||||
|
|
||||||
private final AccountsManager accountsManager;
|
private final AccountsManager accountsManager;
|
||||||
private final Sender signalSender;
|
private final Sender signalSender;
|
||||||
private ExecutorService executor;
|
private final ExecutorService executor;
|
||||||
|
|
||||||
public GCMSender(AccountsManager accountsManager, String signalKey) {
|
public GCMSender(ExecutorService executor, AccountsManager accountsManager, String signalKey) {
|
||||||
this.accountsManager = accountsManager;
|
this(executor, accountsManager, new Sender(signalKey, SystemMapper.getMapper(), 6));
|
||||||
this.signalSender = new Sender(signalKey, SystemMapper.getMapper(), 6);
|
|
||||||
|
|
||||||
CircuitBreakerUtil.registerMetrics(metricRegistry, signalSender.getRetry(), Sender.class);
|
CircuitBreakerUtil.registerMetrics(metricRegistry, signalSender.getRetry(), Sender.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public GCMSender(AccountsManager accountsManager, Sender sender, ExecutorService executor) {
|
public GCMSender(ExecutorService executor, AccountsManager accountsManager, Sender sender) {
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
this.signalSender = sender;
|
this.signalSender = sender;
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
|
@ -99,16 +96,6 @@ public class GCMSender implements Managed {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
executor = Executors.newSingleThreadExecutor();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() {
|
|
||||||
this.executor.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleBadRegistration(GcmMessage message) {
|
private void handleBadRegistration(GcmMessage message) {
|
||||||
Optional<Account> account = getAccountForEvent(message);
|
Optional<Account> account = getAccountForEvent(message);
|
||||||
|
|
||||||
|
|
|
@ -191,7 +191,6 @@ public class MessageSender implements Managed {
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
apnSender.start();
|
apnSender.start();
|
||||||
gcmSender.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -200,6 +199,5 @@ public class MessageSender implements Managed {
|
||||||
executor.awaitTermination(5, TimeUnit.MINUTES);
|
executor.awaitTermination(5, TimeUnit.MINUTES);
|
||||||
|
|
||||||
apnSender.stop();
|
apnSender.stop();
|
||||||
gcmSender.stop();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ public class GCMSenderTest {
|
||||||
when(successResult.isSuccess()).thenReturn(true);
|
when(successResult.isSuccess()).thenReturn(true);
|
||||||
|
|
||||||
GcmMessage message = new GcmMessage("foo", "+12223334444", 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
|
GcmMessage message = new GcmMessage("foo", "+12223334444", 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
|
||||||
GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
|
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender);
|
||||||
|
|
||||||
CompletableFuture<Result> successFuture = CompletableFuture.completedFuture(successResult);
|
CompletableFuture<Result> successFuture = CompletableFuture.completedFuture(successResult);
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ public class GCMSenderTest {
|
||||||
when(invalidResult.isSuccess()).thenReturn(true);
|
when(invalidResult.isSuccess()).thenReturn(true);
|
||||||
|
|
||||||
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
|
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
|
||||||
GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
|
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender);
|
||||||
|
|
||||||
CompletableFuture<Result> invalidFuture = CompletableFuture.completedFuture(invalidResult);
|
CompletableFuture<Result> invalidFuture = CompletableFuture.completedFuture(invalidResult);
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ public class GCMSenderTest {
|
||||||
when(canonicalResult.getCanonicalRegistrationId()).thenReturn(canonicalId);
|
when(canonicalResult.getCanonicalRegistrationId()).thenReturn(canonicalId);
|
||||||
|
|
||||||
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
|
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
|
||||||
GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
|
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender);
|
||||||
|
|
||||||
CompletableFuture<Result> invalidFuture = CompletableFuture.completedFuture(canonicalResult);
|
CompletableFuture<Result> invalidFuture = CompletableFuture.completedFuture(canonicalResult);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue