Use a dedicated executor service for dispatching keyspace notifications.

This commit is contained in:
Jon Chambers 2020-09-01 17:06:13 -04:00 committed by Jon Chambers
parent ad01610d1e
commit 5c04f2634a
3 changed files with 15 additions and 13 deletions

View File

@ -281,11 +281,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration());
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration());
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
ScheduledExecutorService refreshFeatureFlagsExecutor = environment.lifecycle().scheduledExecutorService("featureFlags").threads(1).build();
ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(10_000)).build();
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor);
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor);
DirectoryManager directory = new DirectoryManager(directoryClient);
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster);
@ -293,11 +292,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster);
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster, messageNotificationExecutor);
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
MessagesManager messagesManager = new MessagesManager(messages, clusterMessagesCache, pushLatencyManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, refreshFeatureFlagsExecutor);
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, recurringJobExecutor);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);

View File

@ -27,6 +27,7 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -48,6 +49,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private final ClusterLuaScript clearPresenceScript;
private final ExecutorService keyspaceNotificationExecutorService;
private final ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> pruneMissingPeersFuture;
@ -67,11 +69,12 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private static final Logger log = LoggerFactory.getLogger(ClientPresenceManager.class);
public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService) throws IOException {
this.presenceCluster = presenceCluster;
this.pubSubConnection = this.presenceCluster.createPubSubConnection();
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER);
this.scheduledExecutorService = scheduledExecutorService;
public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster, final ScheduledExecutorService scheduledExecutorService, final ExecutorService keyspaceNotificationExecutorService) throws IOException {
this.presenceCluster = presenceCluster;
this.pubSubConnection = this.presenceCluster.createPubSubConnection();
this.clearPresenceScript = ClusterLuaScript.fromResource(presenceCluster, "lua/clear_presence.lua", ScriptOutputType.INTEGER);
this.scheduledExecutorService = scheduledExecutorService;
this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService;
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size);
@ -240,7 +243,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
// Another process has overwritten this presence key, which means the client has connected to another host.
// At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making
// synchronous Lettuce calls to avoid deadlocking.
scheduledExecutorService.execute(() -> {
keyspaceNotificationExecutorService.execute(() -> {
displacePresence(channel.substring("__keyspace@0__:".length()));
remoteDisplacementMeter.mark();
});

View File

@ -36,7 +36,7 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
});
presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor();
clientPresenceManager = new ClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService);
clientPresenceManager = new ClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService, presenceRenewalExecutorService);
}
@Override