Always use reactived message processing in `WebSocketConnection`

This commit is contained in:
Chris Eager 2022-11-10 18:31:21 -06:00 committed by Chris Eager
parent 12300761ab
commit 77d691df59
5 changed files with 105 additions and 327 deletions

View File

@ -643,7 +643,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator)); webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator));
webSocketEnvironment.setConnectListener( webSocketEnvironment.setConnectListener(
new AuthenticatedConnectListener(receiptSender, messagesManager, pushNotificationManager, new AuthenticatedConnectListener(receiptSender, messagesManager, pushNotificationManager,
clientPresenceManager, websocketScheduledExecutor, experimentEnrollmentManager)); clientPresenceManager, websocketScheduledExecutor));
webSocketEnvironment.jersey() webSocketEnvironment.jersey()
.register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager)); .register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager));
webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET)); webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET));

View File

@ -11,18 +11,13 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
@ -50,8 +45,6 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private static final long RENEW_PRESENCE_INTERVAL_MINUTES = 5; private static final long RENEW_PRESENCE_INTERVAL_MINUTES = 5;
private static final String REACTIVE_MESSAGE_QUEUE_EXPERIMENT_NAME = "reactive_message_queue_v1";
private static final Logger log = LoggerFactory.getLogger(AuthenticatedConnectListener.class); private static final Logger log = LoggerFactory.getLogger(AuthenticatedConnectListener.class);
private final ReceiptSender receiptSender; private final ReceiptSender receiptSender;
@ -59,26 +52,17 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final PushNotificationManager pushNotificationManager; private final PushNotificationManager pushNotificationManager;
private final ClientPresenceManager clientPresenceManager; private final ClientPresenceManager clientPresenceManager;
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final AtomicInteger openReactiveWebSockets = new AtomicInteger(0);
private final AtomicInteger openStandardWebSockets = new AtomicInteger(0);
public AuthenticatedConnectListener(ReceiptSender receiptSender, public AuthenticatedConnectListener(ReceiptSender receiptSender,
MessagesManager messagesManager, MessagesManager messagesManager,
PushNotificationManager pushNotificationManager, PushNotificationManager pushNotificationManager,
ClientPresenceManager clientPresenceManager, ClientPresenceManager clientPresenceManager,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService) {
ExperimentEnrollmentManager experimentEnrollmentManager) {
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.pushNotificationManager = pushNotificationManager; this.pushNotificationManager = pushNotificationManager;
this.clientPresenceManager = clientPresenceManager; this.clientPresenceManager = clientPresenceManager;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.experimentEnrollmentManager = experimentEnrollmentManager;
Metrics.gauge(OPEN_WEBSOCKET_COUNTER_NAME, Tags.of("reactive", String.valueOf(true)), openReactiveWebSockets);
Metrics.gauge(OPEN_WEBSOCKET_COUNTER_NAME, Tags.of("reactive", String.valueOf(false)), openStandardWebSockets);
} }
@Override @Override
@ -87,21 +71,12 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
final AuthenticatedAccount auth = context.getAuthenticated(AuthenticatedAccount.class); final AuthenticatedAccount auth = context.getAuthenticated(AuthenticatedAccount.class);
final Device device = auth.getAuthenticatedDevice(); final Device device = auth.getAuthenticatedDevice();
final Timer.Context timer = durationTimer.time(); final Timer.Context timer = durationTimer.time();
final boolean enrolledInReactiveMessageQueue = experimentEnrollmentManager.isEnrolled(
auth.getAccount().getUuid(),
REACTIVE_MESSAGE_QUEUE_EXPERIMENT_NAME);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, final WebSocketConnection connection = new WebSocketConnection(receiptSender,
messagesManager, auth, device, messagesManager, auth, device,
context.getClient(), context.getClient(),
scheduledExecutorService, scheduledExecutorService);
enrolledInReactiveMessageQueue);
openWebsocketCounter.inc(); openWebsocketCounter.inc();
if (enrolledInReactiveMessageQueue) {
openReactiveWebSockets.incrementAndGet();
} else {
openStandardWebSockets.incrementAndGet();
}
pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), device, context.getClient().getUserAgent()); pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), device, context.getClient().getUserAgent());
@ -109,11 +84,6 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
context.addListener((closingContext, statusCode, reason) -> { context.addListener((closingContext, statusCode, reason) -> {
openWebsocketCounter.dec(); openWebsocketCounter.dec();
if (enrolledInReactiveMessageQueue) {
openReactiveWebSockets.decrementAndGet();
} else {
openStandardWebSockets.decrementAndGet();
}
timer.stop(); timer.stop();

View File

@ -48,7 +48,6 @@ import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener; import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener;
import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.TimestampHeaderUtil; import org.whispersystems.textsecuregcm.util.TimestampHeaderUtil;
import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage; import org.whispersystems.websocket.messages.WebSocketResponseMessage;
@ -90,7 +89,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_CODE_TAG = "status";
private static final String STATUS_MESSAGE_TAG = "message"; private static final String STATUS_MESSAGE_TAG = "message";
private static final String ERROR_TYPE_TAG = "errorType"; private static final String ERROR_TYPE_TAG = "errorType";
private static final String REACTIVE_TAG = "reactive";
private static final long SLOW_DRAIN_THRESHOLD = 10_000; private static final long SLOW_DRAIN_THRESHOLD = 10_000;
@ -128,7 +126,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final AtomicReference<Disposable> messageSubscription = new AtomicReference<>(); private final AtomicReference<Disposable> messageSubscription = new AtomicReference<>();
private final Random random = new Random(); private final Random random = new Random();
private final boolean useReactive;
private final Scheduler reactiveScheduler; private final Scheduler reactiveScheduler;
private enum StoredMessageState { private enum StoredMessageState {
@ -142,8 +139,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
AuthenticatedAccount auth, AuthenticatedAccount auth,
Device device, Device device,
WebSocketClient client, WebSocketClient client,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService) {
boolean useReactive) {
this(receiptSender, this(receiptSender,
messagesManager, messagesManager,
@ -151,7 +147,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
device, device,
client, client,
scheduledExecutorService, scheduledExecutorService,
useReactive,
Schedulers.boundedElastic()); Schedulers.boundedElastic());
} }
@ -162,7 +157,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
Device device, Device device,
WebSocketClient client, WebSocketClient client,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
boolean useReactive,
Scheduler reactiveScheduler) { Scheduler reactiveScheduler) {
this(receiptSender, this(receiptSender,
@ -172,7 +166,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
client, client,
DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS,
scheduledExecutorService, scheduledExecutorService,
useReactive,
reactiveScheduler); reactiveScheduler);
} }
@ -184,7 +177,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
WebSocketClient client, WebSocketClient client,
int sendFuturesTimeoutMillis, int sendFuturesTimeoutMillis,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
boolean useReactive,
Scheduler reactiveScheduler) { Scheduler reactiveScheduler) {
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
@ -194,7 +186,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
this.client = client; this.client = client;
this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.useReactive = useReactive;
this.reactiveScheduler = reactiveScheduler; this.reactiveScheduler = reactiveScheduler;
} }
@ -249,8 +240,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final List<Tag> tags = new ArrayList<>( final List<Tag> tags = new ArrayList<>(
List.of( List.of(
Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())), Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())),
UserAgentTagUtil.getPlatformTag(client.getUserAgent()), UserAgentTagUtil.getPlatformTag(client.getUserAgent())
Tag.of(REACTIVE_TAG, String.valueOf(useReactive))
)); ));
// TODO Remove this once we've identified the cause of message rejections from desktop clients // TODO Remove this once we've identified the cause of message rejections from desktop clients
@ -297,21 +287,11 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
@VisibleForTesting @VisibleForTesting
void processStoredMessages() { void processStoredMessages() {
if (useReactive) {
processStoredMessages_reactive();
} else {
processStoredMessage_paged();
}
}
private void processStoredMessage_paged() {
assert !useReactive;
if (processStoredMessagesSemaphore.tryAcquire()) { if (processStoredMessagesSemaphore.tryAcquire()) {
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY); final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
final CompletableFuture<Void> queueCleared = new CompletableFuture<>(); final CompletableFuture<Void> queueCleared = new CompletableFuture<>();
sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueCleared); sendMessages(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueCleared);
setQueueClearedHandler(state, queueCleared); setQueueClearedHandler(state, queueCleared);
} }
@ -325,8 +305,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
final List<Tag> tags = List.of( final List<Tag> tags = List.of(
UserAgentTagUtil.getPlatformTag(client.getUserAgent()), UserAgentTagUtil.getPlatformTag(client.getUserAgent())
Tag.of(REACTIVE_TAG, String.valueOf(useReactive))
); );
final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get(); final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
@ -373,54 +352,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
}); });
} }
private void processStoredMessages_reactive() { private void sendMessages(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueCleared) {
assert useReactive;
if (processStoredMessagesSemaphore.tryAcquire()) {
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
final CompletableFuture<Void> queueCleared = new CompletableFuture<>();
sendMessagesReactive(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueCleared);
setQueueClearedHandler(state, queueCleared);
}
}
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueCleared) {
try {
final Pair<List<Envelope>, Boolean> messagesAndHasMore = messagesManager.getMessagesForDevice(
auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly);
final List<Envelope> messages = messagesAndHasMore.first();
final boolean hasMore = messagesAndHasMore.second();
final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.size()];
for (int i = 0; i < messages.size(); i++) {
final Envelope envelope = messages.get(i);
sendFutures[i] = sendMessage(envelope);
}
// Set a large, non-zero timeout, to prevent any failure to acknowledge receipt from blocking indefinitely
CompletableFuture.allOf(sendFutures)
.orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)
.whenComplete((v, cause) -> {
if (cause == null) {
if (hasMore) {
sendNextMessagePage(cachedMessagesOnly, queueCleared);
} else {
queueCleared.complete(null);
}
} else {
queueCleared.completeExceptionally(cause);
}
});
} catch (final Exception e) {
queueCleared.completeExceptionally(e);
}
}
private void sendMessagesReactive(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueCleared) {
final Publisher<Envelope> messages = final Publisher<Envelope> messages =
messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly); messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly);
@ -503,8 +435,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
public void handleDisplacement(final boolean connectedElsewhere) { public void handleDisplacement(final boolean connectedElsewhere) {
final Tags tags = Tags.of( final Tags tags = Tags.of(
UserAgentTagUtil.getPlatformTag(client.getUserAgent()), UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
Tag.of("connectedElsewhere", String.valueOf(connectedElsewhere)), Tag.of("connectedElsewhere", String.valueOf(connectedElsewhere))
Tag.of(REACTIVE_TAG, String.valueOf(useReactive))); );
Metrics.counter(DISPLACEMENT_COUNTER_NAME, tags).increment(); Metrics.counter(DISPLACEMENT_COUNTER_NAME, tags).increment();

View File

@ -37,10 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
@ -111,23 +111,18 @@ class WebSocketConnectionIntegrationTest {
@ParameterizedTest @ParameterizedTest
@CsvSource({ @CsvSource({
"207, 173, true", "207, 173",
"207, 173, false", "323, 0",
"323, 0, true", "0, 221",
"323, 0, false",
"0, 221, true",
"0, 221, false",
}) })
void testProcessStoredMessages(final int persistedMessageCount, final int cachedMessageCount, void testProcessStoredMessages(final int persistedMessageCount, final int cachedMessageCount) {
final boolean useReactive) {
final WebSocketConnection webSocketConnection = new WebSocketConnection( final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class), mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new AuthenticatedAccount(() -> new Pair<>(account, device)), new AuthenticatedAccount(() -> new Pair<>(account, device)),
device, device,
webSocketClient, webSocketClient,
retrySchedulingExecutor, retrySchedulingExecutor);
useReactive);
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
@ -202,17 +197,15 @@ class WebSocketConnectionIntegrationTest {
}); });
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testProcessStoredMessagesClientClosed() {
void testProcessStoredMessagesClientClosed(final boolean useReactive) {
final WebSocketConnection webSocketConnection = new WebSocketConnection( final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class), mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new AuthenticatedAccount(() -> new Pair<>(account, device)), new AuthenticatedAccount(() -> new Pair<>(account, device)),
device, device,
webSocketClient, webSocketClient,
retrySchedulingExecutor, retrySchedulingExecutor);
useReactive);
final int persistedMessageCount = 207; final int persistedMessageCount = 207;
final int cachedMessageCount = 173; final int cachedMessageCount = 173;
@ -268,9 +261,8 @@ class WebSocketConnectionIntegrationTest {
}); });
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testProcessStoredMessagesSendFutureTimeout() {
void testProcessStoredMessagesSendFutureTimeout(final boolean useReactive) {
final WebSocketConnection webSocketConnection = new WebSocketConnection( final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class), mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
@ -279,7 +271,6 @@ class WebSocketConnectionIntegrationTest {
webSocketClient, webSocketClient,
100, // use a very short timeout, so that this test completes quickly 100, // use a very short timeout, so that this test completes quickly
retrySchedulingExecutor, retrySchedulingExecutor,
useReactive,
Schedulers.boundedElastic()); Schedulers.boundedElastic());
final int persistedMessageCount = 207; final int persistedMessageCount = 207;

View File

@ -31,7 +31,6 @@ import io.lettuce.core.RedisException;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -51,12 +50,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.ReceiptSender;
@ -116,7 +112,7 @@ class WebSocketConnectionTest {
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, storedMessages, AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, storedMessages,
mock(PushNotificationManager.class), mock(ClientPresenceManager.class), mock(PushNotificationManager.class), mock(ClientPresenceManager.class),
retrySchedulingExecutor, mock(ExperimentEnrollmentManager.class)); retrySchedulingExecutor);
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
@ -146,9 +142,8 @@ class WebSocketConnectionTest {
assertTrue(account.isRequired()); assertTrue(account.isRequired());
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testOpen() {
void testOpen(final boolean useReactive) throws Exception {
MessagesManager storedMessages = mock(MessagesManager.class); MessagesManager storedMessages = mock(MessagesManager.class);
UUID accountUuid = UUID.randomUUID(); UUID accountUuid = UUID.randomUUID();
@ -177,13 +172,8 @@ class WebSocketConnectionTest {
String userAgent = "user-agent"; String userAgent = "user-agent";
if (useReactive) { when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false))
when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) .thenReturn(Flux.fromIterable(outgoingMessages));
.thenReturn(Flux.fromIterable(outgoingMessages));
} else {
when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false))
.thenReturn(new Pair<>(outgoingMessages, false));
}
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>(); final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
@ -197,7 +187,7 @@ class WebSocketConnectionTest {
}); });
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages,
auth, device, client, retrySchedulingExecutor, useReactive, Schedulers.immediate()); auth, device, client, retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();
verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class),
@ -221,13 +211,12 @@ class WebSocketConnectionTest {
verify(client).close(anyInt(), anyString()); verify(client).close(anyInt(), anyString());
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) public void testOnlineSend() {
public void testOnlineSend(final boolean useReactive) {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -236,21 +225,11 @@ class WebSocketConnectionTest {
when(device.getId()).thenReturn(1L); when(device.getId()).thenReturn(1L);
when(client.isOpen()).thenReturn(true); when(client.isOpen()).thenReturn(true);
if (useReactive) { when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean()))
when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(Flux.empty())
.thenReturn(Flux.empty()) .thenReturn(Flux.just(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first")))
.thenReturn(Flux.just(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first"))) .thenReturn(Flux.just(createMessage(UUID.randomUUID(), UUID.randomUUID(), 2222, "second")))
.thenReturn(Flux.just(createMessage(UUID.randomUUID(), UUID.randomUUID(), 2222, "second"))) .thenReturn(Flux.empty());
.thenReturn(Flux.empty());
} else {
when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean()))
.thenReturn(new Pair<>(Collections.emptyList(), false))
.thenReturn(new Pair<>(List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first")),
false))
.thenReturn(new Pair<>(List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 2222, "second")),
false))
.thenReturn(new Pair<>(Collections.emptyList(), false));
}
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
when(successResponse.getStatus()).thenReturn(200); when(successResponse.getStatus()).thenReturn(200);
@ -295,9 +274,8 @@ class WebSocketConnectionTest {
verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class)); verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class));
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testPendingSend() {
void testPendingSend(final boolean useReactive) throws Exception {
MessagesManager storedMessages = mock(MessagesManager.class); MessagesManager storedMessages = mock(MessagesManager.class);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -342,13 +320,8 @@ class WebSocketConnectionTest {
String userAgent = "user-agent"; String userAgent = "user-agent";
if (useReactive) { when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false))
when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) .thenReturn(Flux.fromIterable(pendingMessages));
.thenReturn(Flux.fromIterable(pendingMessages));
} else {
when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false))
.thenReturn(new Pair<>(pendingMessages, false));
}
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>(); final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
@ -362,7 +335,7 @@ class WebSocketConnectionTest {
}); });
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages,
auth, device, client, retrySchedulingExecutor, useReactive, Schedulers.immediate()); auth, device, client, retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();
@ -382,13 +355,12 @@ class WebSocketConnectionTest {
verify(client).close(anyInt(), anyString()); verify(client).close(anyInt(), anyString());
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testProcessStoredMessageConcurrency() {
void testProcessStoredMessageConcurrency(final boolean useReactive) {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID()); when(account.getUuid()).thenReturn(UUID.randomUUID());
@ -398,41 +370,22 @@ class WebSocketConnectionTest {
final AtomicBoolean threadWaiting = new AtomicBoolean(false); final AtomicBoolean threadWaiting = new AtomicBoolean(false);
final AtomicBoolean returnMessageList = new AtomicBoolean(false); final AtomicBoolean returnMessageList = new AtomicBoolean(false);
if (useReactive) { when(
when( messagesManager.getMessagesForDeviceReactive(account.getUuid(), 1L, false))
messagesManager.getMessagesForDeviceReactive(account.getUuid(), 1L, false)) .thenAnswer(invocation -> {
.thenAnswer(invocation -> { synchronized (threadWaiting) {
synchronized (threadWaiting) { threadWaiting.set(true);
threadWaiting.set(true); threadWaiting.notifyAll();
threadWaiting.notifyAll(); }
}
synchronized (returnMessageList) { synchronized (returnMessageList) {
while (!returnMessageList.get()) { while (!returnMessageList.get()) {
returnMessageList.wait(); returnMessageList.wait();
}
} }
}
return Flux.empty(); return Flux.empty();
}); });
} else {
when(
messagesManager.getMessagesForDevice(account.getUuid(), 1L, false))
.thenAnswer(invocation -> {
synchronized (threadWaiting) {
threadWaiting.set(true);
threadWaiting.notifyAll();
}
synchronized (returnMessageList) {
while (!returnMessageList.get()) {
returnMessageList.wait();
}
}
return new Pair<>(Collections.emptyList(), false);
});
}
final Thread[] threads = new Thread[10]; final Thread[] threads = new Thread[10];
final CountDownLatch unblockedThreadsLatch = new CountDownLatch(threads.length - 1); final CountDownLatch unblockedThreadsLatch = new CountDownLatch(threads.length - 1);
@ -465,20 +418,15 @@ class WebSocketConnectionTest {
} }
}); });
if (useReactive) { verify(messagesManager).getMessagesForDeviceReactive(any(UUID.class), anyLong(), eq(false));
verify(messagesManager).getMessagesForDeviceReactive(any(UUID.class), anyLong(), eq(false));
} else {
verify(messagesManager).getMessagesForDevice(any(UUID.class), anyLong(), eq(false));
}
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testProcessStoredMessagesMultiplePages() {
void testProcessStoredMessagesMultiplePages(final boolean useReactive) {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -493,14 +441,8 @@ class WebSocketConnectionTest {
final List<Envelope> secondPageMessages = final List<Envelope> secondPageMessages =
List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 3333, "third")); List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 3333, "third"));
if (useReactive) { when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), eq(false)))
when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), eq(false))) .thenReturn(Flux.fromStream(Stream.concat(firstPageMessages.stream(), secondPageMessages.stream())));
.thenReturn(Flux.fromStream(Stream.concat(firstPageMessages.stream(), secondPageMessages.stream())));
} else {
when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), eq(false)))
.thenReturn(new Pair<>(firstPageMessages, true))
.thenReturn(new Pair<>(secondPageMessages, false));
}
when(messagesManager.delete(eq(accountUuid), eq(1L), any(), any())) when(messagesManager.delete(eq(accountUuid), eq(1L), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null)); .thenReturn(CompletableFuture.completedFuture(null));
@ -532,13 +474,12 @@ class WebSocketConnectionTest {
verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testProcessStoredMessagesContainsSenderUuid() {
void testProcessStoredMessagesContainsSenderUuid(final boolean useReactive) {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -550,15 +491,9 @@ class WebSocketConnectionTest {
final List<Envelope> messages = List.of( final List<Envelope> messages = List.of(
createMessage(senderUuid, UUID.randomUUID(), 1111L, "message the first")); createMessage(senderUuid, UUID.randomUUID(), 1111L, "message the first"));
if (useReactive) { when(messagesManager.getMessagesForDeviceReactive(account.getUuid(), 1L, false))
when(messagesManager.getMessagesForDeviceReactive(account.getUuid(), 1L, false)) .thenReturn(Flux.fromIterable(messages))
.thenReturn(Flux.fromIterable(messages)) .thenReturn(Flux.empty());
.thenReturn(Flux.empty());
} else {
when(messagesManager.getMessagesForDevice(account.getUuid(), 1L, false))
.thenReturn(new Pair<>(messages, false))
.thenReturn(new Pair<>(Collections.emptyList(), false));
}
when(messagesManager.delete(eq(accountUuid), eq(1L), any(UUID.class), any())) when(messagesManager.delete(eq(accountUuid), eq(1L), any(UUID.class), any()))
.thenReturn(CompletableFuture.completedFuture(null)); .thenReturn(CompletableFuture.completedFuture(null));
@ -602,13 +537,12 @@ class WebSocketConnectionTest {
verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testProcessStoredMessagesSingleEmptyCall() {
void testProcessStoredMessagesSingleEmptyCall(final boolean useReactive) {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -617,13 +551,8 @@ class WebSocketConnectionTest {
when(device.getId()).thenReturn(1L); when(device.getId()).thenReturn(1L);
when(client.isOpen()).thenReturn(true); when(client.isOpen()).thenReturn(true);
if (useReactive) { when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean()))
when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(Flux.empty());
.thenReturn(Flux.empty());
} else {
when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean()))
.thenReturn(new Pair<>(Collections.emptyList(), false));
}
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
when(successResponse.getStatus()).thenReturn(200); when(successResponse.getStatus()).thenReturn(200);
@ -638,13 +567,12 @@ class WebSocketConnectionTest {
verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) public void testRequeryOnStateMismatch() {
public void testRequeryOnStateMismatch(final boolean useReactive) {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
@ -659,17 +587,10 @@ class WebSocketConnectionTest {
final List<Envelope> secondPageMessages = final List<Envelope> secondPageMessages =
List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 3333, "third")); List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 3333, "third"));
if (useReactive) { when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean()))
when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(Flux.fromIterable(firstPageMessages))
.thenReturn(Flux.fromIterable(firstPageMessages)) .thenReturn(Flux.fromIterable(secondPageMessages))
.thenReturn(Flux.fromIterable(secondPageMessages)) .thenReturn(Flux.empty());
.thenReturn(Flux.empty());
} else {
when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean()))
.thenReturn(new Pair<>(firstPageMessages, false))
.thenReturn(new Pair<>(secondPageMessages, false))
.thenReturn(new Pair<>(Collections.emptyList(), false));
}
when(messagesManager.delete(eq(accountUuid), eq(1L), any(), any())) when(messagesManager.delete(eq(accountUuid), eq(1L), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null)); .thenReturn(CompletableFuture.completedFuture(null));
@ -703,13 +624,12 @@ class WebSocketConnectionTest {
verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testProcessCachedMessagesOnly() {
void testProcessCachedMessagesOnly(final boolean useReactive) {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -718,13 +638,8 @@ class WebSocketConnectionTest {
when(device.getId()).thenReturn(1L); when(device.getId()).thenReturn(1L);
when(client.isOpen()).thenReturn(true); when(client.isOpen()).thenReturn(true);
if (useReactive) { when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean()))
when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(Flux.empty());
.thenReturn(Flux.empty());
} else {
when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean()))
.thenReturn(new Pair<>(Collections.emptyList(), false));
}
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
when(successResponse.getStatus()).thenReturn(200); when(successResponse.getStatus()).thenReturn(200);
@ -735,28 +650,19 @@ class WebSocketConnectionTest {
// anything. // anything.
connection.processStoredMessages(); connection.processStoredMessages();
if (useReactive) { verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device.getId(), false);
verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device.getId(), false);
} else {
verify(messagesManager).getMessagesForDevice(account.getUuid(), device.getId(), false);
}
connection.handleNewMessagesAvailable(); connection.handleNewMessagesAvailable();
if (useReactive) { verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device.getId(), true);
verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device.getId(), true);
} else {
verify(messagesManager).getMessagesForDevice(account.getUuid(), device.getId(), true);
}
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testProcessDatabaseMessagesAfterPersist() {
void testProcessDatabaseMessagesAfterPersist(final boolean useReactive) {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -765,13 +671,8 @@ class WebSocketConnectionTest {
when(device.getId()).thenReturn(1L); when(device.getId()).thenReturn(1L);
when(client.isOpen()).thenReturn(true); when(client.isOpen()).thenReturn(true);
if (useReactive) { when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean()))
when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(Flux.empty());
.thenReturn(Flux.empty());
} else {
when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean()))
.thenReturn(new Pair<>(Collections.emptyList(), false));
}
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
when(successResponse.getStatus()).thenReturn(200); when(successResponse.getStatus()).thenReturn(200);
@ -783,16 +684,11 @@ class WebSocketConnectionTest {
connection.processStoredMessages(); connection.processStoredMessages();
connection.handleMessagesPersisted(); connection.handleMessagesPersisted();
if (useReactive) { verify(messagesManager, times(2)).getMessagesForDeviceReactive(account.getUuid(), device.getId(), false);
verify(messagesManager, times(2)).getMessagesForDeviceReactive(account.getUuid(), device.getId(), false);
} else {
verify(messagesManager, times(2)).getMessagesForDevice(account.getUuid(), device.getId(), false);
}
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testRetrieveMessageException() {
void testRetrieveMessageException(final boolean useReactive) {
MessagesManager storedMessages = mock(MessagesManager.class); MessagesManager storedMessages = mock(MessagesManager.class);
UUID accountUuid = UUID.randomUUID(); UUID accountUuid = UUID.randomUUID();
@ -802,13 +698,8 @@ class WebSocketConnectionTest {
when(account.getNumber()).thenReturn("+14152222222"); when(account.getNumber()).thenReturn("+14152222222");
when(account.getUuid()).thenReturn(accountUuid); when(account.getUuid()).thenReturn(accountUuid);
if (useReactive) { when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false))
when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) .thenReturn(Flux.error(new RedisException("OH NO")));
.thenReturn(Flux.error(new RedisException("OH NO")));
} else {
when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false))
.thenThrow(new RedisException("OH NO"));
}
when(retrySchedulingExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer( when(retrySchedulingExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(
(Answer<ScheduledFuture<?>>) invocation -> { (Answer<ScheduledFuture<?>>) invocation -> {
@ -820,7 +711,7 @@ class WebSocketConnectionTest {
when(client.isOpen()).thenReturn(true); when(client.isOpen()).thenReturn(true);
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();
verify(retrySchedulingExecutor, times(WebSocketConnection.MAX_CONSECUTIVE_RETRIES)).schedule(any(Runnable.class), verify(retrySchedulingExecutor, times(WebSocketConnection.MAX_CONSECUTIVE_RETRIES)).schedule(any(Runnable.class),
@ -828,9 +719,8 @@ class WebSocketConnectionTest {
verify(client).close(eq(1011), anyString()); verify(client).close(eq(1011), anyString());
} }
@ParameterizedTest @Test
@ValueSource(booleans = {true, false}) void testRetrieveMessageExceptionClientDisconnected() {
void testRetrieveMessageExceptionClientDisconnected(final boolean useReactive) {
MessagesManager storedMessages = mock(MessagesManager.class); MessagesManager storedMessages = mock(MessagesManager.class);
UUID accountUuid = UUID.randomUUID(); UUID accountUuid = UUID.randomUUID();
@ -840,19 +730,14 @@ class WebSocketConnectionTest {
when(account.getNumber()).thenReturn("+14152222222"); when(account.getNumber()).thenReturn("+14152222222");
when(account.getUuid()).thenReturn(accountUuid); when(account.getUuid()).thenReturn(accountUuid);
if (useReactive) { when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false))
when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) .thenReturn(Flux.error(new RedisException("OH NO")));
.thenReturn(Flux.error(new RedisException("OH NO")));
} else {
when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false))
.thenThrow(new RedisException("OH NO"));
}
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
when(client.isOpen()).thenReturn(false); when(client.isOpen()).thenReturn(false);
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client,
retrySchedulingExecutor, useReactive, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();
verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any()); verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any());
@ -897,7 +782,7 @@ class WebSocketConnectionTest {
CompletableFuture.completedFuture(Optional.empty())); CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client,
retrySchedulingExecutor, true); retrySchedulingExecutor);
connection.start(); connection.start();
@ -956,7 +841,7 @@ class WebSocketConnectionTest {
CompletableFuture.completedFuture(Optional.empty())); CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client,
retrySchedulingExecutor, true, Schedulers.immediate()); retrySchedulingExecutor, Schedulers.immediate());
connection.start(); connection.start();