Rely on the client presence manager to decide whether to send push notifications.

This commit is contained in:
Jon Chambers 2020-09-01 16:59:57 -04:00 committed by Jon Chambers
parent 697c380cd1
commit ad01610d1e
2 changed files with 7 additions and 14 deletions

View File

@ -281,10 +281,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration()); FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration());
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration()); FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration());
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build(); ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
ScheduledExecutorService refreshFeatureFlagsExecutor = environment.lifecycle().scheduledExecutorService("featureFlags").threads(1).build(); ScheduledExecutorService refreshFeatureFlagsExecutor = environment.lifecycle().scheduledExecutorService("featureFlags").threads(1).build();
ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build(); ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor); ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor);
DirectoryManager directory = new DirectoryManager(directoryClient); DirectoryManager directory = new DirectoryManager(directoryClient);
@ -304,7 +303,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager); PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration()); APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey()); GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager, websocketExperimentExecutor); WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster); RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager); AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);

View File

@ -67,14 +67,10 @@ public class WebsocketSender {
private final PubSubManager pubSubManager; private final PubSubManager pubSubManager;
private final ClientPresenceManager clientPresenceManager; private final ClientPresenceManager clientPresenceManager;
private final Experiment presenceExperiment = new Experiment("presence", "websocketSender"); public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager) {
private final Executor experimentExecutor;
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager, Executor experimentExecutor) {
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager; this.pubSubManager = pubSubManager;
this.clientPresenceManager = clientPresenceManager; this.clientPresenceManager = clientPresenceManager;
this.experimentExecutor = experimentExecutor;
} }
public DeliveryStatus sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) { public DeliveryStatus sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
@ -84,11 +80,9 @@ public class WebsocketSender {
.setContent(message.toByteString()) .setContent(message.toByteString())
.build(); .build();
final boolean clientPresent = pubSubManager.publish(address, pubSubMessage); pubSubManager.publish(address, pubSubMessage);
presenceExperiment.compareSupplierResultAsync(clientPresent, () -> clientPresenceManager.isPresent(account.getUuid(), device.getId()), experimentExecutor); if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
if (clientPresent) {
if (channel == Type.APN) apnOnlineMeter.mark(); if (channel == Type.APN) apnOnlineMeter.mark();
else if (channel == Type.GCM) gcmOnlineMeter.mark(); else if (channel == Type.GCM) gcmOnlineMeter.mark();
else websocketOnlineMeter.mark(); else websocketOnlineMeter.mark();