diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 1efc81861..2ba46edab 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -28,6 +28,7 @@ import io.lettuce.core.resource.ClientResources; import io.micrometer.core.instrument.Meter.Id; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; import io.micrometer.core.instrument.config.MeterFilter; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.datadog.DatadogMeterRegistry; @@ -137,6 +138,7 @@ import org.whispersystems.textsecuregcm.metrics.GarbageCollectionGauges; import org.whispersystems.textsecuregcm.metrics.MaxFileDescriptorGauge; import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener; import org.whispersystems.textsecuregcm.metrics.MetricsRequestEventListener; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.MicrometerRegistryManager; import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge; import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge; @@ -234,6 +236,8 @@ import org.whispersystems.textsecuregcm.workers.UnlinkDeviceCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; import org.whispersystems.websocket.WebSocketResourceProviderFactory; import org.whispersystems.websocket.setup.WebSocketEnvironment; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -399,12 +403,15 @@ public class WhisperServerService extends Application keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000); - Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue); + Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), + keyspaceNotificationDispatchQueue); final BlockingQueue receiptSenderQueue = new LinkedBlockingQueue<>(); Metrics.gaugeCollectionSize(name(getClass(), "receiptSenderQueue"), Collections.emptyList(), receiptSenderQueue); - final BlockingQueue fcmSenderQueue = new LinkedBlockingQueue<>(); Metrics.gaugeCollectionSize(name(getClass(), "fcmSenderQueue"), Collections.emptyList(), fcmSenderQueue); + final BlockingQueue messageDeliveryQueue = new LinkedBlockingQueue<>(); + Metrics.gaugeCollectionSize(MetricsUtil.name(getClass(), "messageDeliveryQueue"), Collections.emptyList(), + messageDeliveryQueue); ScheduledExecutorService recurringJobExecutor = environment.lifecycle() .scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build(); @@ -416,6 +423,16 @@ public class WhisperServerService extends Application imp private final Clock clock; private final ExecutorService notificationExecutorService; + private final Scheduler messageDeliveryScheduler; private final ExecutorService messageDeletionExecutorService; // messageDeletionExecutorService wrapped into a reactor Scheduler private final Scheduler messageDeletionScheduler; @@ -103,7 +104,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, - final Clock clock, final ExecutorService notificationExecutorService, + final Clock clock, final ExecutorService notificationExecutorService, final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService) throws IOException { this.readDeleteCluster = readDeleteCluster; @@ -111,6 +112,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp this.clock = clock; this.notificationExecutorService = notificationExecutorService; + this.messageDeliveryScheduler = messageDeliveryScheduler; this.messageDeletionExecutorService = messageDeletionExecutorService; this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion"); @@ -263,7 +265,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp }) .limitRate(1) // we want to ensure we don’t accidentally block the Lettuce/netty i/o executors - .publishOn(Schedulers.boundedElastic()) + .publishOn(messageDeliveryScheduler) .map(Pair::first) .flatMapIterable(queueItems -> { final List envelopes = new ArrayList<>(queueItems.size() / 2); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index 034bf75ec..4569f6b14 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -11,6 +11,8 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tags; import java.util.EnumMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -18,8 +20,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tags; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; @@ -38,6 +38,7 @@ import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; import org.whispersystems.websocket.session.WebSocketSessionContext; import org.whispersystems.websocket.setup.WebSocketConnectListener; +import reactor.core.scheduler.Scheduler; public class AuthenticatedConnectListener implements WebSocketConnectListener { @@ -61,6 +62,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final PushNotificationManager pushNotificationManager; private final ClientPresenceManager clientPresenceManager; private final ScheduledExecutorService scheduledExecutorService; + private final Scheduler messageDeliveryScheduler; private final Map openWebsocketsByClientPlatform; private final AtomicInteger openWebsocketsFromUnknownPlatforms; @@ -69,12 +71,14 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { MessagesManager messagesManager, PushNotificationManager pushNotificationManager, ClientPresenceManager clientPresenceManager, - ScheduledExecutorService scheduledExecutorService) { + ScheduledExecutorService scheduledExecutorService, + Scheduler messageDeliveryScheduler) { this.receiptSender = receiptSender; this.messagesManager = messagesManager; this.pushNotificationManager = pushNotificationManager; this.clientPresenceManager = clientPresenceManager; this.scheduledExecutorService = scheduledExecutorService; + this.messageDeliveryScheduler = messageDeliveryScheduler; openWebsocketsByClientPlatform = new EnumMap<>(ClientPlatform.class); @@ -100,7 +104,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, context.getClient(), - scheduledExecutorService); + scheduledExecutorService, + messageDeliveryScheduler); final AtomicInteger openWebsocketAtomicInteger = getOpenWebsocketCounter(context.getClient().getUserAgent()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 2b7d60a59..575b3e420 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -56,7 +56,6 @@ import reactor.core.observability.micrometer.Micrometer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener { @@ -127,7 +126,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private final AtomicReference messageSubscription = new AtomicReference<>(); private final Random random = new Random(); - private final Scheduler reactiveScheduler; + private final Scheduler messageDeliveryScheduler; private enum StoredMessageState { EMPTY, @@ -136,29 +135,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac } public WebSocketConnection(ReceiptSender receiptSender, - MessagesManager messagesManager, - AuthenticatedAccount auth, - Device device, - WebSocketClient client, - ScheduledExecutorService scheduledExecutorService) { - - this(receiptSender, - messagesManager, - auth, - device, - client, - scheduledExecutorService, - Schedulers.boundedElastic()); - } - - @VisibleForTesting - WebSocketConnection(ReceiptSender receiptSender, MessagesManager messagesManager, AuthenticatedAccount auth, Device device, WebSocketClient client, ScheduledExecutorService scheduledExecutorService, - Scheduler reactiveScheduler) { + Scheduler messageDeliveryScheduler) { this(receiptSender, messagesManager, @@ -167,7 +149,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac client, DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, scheduledExecutorService, - reactiveScheduler); + messageDeliveryScheduler); } @VisibleForTesting @@ -178,7 +160,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac WebSocketClient client, int sendFuturesTimeoutMillis, ScheduledExecutorService scheduledExecutorService, - Scheduler reactiveScheduler) { + Scheduler messageDeliveryScheduler) { this.receiptSender = receiptSender; this.messagesManager = messagesManager; @@ -187,7 +169,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac this.client = client; this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; this.scheduledExecutorService = scheduledExecutorService; - this.reactiveScheduler = reactiveScheduler; + this.messageDeliveryScheduler = messageDeliveryScheduler; } public void start() { @@ -366,7 +348,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac .flatMapSequential(envelope -> Mono.fromFuture(sendMessage(envelope) .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) - .subscribeOn(reactiveScheduler) + .subscribeOn(messageDeliveryScheduler) .subscribe( // no additional consumer of values - it is Flux by now null, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index 3dabc6d5a..96b949a31 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -58,6 +58,8 @@ import org.whispersystems.textsecuregcm.storage.UsernameHashNotAvailableExceptio import org.whispersystems.textsecuregcm.storage.UsernameReservationNotFoundException; import org.whispersystems.textsecuregcm.storage.VerificationCodeStore; import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -100,6 +102,9 @@ public class AssignUsernameCommand extends EnvironmentCommand dynamicConfigurationManager = new DynamicConfigurationManager<>( configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class); @@ -190,7 +195,7 @@ public class AssignUsernameCommand extends EnvironmentCommand actualMessages = Flux.from( @@ -547,6 +554,7 @@ class MessagesCacheTest { private MessagesCache messagesCache; private RedisAdvancedClusterReactiveCommands reactiveCommands; private RedisAdvancedClusterAsyncCommands asyncCommands; + private Scheduler messageDeliveryScheduler; @SuppressWarnings("unchecked") @BeforeEach @@ -559,13 +567,16 @@ class MessagesCacheTest { .binaryAsyncCommands(asyncCommands) .build(); + messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); + messagesCache = new MessagesCache(mockCluster, mockCluster, Clock.systemUTC(), mock(ExecutorService.class), - Executors.newSingleThreadExecutor()); + messageDeliveryScheduler, Executors.newSingleThreadExecutor()); } @AfterEach void teardown() { StepVerifier.resetDefaultTimeout(); + messageDeliveryScheduler.dispose(); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index d47213ba0..5cf17757d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -59,6 +59,7 @@ import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; class WebSocketConnectionIntegrationTest { @@ -77,6 +78,7 @@ class WebSocketConnectionIntegrationTest { private Device device; private WebSocketClient webSocketClient; private ScheduledExecutorService retrySchedulingExecutor; + private Scheduler messageDeliveryScheduler; private long serialTimestamp = System.currentTimeMillis(); @@ -84,8 +86,10 @@ class WebSocketConnectionIntegrationTest { void setUp() throws Exception { sharedExecutorService = Executors.newSingleThreadExecutor(); + messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, sharedExecutorService); + REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, messageDeliveryScheduler, + sharedExecutorService); messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), dynamoDbExtension.getDynamoDbAsyncClient(), MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(7), sharedExecutorService); @@ -122,7 +126,8 @@ class WebSocketConnectionIntegrationTest { new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, - retrySchedulingExecutor); + retrySchedulingExecutor, + messageDeliveryScheduler); final List expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); @@ -205,7 +210,8 @@ class WebSocketConnectionIntegrationTest { new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, - retrySchedulingExecutor); + retrySchedulingExecutor, + messageDeliveryScheduler); final int persistedMessageCount = 207; final int cachedMessageCount = 173; @@ -271,7 +277,7 @@ class WebSocketConnectionIntegrationTest { webSocketClient, 100, // use a very short timeout, so that this test completes quickly retrySchedulingExecutor, - Schedulers.boundedElastic()); + messageDeliveryScheduler); final int persistedMessageCount = 207; final int cachedMessageCount = 173; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 982d98aa8..dbdc2a279 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -68,6 +68,7 @@ import org.whispersystems.websocket.messages.WebSocketResponseMessage; import org.whispersystems.websocket.session.WebSocketSessionContext; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; @@ -90,6 +91,7 @@ class WebSocketConnectionTest { private MessagesManager messagesManager; private ReceiptSender receiptSender; private ScheduledExecutorService retrySchedulingExecutor; + private Scheduler messageDeliveryScheduler; @BeforeEach void setup() { @@ -102,11 +104,13 @@ class WebSocketConnectionTest { messagesManager = mock(MessagesManager.class); receiptSender = mock(ReceiptSender.class); retrySchedulingExecutor = mock(ScheduledExecutorService.class); + messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); } @AfterEach void teardown() { StepVerifier.resetDefaultTimeout(); + messageDeliveryScheduler.dispose(); } @Test @@ -114,7 +118,7 @@ class WebSocketConnectionTest { WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager, mock(PushNotificationManager.class), mock(ClientPresenceManager.class), - retrySchedulingExecutor); + retrySchedulingExecutor, messageDeliveryScheduler); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) @@ -773,7 +777,7 @@ class WebSocketConnectionTest { CompletableFuture.completedFuture(Optional.empty())); WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor); + retrySchedulingExecutor, messageDeliveryScheduler); connection.start();