Generalize scope of and expand size of websocket scheduled executor service

This commit is contained in:
Jon Chambers 2022-06-27 17:04:20 -04:00 committed by Jon Chambers
parent a45d95905e
commit 4e131858ca
3 changed files with 12 additions and 12 deletions

View File

@ -372,7 +372,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ScheduledExecutorService recurringJobExecutor = environment.lifecycle() ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build(); .scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build();
ScheduledExecutorService retrySchedulingExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "retry-%d")).threads(2).build(); ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build();
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build(); ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build();
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build(); ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build();
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build(); ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
@ -604,7 +604,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator)); webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator));
webSocketEnvironment.setConnectListener( webSocketEnvironment.setConnectListener(
new AuthenticatedConnectListener(receiptSender, messagesManager, messageSender, apnFallbackManager, new AuthenticatedConnectListener(receiptSender, messagesManager, messageSender, apnFallbackManager,
clientPresenceManager, retrySchedulingExecutor)); clientPresenceManager, websocketScheduledExecutor));
webSocketEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager)); webSocketEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager));
webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET)); webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET));
webSocketEnvironment.jersey().register(MultiDeviceMessageListProvider.class); webSocketEnvironment.jersey().register(MultiDeviceMessageListProvider.class);

View File

@ -40,20 +40,20 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final MessageSender messageSender; private final MessageSender messageSender;
private final ApnFallbackManager apnFallbackManager; private final ApnFallbackManager apnFallbackManager;
private final ClientPresenceManager clientPresenceManager; private final ClientPresenceManager clientPresenceManager;
private final ScheduledExecutorService retrySchedulingExecutor; private final ScheduledExecutorService scheduledExecutorService;
public AuthenticatedConnectListener(ReceiptSender receiptSender, public AuthenticatedConnectListener(ReceiptSender receiptSender,
MessagesManager messagesManager, MessagesManager messagesManager,
final MessageSender messageSender, ApnFallbackManager apnFallbackManager, final MessageSender messageSender, ApnFallbackManager apnFallbackManager,
ClientPresenceManager clientPresenceManager, ClientPresenceManager clientPresenceManager,
ScheduledExecutorService retrySchedulingExecutor) ScheduledExecutorService scheduledExecutorService)
{ {
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.messageSender = messageSender; this.messageSender = messageSender;
this.apnFallbackManager = apnFallbackManager; this.apnFallbackManager = apnFallbackManager;
this.clientPresenceManager = clientPresenceManager; this.clientPresenceManager = clientPresenceManager;
this.retrySchedulingExecutor = retrySchedulingExecutor; this.scheduledExecutorService = scheduledExecutorService;
} }
@Override @Override
@ -65,7 +65,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
final WebSocketConnection connection = new WebSocketConnection(receiptSender, final WebSocketConnection connection = new WebSocketConnection(receiptSender,
messagesManager, auth, device, messagesManager, auth, device,
context.getClient(), context.getClient(),
retrySchedulingExecutor); scheduledExecutorService);
openWebsocketCounter.inc(); openWebsocketCounter.inc();
RedisOperation.unchecked(() -> apnFallbackManager.cancel(auth.getAccount(), device)); RedisOperation.unchecked(() -> apnFallbackManager.cancel(auth.getAccount(), device));

View File

@ -101,7 +101,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final int sendFuturesTimeoutMillis; private final int sendFuturesTimeoutMillis;
private final ScheduledExecutorService retrySchedulingExecutor; private final ScheduledExecutorService scheduledExecutorService;
private final boolean isDesktopClient; private final boolean isDesktopClient;
@ -127,7 +127,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
AuthenticatedAccount auth, AuthenticatedAccount auth,
Device device, Device device,
WebSocketClient client, WebSocketClient client,
ScheduledExecutorService retrySchedulingExecutor) { ScheduledExecutorService scheduledExecutorService) {
this(receiptSender, this(receiptSender,
messagesManager, messagesManager,
@ -135,7 +135,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
device, device,
client, client,
DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS,
retrySchedulingExecutor); scheduledExecutorService);
} }
@VisibleForTesting @VisibleForTesting
@ -145,7 +145,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
Device device, Device device,
WebSocketClient client, WebSocketClient client,
int sendFuturesTimeoutMillis, int sendFuturesTimeoutMillis,
ScheduledExecutorService retrySchedulingExecutor) { ScheduledExecutorService scheduledExecutorService) {
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
@ -153,7 +153,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
this.device = device; this.device = device;
this.client = client; this.client = client;
this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis;
this.retrySchedulingExecutor = retrySchedulingExecutor; this.scheduledExecutorService = scheduledExecutorService;
Optional<ClientPlatform> maybePlatform; Optional<ClientPlatform> maybePlatform;
@ -294,7 +294,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS); final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS);
retryFuture retryFuture
.set(retrySchedulingExecutor.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS)); .set(scheduledExecutorService.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS));
} }
} else { } else {
logger.debug("Client disconnected before queue cleared"); logger.debug("Client disconnected before queue cleared");