Enqueue async operations from a dedicated thread

This commit is contained in:
Jon Chambers 2024-12-16 17:12:24 -05:00 committed by Jon Chambers
parent 33c0a27b85
commit a96c0ec7a3
5 changed files with 42 additions and 13 deletions

View File

@ -487,6 +487,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService virtualThreadEventLoggerExecutor = environment.lifecycle()
.executorService(name(getClass(), "virtualThreadEventLogger-%d")).minThreads(1).maxThreads(1).build();
ExecutorService asyncOperationQueueingExecutor = environment.lifecycle()
.executorService(name(getClass(), "asyncOperationQueueing-%d")).minThreads(1).maxThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
@ -639,7 +641,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PushNotificationManager pushNotificationManager =
new PushNotificationManager(accountsManager, apnSender, fcmSender, pushNotificationScheduler);
WebSocketConnectionEventManager webSocketConnectionEventManager =
new WebSocketConnectionEventManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor);
new WebSocketConnectionEventManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor);
RateLimiters rateLimiters = RateLimiters.createAndValidate(config.getLimitsConfiguration(),
dynamicConfigurationManager, rateLimitersCluster);
ProvisioningManager provisioningManager = new ProvisioningManager(pubsubClient);

View File

@ -23,10 +23,12 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -64,6 +66,10 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
private final FaultTolerantRedisClusterClient clusterClient;
private final Executor listenerEventExecutor;
// Note that this MUST be a single-threaded executor; its function is to process tasks that should usually be
// non-blocking, but can rarely block, and do so in the order in which those tasks were submitted.
private final Executor asyncOperationQueueingExecutor;
@Nullable
private FaultTolerantPubSubClusterConnection<byte[], byte[]> pubSubConnection;
@ -102,13 +108,15 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
public WebSocketConnectionEventManager(final AccountsManager accountsManager,
final PushNotificationManager pushNotificationManager,
final FaultTolerantRedisClusterClient clusterClient,
final Executor listenerEventExecutor) {
final Executor listenerEventExecutor,
final Executor asyncOperationQueueingExecutor) {
this.accountsManager = accountsManager;
this.pushNotificationManager = pushNotificationManager;
this.clusterClient = clusterClient;
this.listenerEventExecutor = listenerEventExecutor;
this.asyncOperationQueueingExecutor = asyncOperationQueueingExecutor;
this.listenersByAccountAndDeviceIdentifier =
Metrics.gaugeMapSize(LISTENER_GAUGE_NAME, Tags.empty(), new ConcurrentHashMap<>());
@ -169,8 +177,9 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
// operation is asynchronous; we're not blocking on it in the scope of the `compute` operation.
listenersByAccountAndDeviceIdentifier.compute(new AccountAndDeviceIdentifier(accountIdentifier, deviceId),
(key, existingListener) -> {
subscribeFuture.set(pubSubConnection.withPubSubConnection(connection ->
connection.async().ssubscribe(eventChannel)));
subscribeFuture.set(CompletableFuture.supplyAsync(() -> pubSubConnection.withPubSubConnection(connection ->
connection.async().ssubscribe(eventChannel)), asyncOperationQueueingExecutor)
.thenCompose(Function.identity()));
if (existingListener != null) {
displacedListener.set(existingListener);
@ -223,9 +232,10 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
// operation is asynchronous; we're not blocking on it in the scope of the `compute` operation.
listenersByAccountAndDeviceIdentifier.compute(new AccountAndDeviceIdentifier(accountIdentifier, deviceId),
(ignored, existingListener) -> {
unsubscribeFuture.set(pubSubConnection.withPubSubConnection(connection ->
connection.async().sunsubscribe(getClientEventChannel(accountIdentifier, deviceId)))
.thenRun(Util.NOOP));
unsubscribeFuture.set(CompletableFuture.supplyAsync(() -> pubSubConnection.withPubSubConnection(connection ->
connection.async().sunsubscribe(getClientEventChannel(accountIdentifier, deviceId)))
.thenRun(Util.NOOP), asyncOperationQueueingExecutor)
.thenCompose(Function.identity()));
return null;
});
@ -295,9 +305,9 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
listenersByAccountAndDeviceIdentifier.compute(accountAndDeviceIdentifier, (ignored, existingListener) -> {
if (existingListener == null && pubSubConnection != null) {
// Enqueue, but do not block on, an "unsubscribe" operation
pubSubConnection.usePubSubConnection(connection ->
asyncOperationQueueingExecutor.execute(() -> pubSubConnection.usePubSubConnection(connection ->
connection.async().sunsubscribe(getClientEventChannel(accountAndDeviceIdentifier.accountIdentifier(),
accountAndDeviceIdentifier.deviceId())));
accountAndDeviceIdentifier.deviceId()))));
}
// Make no change to the existing listener whether present or absent

View File

@ -143,6 +143,8 @@ record CommandDependencies(
.maxThreads(16).minThreads(16).build();
ExecutorService clientEventExecutor = environment.lifecycle()
.virtualExecutorService(name(name, "clientEvent-%d"));
ExecutorService asyncOperationQueueingExecutor = environment.lifecycle()
.executorService(name(name, "asyncOperationQueueing-%d")).minThreads(1).maxThreads(1).build();
ExecutorService disconnectionRequestListenerExecutor = environment.lifecycle()
.virtualExecutorService(name(name, "disconnectionRequest-%d"));
@ -283,7 +285,7 @@ record CommandDependencies(
Clock.systemUTC());
WebSocketConnectionEventManager webSocketConnectionEventManager =
new WebSocketConnectionEventManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor);
new WebSocketConnectionEventManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor);
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(disconnectionRequestManager);

View File

@ -51,6 +51,7 @@ class WebSocketConnectionEventManagerTest {
private WebSocketConnectionEventManager remoteEventManager;
private static ExecutorService webSocketConnectionEventExecutor;
private static ExecutorService asyncOperationQueueingExecutor;
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
@ -73,6 +74,7 @@ class WebSocketConnectionEventManagerTest {
@BeforeAll
static void setUpBeforeAll() {
webSocketConnectionEventExecutor = Executors.newVirtualThreadPerTaskExecutor();
asyncOperationQueueingExecutor = Executors.newSingleThreadExecutor();
}
@BeforeEach
@ -80,12 +82,14 @@ class WebSocketConnectionEventManagerTest {
localEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class),
mock(PushNotificationManager.class),
REDIS_CLUSTER_EXTENSION.getRedisCluster(),
webSocketConnectionEventExecutor);
webSocketConnectionEventExecutor,
asyncOperationQueueingExecutor);
remoteEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class),
mock(PushNotificationManager.class),
REDIS_CLUSTER_EXTENSION.getRedisCluster(),
webSocketConnectionEventExecutor);
webSocketConnectionEventExecutor,
asyncOperationQueueingExecutor);
localEventManager.start();
remoteEventManager.start();
@ -100,6 +104,7 @@ class WebSocketConnectionEventManagerTest {
@AfterAll
static void tearDownAfterAll() {
webSocketConnectionEventExecutor.shutdown();
asyncOperationQueueingExecutor.shutdown();
}
@ParameterizedTest
@ -242,6 +247,7 @@ class WebSocketConnectionEventManagerTest {
mock(AccountsManager.class),
mock(PushNotificationManager.class),
clusterClient,
Runnable::run,
Runnable::run);
eventManager.start();
@ -309,6 +315,7 @@ class WebSocketConnectionEventManagerTest {
mock(AccountsManager.class),
mock(PushNotificationManager.class),
clusterClient,
Runnable::run,
Runnable::run);
eventManager.start();
@ -366,6 +373,7 @@ class WebSocketConnectionEventManagerTest {
accountsManager,
pushNotificationManager,
clusterClient,
Runnable::run,
Runnable::run);
eventManager.start();

View File

@ -54,6 +54,7 @@ class MessagePersisterIntegrationTest {
private Scheduler messageDeliveryScheduler;
private ExecutorService messageDeletionExecutorService;
private ExecutorService websocketConnectionEventExecutor;
private ExecutorService asyncOperationQueueingExecutor;
private MessagesCache messagesCache;
private MessagesManager messagesManager;
private WebSocketConnectionEventManager webSocketConnectionEventManager;
@ -86,10 +87,12 @@ class MessagePersisterIntegrationTest {
messageDeletionExecutorService);
websocketConnectionEventExecutor = Executors.newVirtualThreadPerTaskExecutor();
asyncOperationQueueingExecutor = Executors.newSingleThreadExecutor();
webSocketConnectionEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class),
mock(PushNotificationManager.class),
REDIS_CLUSTER_EXTENSION.getRedisCluster(),
websocketConnectionEventExecutor);
websocketConnectionEventExecutor,
asyncOperationQueueingExecutor);
webSocketConnectionEventManager.start();
@ -108,6 +111,7 @@ class MessagePersisterIntegrationTest {
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
}
@SuppressWarnings("ResultOfMethodCallIgnored")
@AfterEach
void tearDown() throws Exception {
messageDeletionExecutorService.shutdown();
@ -116,6 +120,9 @@ class MessagePersisterIntegrationTest {
websocketConnectionEventExecutor.shutdown();
websocketConnectionEventExecutor.awaitTermination(15, TimeUnit.SECONDS);
asyncOperationQueueingExecutor.shutdown();
asyncOperationQueueingExecutor.awaitTermination(15, TimeUnit.SECONDS);
messageDeliveryScheduler.dispose();
webSocketConnectionEventManager.stop();