Migrate from bounded elastic to dedicated executor for message delivery

This commit is contained in:
Chris Eager 2023-03-17 18:17:35 -05:00 committed by Chris Eager
parent 6075d5137b
commit f5c62a3d85
14 changed files with 124 additions and 55 deletions

View File

@ -28,6 +28,7 @@ import io.lettuce.core.resource.ClientResources;
import io.micrometer.core.instrument.Meter.Id; import io.micrometer.core.instrument.Meter.Id;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.micrometer.core.instrument.config.MeterFilter; import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.datadog.DatadogMeterRegistry; import io.micrometer.datadog.DatadogMeterRegistry;
@ -137,6 +138,7 @@ import org.whispersystems.textsecuregcm.metrics.GarbageCollectionGauges;
import org.whispersystems.textsecuregcm.metrics.MaxFileDescriptorGauge; import org.whispersystems.textsecuregcm.metrics.MaxFileDescriptorGauge;
import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener; import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener;
import org.whispersystems.textsecuregcm.metrics.MetricsRequestEventListener; import org.whispersystems.textsecuregcm.metrics.MetricsRequestEventListener;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.MicrometerRegistryManager; import org.whispersystems.textsecuregcm.metrics.MicrometerRegistryManager;
import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge; import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge;
import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge; import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge;
@ -234,6 +236,8 @@ import org.whispersystems.textsecuregcm.workers.UnlinkDeviceCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
import org.whispersystems.websocket.WebSocketResourceProviderFactory; import org.whispersystems.websocket.WebSocketResourceProviderFactory;
import org.whispersystems.websocket.setup.WebSocketEnvironment; import org.whispersystems.websocket.setup.WebSocketEnvironment;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.Region;
@ -399,12 +403,15 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters", config.getRateLimitersCluster(), redisClientResources); FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters", config.getRateLimitersCluster(), redisClientResources);
final BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000); final BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000);
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue); Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(),
keyspaceNotificationDispatchQueue);
final BlockingQueue<Runnable> receiptSenderQueue = new LinkedBlockingQueue<>(); final BlockingQueue<Runnable> receiptSenderQueue = new LinkedBlockingQueue<>();
Metrics.gaugeCollectionSize(name(getClass(), "receiptSenderQueue"), Collections.emptyList(), receiptSenderQueue); Metrics.gaugeCollectionSize(name(getClass(), "receiptSenderQueue"), Collections.emptyList(), receiptSenderQueue);
final BlockingQueue<Runnable> fcmSenderQueue = new LinkedBlockingQueue<>(); final BlockingQueue<Runnable> fcmSenderQueue = new LinkedBlockingQueue<>();
Metrics.gaugeCollectionSize(name(getClass(), "fcmSenderQueue"), Collections.emptyList(), fcmSenderQueue); Metrics.gaugeCollectionSize(name(getClass(), "fcmSenderQueue"), Collections.emptyList(), fcmSenderQueue);
final BlockingQueue<Runnable> messageDeliveryQueue = new LinkedBlockingQueue<>();
Metrics.gaugeCollectionSize(MetricsUtil.name(getClass(), "messageDeliveryQueue"), Collections.emptyList(),
messageDeliveryQueue);
ScheduledExecutorService recurringJobExecutor = environment.lifecycle() ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build(); .scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build();
@ -416,6 +423,16 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExecutorService storageServiceExecutor = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build(); ExecutorService storageServiceExecutor = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService accountDeletionExecutor = environment.lifecycle().executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build(); ExecutorService accountDeletionExecutor = environment.lifecycle().executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build();
// using 80 threads to match Schedulers.boundedElastic() behavior
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
environment.lifecycle().executorService(name(getClass(), "messageDelivery-%d"))
.minThreads(80)
.maxThreads(80)
.workQueue(messageDeliveryQueue)
.build(),
MetricsUtil.name(getClass(), "messageDeliveryExecutor")),
"messageDelivery");
// TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet // TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet
ExecutorService batchIdentityCheckExecutor = environment.lifecycle().executorService(name(getClass(), "batchIdentityCheck-%d")).minThreads(32).maxThreads(32).build(); ExecutorService batchIdentityCheckExecutor = environment.lifecycle().executorService(name(getClass(), "batchIdentityCheck-%d")).minThreads(32).maxThreads(32).build();
ExecutorService multiRecipientMessageExecutor = environment.lifecycle() ExecutorService multiRecipientMessageExecutor = environment.lifecycle()
@ -485,7 +502,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices); StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, Clock.systemUTC(), MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, Clock.systemUTC(),
keyspaceNotificationDispatchExecutor, messageDeletionAsyncExecutor); keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionAsyncExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager); PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
config.getReportMessageConfiguration().getCounterTtl()); config.getReportMessageConfiguration().getCounterTtl());
@ -674,7 +691,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator)); webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator));
webSocketEnvironment.setConnectListener( webSocketEnvironment.setConnectListener(
new AuthenticatedConnectListener(receiptSender, messagesManager, pushNotificationManager, new AuthenticatedConnectListener(receiptSender, messagesManager, pushNotificationManager,
clientPresenceManager, websocketScheduledExecutor)); clientPresenceManager, websocketScheduledExecutor, messageDeliveryScheduler));
webSocketEnvironment.jersey() webSocketEnvironment.jersey()
.register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager)); .register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager));
webSocketEnvironment.jersey().register(new RequestStatisticsFilter(TrafficSource.WEBSOCKET)); webSocketEnvironment.jersey().register(new RequestStatisticsFilter(TrafficSource.WEBSOCKET));
@ -750,8 +767,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
new DirectoryV2Controller(directoryV2CredentialsGenerator), new DirectoryV2Controller(directoryV2CredentialsGenerator),
new DonationController(clock, zkReceiptOperations, redeemedReceiptsManager, accountsManager, config.getBadges(), new DonationController(clock, zkReceiptOperations, redeemedReceiptsManager, accountsManager, config.getBadges(),
ReceiptCredentialPresentation::new), ReceiptCredentialPresentation::new),
new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, deletedAccountsManager, messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor, new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, deletedAccountsManager,
reportSpamTokenProvider), messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor,
messageDeliveryScheduler, reportSpamTokenProvider),
new PaymentsController(currencyManager, paymentsCredentialsGenerator), new PaymentsController(currencyManager, paymentsCredentialsGenerator),
new ProfileController(clock, rateLimiters, accountsManager, profilesManager, dynamicConfigurationManager, new ProfileController(clock, rateLimiters, accountsManager, profilesManager, dynamicConfigurationManager,
profileBadgeConverter, config.getBadges(), cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, config.getBadges(), cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner,

View File

@ -104,7 +104,7 @@ import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection; import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
import org.whispersystems.websocket.Stories; import org.whispersystems.websocket.Stories;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Scheduler;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Path("/v1/messages") @Path("/v1/messages")
@ -122,6 +122,7 @@ public class MessageController {
private final PushNotificationManager pushNotificationManager; private final PushNotificationManager pushNotificationManager;
private final ReportMessageManager reportMessageManager; private final ReportMessageManager reportMessageManager;
private final ExecutorService multiRecipientMessageExecutor; private final ExecutorService multiRecipientMessageExecutor;
private final Scheduler messageDeliveryScheduler;
private final ReportSpamTokenProvider reportSpamTokenProvider; private final ReportSpamTokenProvider reportSpamTokenProvider;
private static final String REJECT_OVERSIZE_MESSAGE_COUNTER = name(MessageController.class, "rejectOversizeMessage"); private static final String REJECT_OVERSIZE_MESSAGE_COUNTER = name(MessageController.class, "rejectOversizeMessage");
@ -154,6 +155,7 @@ public class MessageController {
PushNotificationManager pushNotificationManager, PushNotificationManager pushNotificationManager,
ReportMessageManager reportMessageManager, ReportMessageManager reportMessageManager,
@Nonnull ExecutorService multiRecipientMessageExecutor, @Nonnull ExecutorService multiRecipientMessageExecutor,
Scheduler messageDeliveryScheduler,
@Nonnull ReportSpamTokenProvider reportSpamTokenProvider) { @Nonnull ReportSpamTokenProvider reportSpamTokenProvider) {
this.rateLimiters = rateLimiters; this.rateLimiters = rateLimiters;
this.messageSender = messageSender; this.messageSender = messageSender;
@ -164,6 +166,7 @@ public class MessageController {
this.pushNotificationManager = pushNotificationManager; this.pushNotificationManager = pushNotificationManager;
this.reportMessageManager = reportMessageManager; this.reportMessageManager = reportMessageManager;
this.multiRecipientMessageExecutor = Objects.requireNonNull(multiRecipientMessageExecutor); this.multiRecipientMessageExecutor = Objects.requireNonNull(multiRecipientMessageExecutor);
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.reportSpamTokenProvider = reportSpamTokenProvider; this.reportSpamTokenProvider = reportSpamTokenProvider;
} }
@ -553,7 +556,7 @@ public class MessageController {
return messages; return messages;
}) })
.timeout(Duration.ofSeconds(5)) .timeout(Duration.ofSeconds(5))
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(messageDeliveryScheduler)
.toFuture(); .toFuture();
} }

View File

@ -7,6 +7,8 @@ package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.InstrumentedExecutorService; import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -29,9 +31,13 @@ public class ReceiptSender {
final ExecutorService executor) { final ExecutorService executor) {
this.accountManager = accountManager; this.accountManager = accountManager;
this.messageSender = messageSender; this.messageSender = messageSender;
this.executor = new InstrumentedExecutorService(executor, this.executor = ExecutorServiceMetrics.monitor(
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), Metrics.globalRegistry,
MetricsUtil.name(ReceiptSender.class, "executor")); new InstrumentedExecutorService(executor,
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
MetricsUtil.name(ReceiptSender.class, "executor")),
MetricsUtil.name(ReceiptSender.class, "executor"))
;
} }
public void sendReceipt(UUID sourceUuid, long sourceDeviceId, UUID destinationUuid, long messageId) { public void sendReceipt(UUID sourceUuid, long sourceDeviceId, UUID destinationUuid, long messageId) {

View File

@ -63,6 +63,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private final Clock clock; private final Clock clock;
private final ExecutorService notificationExecutorService; private final ExecutorService notificationExecutorService;
private final Scheduler messageDeliveryScheduler;
private final ExecutorService messageDeletionExecutorService; private final ExecutorService messageDeletionExecutorService;
// messageDeletionExecutorService wrapped into a reactor Scheduler // messageDeletionExecutorService wrapped into a reactor Scheduler
private final Scheduler messageDeletionScheduler; private final Scheduler messageDeletionScheduler;
@ -103,7 +104,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster,
final Clock clock, final ExecutorService notificationExecutorService, final Clock clock, final ExecutorService notificationExecutorService, final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService) throws IOException { final ExecutorService messageDeletionExecutorService) throws IOException {
this.readDeleteCluster = readDeleteCluster; this.readDeleteCluster = readDeleteCluster;
@ -111,6 +112,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
this.clock = clock; this.clock = clock;
this.notificationExecutorService = notificationExecutorService; this.notificationExecutorService = notificationExecutorService;
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.messageDeletionExecutorService = messageDeletionExecutorService; this.messageDeletionExecutorService = messageDeletionExecutorService;
this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion"); this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion");
@ -263,7 +265,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}) })
.limitRate(1) .limitRate(1)
// we want to ensure we dont accidentally block the Lettuce/netty i/o executors // we want to ensure we dont accidentally block the Lettuce/netty i/o executors
.publishOn(Schedulers.boundedElastic()) .publishOn(messageDeliveryScheduler)
.map(Pair::first) .map(Pair::first)
.flatMapIterable(queueItems -> { .flatMapIterable(queueItems -> {
final List<MessageProtos.Envelope> envelopes = new ArrayList<>(queueItems.size() / 2); final List<MessageProtos.Envelope> envelopes = new ArrayList<>(queueItems.size() / 2);

View File

@ -11,6 +11,8 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -18,8 +20,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
@ -38,6 +38,7 @@ import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import org.whispersystems.websocket.session.WebSocketSessionContext; import org.whispersystems.websocket.session.WebSocketSessionContext;
import org.whispersystems.websocket.setup.WebSocketConnectListener; import org.whispersystems.websocket.setup.WebSocketConnectListener;
import reactor.core.scheduler.Scheduler;
public class AuthenticatedConnectListener implements WebSocketConnectListener { public class AuthenticatedConnectListener implements WebSocketConnectListener {
@ -61,6 +62,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final PushNotificationManager pushNotificationManager; private final PushNotificationManager pushNotificationManager;
private final ClientPresenceManager clientPresenceManager; private final ClientPresenceManager clientPresenceManager;
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private final Scheduler messageDeliveryScheduler;
private final Map<ClientPlatform, AtomicInteger> openWebsocketsByClientPlatform; private final Map<ClientPlatform, AtomicInteger> openWebsocketsByClientPlatform;
private final AtomicInteger openWebsocketsFromUnknownPlatforms; private final AtomicInteger openWebsocketsFromUnknownPlatforms;
@ -69,12 +71,14 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
MessagesManager messagesManager, MessagesManager messagesManager,
PushNotificationManager pushNotificationManager, PushNotificationManager pushNotificationManager,
ClientPresenceManager clientPresenceManager, ClientPresenceManager clientPresenceManager,
ScheduledExecutorService scheduledExecutorService) { ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler) {
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.pushNotificationManager = pushNotificationManager; this.pushNotificationManager = pushNotificationManager;
this.clientPresenceManager = clientPresenceManager; this.clientPresenceManager = clientPresenceManager;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.messageDeliveryScheduler = messageDeliveryScheduler;
openWebsocketsByClientPlatform = new EnumMap<>(ClientPlatform.class); openWebsocketsByClientPlatform = new EnumMap<>(ClientPlatform.class);
@ -100,7 +104,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
final WebSocketConnection connection = new WebSocketConnection(receiptSender, final WebSocketConnection connection = new WebSocketConnection(receiptSender,
messagesManager, auth, device, messagesManager, auth, device,
context.getClient(), context.getClient(),
scheduledExecutorService); scheduledExecutorService,
messageDeliveryScheduler);
final AtomicInteger openWebsocketAtomicInteger = getOpenWebsocketCounter(context.getClient().getUserAgent()); final AtomicInteger openWebsocketAtomicInteger = getOpenWebsocketCounter(context.getClient().getUserAgent());

View File

@ -56,7 +56,6 @@ import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener { public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener {
@ -127,7 +126,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final AtomicReference<Disposable> messageSubscription = new AtomicReference<>(); private final AtomicReference<Disposable> messageSubscription = new AtomicReference<>();
private final Random random = new Random(); private final Random random = new Random();
private final Scheduler reactiveScheduler; private final Scheduler messageDeliveryScheduler;
private enum StoredMessageState { private enum StoredMessageState {
EMPTY, EMPTY,
@ -136,29 +135,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
} }
public WebSocketConnection(ReceiptSender receiptSender, public WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager,
AuthenticatedAccount auth,
Device device,
WebSocketClient client,
ScheduledExecutorService scheduledExecutorService) {
this(receiptSender,
messagesManager,
auth,
device,
client,
scheduledExecutorService,
Schedulers.boundedElastic());
}
@VisibleForTesting
WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager, MessagesManager messagesManager,
AuthenticatedAccount auth, AuthenticatedAccount auth,
Device device, Device device,
WebSocketClient client, WebSocketClient client,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
Scheduler reactiveScheduler) { Scheduler messageDeliveryScheduler) {
this(receiptSender, this(receiptSender,
messagesManager, messagesManager,
@ -167,7 +149,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
client, client,
DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS,
scheduledExecutorService, scheduledExecutorService,
reactiveScheduler); messageDeliveryScheduler);
} }
@VisibleForTesting @VisibleForTesting
@ -178,7 +160,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
WebSocketClient client, WebSocketClient client,
int sendFuturesTimeoutMillis, int sendFuturesTimeoutMillis,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
Scheduler reactiveScheduler) { Scheduler messageDeliveryScheduler) {
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
@ -187,7 +169,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
this.client = client; this.client = client;
this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.reactiveScheduler = reactiveScheduler; this.messageDeliveryScheduler = messageDeliveryScheduler;
} }
public void start() { public void start() {
@ -366,7 +348,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
.flatMapSequential(envelope -> .flatMapSequential(envelope ->
Mono.fromFuture(sendMessage(envelope) Mono.fromFuture(sendMessage(envelope)
.orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)))
.subscribeOn(reactiveScheduler) .subscribeOn(messageDeliveryScheduler)
.subscribe( .subscribe(
// no additional consumer of values - it is Flux<Void> by now // no additional consumer of values - it is Flux<Void> by now
null, null,

View File

@ -58,6 +58,8 @@ import org.whispersystems.textsecuregcm.storage.UsernameHashNotAvailableExceptio
import org.whispersystems.textsecuregcm.storage.UsernameReservationNotFoundException; import org.whispersystems.textsecuregcm.storage.UsernameReservationNotFoundException;
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore; import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@ -100,6 +102,9 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster",
configuration.getCacheClusterConfiguration(), redisClusterClientResources); configuration.getCacheClusterConfiguration(), redisClusterClientResources);
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
environment.lifecycle().executorService("messageDelivery-%d").maxThreads(4)
.build());
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle() ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle()
.executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(4).build(); .executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(4).build();
ExecutorService messageDeletionExecutor = environment.lifecycle() ExecutorService messageDeletionExecutor = environment.lifecycle()
@ -113,7 +118,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
configuration.getSecureBackupServiceConfiguration()); configuration.getSecureBackupServiceConfiguration());
ExternalServiceCredentialsGenerator storageCredentialsGenerator = SecureStorageController.credentialsGenerator( ExternalServiceCredentialsGenerator storageCredentialsGenerator = SecureStorageController.credentialsGenerator(
configuration.getSecureStorageServiceConfiguration()); configuration.getSecureStorageServiceConfiguration());
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>( DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class); configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class);
@ -190,7 +195,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster,
Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor); Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
Clock.systemUTC(), keyspaceNotificationDispatchExecutor, messageDeletionExecutor); Clock.systemUTC(), keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor);
DirectoryQueue directoryQueue = new DirectoryQueue( DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration()); configuration.getDirectoryConfiguration().getSqsConfiguration());
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);

View File

@ -49,6 +49,8 @@ import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.storage.StoredVerificationCodeManager; import org.whispersystems.textsecuregcm.storage.StoredVerificationCodeManager;
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore; import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@ -78,6 +80,9 @@ record CommandDependencies(
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster",
configuration.getCacheClusterConfiguration(), redisClusterClientResources); configuration.getCacheClusterConfiguration(), redisClusterClientResources);
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
environment.lifecycle().executorService("messageDelivery").maxThreads(4)
.build());
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle() ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle()
.executorService(name(name, "keyspaceNotification-%d")).maxThreads(4).build(); .executorService(name(name, "keyspaceNotification-%d")).maxThreads(4).build();
ExecutorService messageDeletionExecutor = environment.lifecycle() ExecutorService messageDeletionExecutor = environment.lifecycle()
@ -167,7 +172,7 @@ record CommandDependencies(
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster,
Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor); Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
Clock.systemUTC(), keyspaceNotificationDispatchExecutor, messageDeletionExecutor); Clock.systemUTC(), keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor);
DirectoryQueue directoryQueue = new DirectoryQueue( DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration()); configuration.getDirectoryConfiguration().getSqsConfiguration());
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);

View File

@ -60,6 +60,7 @@ import javax.ws.rs.core.Response;
import org.glassfish.jersey.server.ServerProperties; import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -104,6 +105,8 @@ import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.websocket.Stories; import org.whispersystems.websocket.Stories;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ExtendWith(DropwizardExtensionsSupport.class) @ExtendWith(DropwizardExtensionsSupport.class)
class MessageControllerTest { class MessageControllerTest {
@ -142,6 +145,7 @@ class MessageControllerTest {
private static final PushNotificationManager pushNotificationManager = mock(PushNotificationManager.class); private static final PushNotificationManager pushNotificationManager = mock(PushNotificationManager.class);
private static final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class); private static final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class);
private static final ExecutorService multiRecipientMessageExecutor = mock(ExecutorService.class); private static final ExecutorService multiRecipientMessageExecutor = mock(ExecutorService.class);
private static final Scheduler messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
private static final ResourceExtension resources = ResourceExtension.builder() private static final ResourceExtension resources = ResourceExtension.builder()
.addProperty(ServerProperties.UNWRAP_COMPLETION_STAGE_IN_WRITER_ENABLE, Boolean.TRUE) .addProperty(ServerProperties.UNWRAP_COMPLETION_STAGE_IN_WRITER_ENABLE, Boolean.TRUE)
@ -154,7 +158,7 @@ class MessageControllerTest {
.addResource( .addResource(
new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, deletedAccountsManager, new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, deletedAccountsManager,
messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor, messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor,
ReportSpamTokenProvider.noop())) messageDeliveryScheduler, ReportSpamTokenProvider.noop()))
.build(); .build();
@BeforeEach @BeforeEach
@ -213,6 +217,11 @@ class MessageControllerTest {
); );
} }
@AfterAll
static void teardownAll() {
messageDeliveryScheduler.dispose();
}
@Test @Test
void testSendFromDisabledAccount() throws Exception { void testSendFromDisabledAccount() throws Exception {
Response response = Response response =

View File

@ -35,6 +35,8 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfigurati
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension; import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
@ -47,6 +49,7 @@ class MessagePersisterIntegrationTest {
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private ExecutorService notificationExecutorService; private ExecutorService notificationExecutorService;
private Scheduler messageDeliveryScheduler;
private ExecutorService messageDeletionExecutorService; private ExecutorService messageDeletionExecutorService;
private MessagesCache messagesCache; private MessagesCache messagesCache;
private MessagesManager messagesManager; private MessagesManager messagesManager;
@ -67,6 +70,7 @@ class MessagePersisterIntegrationTest {
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messageDeletionExecutorService = Executors.newSingleThreadExecutor(); messageDeletionExecutorService = Executors.newSingleThreadExecutor();
final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getDynamoDbAsyncClient(), MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(14), dynamoDbExtension.getDynamoDbAsyncClient(), MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(14),
@ -76,7 +80,7 @@ class MessagePersisterIntegrationTest {
notificationExecutorService = Executors.newSingleThreadExecutor(); notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), notificationExecutorService, REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), notificationExecutorService,
messageDeletionExecutorService); messageDeliveryScheduler, messageDeletionExecutorService);
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
messageDeletionExecutorService); messageDeletionExecutorService);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
@ -102,6 +106,8 @@ class MessagePersisterIntegrationTest {
messageDeletionExecutorService.shutdown(); messageDeletionExecutorService.shutdown();
messageDeletionExecutorService.awaitTermination(15, TimeUnit.SECONDS); messageDeletionExecutorService.awaitTermination(15, TimeUnit.SECONDS);
messageDeliveryScheduler.dispose();
} }
@Test @Test

View File

@ -41,6 +41,8 @@ import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
class MessagePersisterTest { class MessagePersisterTest {
@ -48,6 +50,7 @@ class MessagePersisterTest {
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private ExecutorService sharedExecutorService; private ExecutorService sharedExecutorService;
private Scheduler messageDeliveryScheduler;
private MessagesCache messagesCache; private MessagesCache messagesCache;
private MessagesDynamoDb messagesDynamoDb; private MessagesDynamoDb messagesDynamoDb;
private MessagePersister messagePersister; private MessagePersister messagePersister;
@ -76,8 +79,10 @@ class MessagePersisterTest {
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
sharedExecutorService = Executors.newSingleThreadExecutor(); sharedExecutorService = Executors.newSingleThreadExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, sharedExecutorService); REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, messageDeliveryScheduler,
sharedExecutorService);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY); dynamicConfigurationManager, PERSIST_DELAY);
@ -100,6 +105,8 @@ class MessagePersisterTest {
void tearDown() throws Exception { void tearDown() throws Exception {
sharedExecutorService.shutdown(); sharedExecutorService.shutdown();
sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS); sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS);
messageDeliveryScheduler.dispose();
} }
@Test @Test

View File

@ -64,6 +64,8 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
class MessagesCacheTest { class MessagesCacheTest {
@ -78,6 +80,7 @@ class MessagesCacheTest {
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private ExecutorService sharedExecutorService; private ExecutorService sharedExecutorService;
private Scheduler messageDeliveryScheduler;
private MessagesCache messagesCache; private MessagesCache messagesCache;
private static final UUID DESTINATION_UUID = UUID.randomUUID(); private static final UUID DESTINATION_UUID = UUID.randomUUID();
@ -92,9 +95,10 @@ class MessagesCacheTest {
}); });
sharedExecutorService = Executors.newSingleThreadExecutor(); sharedExecutorService = Executors.newSingleThreadExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService,
sharedExecutorService); messageDeliveryScheduler, sharedExecutorService);
messagesCache.start(); messagesCache.start();
} }
@ -105,6 +109,8 @@ class MessagesCacheTest {
sharedExecutorService.shutdown(); sharedExecutorService.shutdown();
sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS); sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS);
messageDeliveryScheduler.dispose();
} }
@ParameterizedTest @ParameterizedTest
@ -266,6 +272,7 @@ class MessagesCacheTest {
REDIS_CLUSTER_EXTENSION.getRedisCluster(), REDIS_CLUSTER_EXTENSION.getRedisCluster(),
cacheClock, cacheClock,
sharedExecutorService, sharedExecutorService,
messageDeliveryScheduler,
sharedExecutorService); sharedExecutorService);
final List<MessageProtos.Envelope> actualMessages = Flux.from( final List<MessageProtos.Envelope> actualMessages = Flux.from(
@ -547,6 +554,7 @@ class MessagesCacheTest {
private MessagesCache messagesCache; private MessagesCache messagesCache;
private RedisAdvancedClusterReactiveCommands<byte[], byte[]> reactiveCommands; private RedisAdvancedClusterReactiveCommands<byte[], byte[]> reactiveCommands;
private RedisAdvancedClusterAsyncCommands<byte[], byte[]> asyncCommands; private RedisAdvancedClusterAsyncCommands<byte[], byte[]> asyncCommands;
private Scheduler messageDeliveryScheduler;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@BeforeEach @BeforeEach
@ -559,13 +567,16 @@ class MessagesCacheTest {
.binaryAsyncCommands(asyncCommands) .binaryAsyncCommands(asyncCommands)
.build(); .build();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(mockCluster, mockCluster, Clock.systemUTC(), mock(ExecutorService.class), messagesCache = new MessagesCache(mockCluster, mockCluster, Clock.systemUTC(), mock(ExecutorService.class),
Executors.newSingleThreadExecutor()); messageDeliveryScheduler, Executors.newSingleThreadExecutor());
} }
@AfterEach @AfterEach
void teardown() { void teardown() {
StepVerifier.resetDefaultTimeout(); StepVerifier.resetDefaultTimeout();
messageDeliveryScheduler.dispose();
} }
@Test @Test

View File

@ -59,6 +59,7 @@ import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage; import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
class WebSocketConnectionIntegrationTest { class WebSocketConnectionIntegrationTest {
@ -77,6 +78,7 @@ class WebSocketConnectionIntegrationTest {
private Device device; private Device device;
private WebSocketClient webSocketClient; private WebSocketClient webSocketClient;
private ScheduledExecutorService retrySchedulingExecutor; private ScheduledExecutorService retrySchedulingExecutor;
private Scheduler messageDeliveryScheduler;
private long serialTimestamp = System.currentTimeMillis(); private long serialTimestamp = System.currentTimeMillis();
@ -84,8 +86,10 @@ class WebSocketConnectionIntegrationTest {
void setUp() throws Exception { void setUp() throws Exception {
sharedExecutorService = Executors.newSingleThreadExecutor(); sharedExecutorService = Executors.newSingleThreadExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, sharedExecutorService); REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), sharedExecutorService, messageDeliveryScheduler,
sharedExecutorService);
messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(), messagesDynamoDb = new MessagesDynamoDb(dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getDynamoDbAsyncClient(), MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(7), dynamoDbExtension.getDynamoDbAsyncClient(), MessagesDynamoDbExtension.TABLE_NAME, Duration.ofDays(7),
sharedExecutorService); sharedExecutorService);
@ -122,7 +126,8 @@ class WebSocketConnectionIntegrationTest {
new AuthenticatedAccount(() -> new Pair<>(account, device)), new AuthenticatedAccount(() -> new Pair<>(account, device)),
device, device,
webSocketClient, webSocketClient,
retrySchedulingExecutor); retrySchedulingExecutor,
messageDeliveryScheduler);
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
@ -205,7 +210,8 @@ class WebSocketConnectionIntegrationTest {
new AuthenticatedAccount(() -> new Pair<>(account, device)), new AuthenticatedAccount(() -> new Pair<>(account, device)),
device, device,
webSocketClient, webSocketClient,
retrySchedulingExecutor); retrySchedulingExecutor,
messageDeliveryScheduler);
final int persistedMessageCount = 207; final int persistedMessageCount = 207;
final int cachedMessageCount = 173; final int cachedMessageCount = 173;
@ -271,7 +277,7 @@ class WebSocketConnectionIntegrationTest {
webSocketClient, webSocketClient,
100, // use a very short timeout, so that this test completes quickly 100, // use a very short timeout, so that this test completes quickly
retrySchedulingExecutor, retrySchedulingExecutor,
Schedulers.boundedElastic()); messageDeliveryScheduler);
final int persistedMessageCount = 207; final int persistedMessageCount = 207;
final int cachedMessageCount = 173; final int cachedMessageCount = 173;

View File

@ -68,6 +68,7 @@ import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import org.whispersystems.websocket.session.WebSocketSessionContext; import org.whispersystems.websocket.session.WebSocketSessionContext;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -90,6 +91,7 @@ class WebSocketConnectionTest {
private MessagesManager messagesManager; private MessagesManager messagesManager;
private ReceiptSender receiptSender; private ReceiptSender receiptSender;
private ScheduledExecutorService retrySchedulingExecutor; private ScheduledExecutorService retrySchedulingExecutor;
private Scheduler messageDeliveryScheduler;
@BeforeEach @BeforeEach
void setup() { void setup() {
@ -102,11 +104,13 @@ class WebSocketConnectionTest {
messagesManager = mock(MessagesManager.class); messagesManager = mock(MessagesManager.class);
receiptSender = mock(ReceiptSender.class); receiptSender = mock(ReceiptSender.class);
retrySchedulingExecutor = mock(ScheduledExecutorService.class); retrySchedulingExecutor = mock(ScheduledExecutorService.class);
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
} }
@AfterEach @AfterEach
void teardown() { void teardown() {
StepVerifier.resetDefaultTimeout(); StepVerifier.resetDefaultTimeout();
messageDeliveryScheduler.dispose();
} }
@Test @Test
@ -114,7 +118,7 @@ class WebSocketConnectionTest {
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager, AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager,
mock(PushNotificationManager.class), mock(ClientPresenceManager.class), mock(PushNotificationManager.class), mock(ClientPresenceManager.class),
retrySchedulingExecutor); retrySchedulingExecutor, messageDeliveryScheduler);
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
@ -773,7 +777,7 @@ class WebSocketConnectionTest {
CompletableFuture.completedFuture(Optional.empty())); CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor); retrySchedulingExecutor, messageDeliveryScheduler);
connection.start(); connection.start();