From 77d691df59f00b1c6646a469798ab8130fd9a1a4 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Thu, 10 Nov 2022 18:31:21 -0600 Subject: [PATCH] Always use reactived message processing in `WebSocketConnection` --- .../textsecuregcm/WhisperServerService.java | 2 +- .../AuthenticatedConnectListener.java | 34 +-- .../websocket/WebSocketConnection.java | 82 +---- .../WebSocketConnectionIntegrationTest.java | 31 +- .../websocket/WebSocketConnectionTest.java | 283 ++++++------------ 5 files changed, 105 insertions(+), 327 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 080ec68a0..b6a5902d2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -643,7 +643,7 @@ public class WhisperServerService extends Application { openWebsocketCounter.dec(); - if (enrolledInReactiveMessageQueue) { - openReactiveWebSockets.decrementAndGet(); - } else { - openStandardWebSockets.decrementAndGet(); - } timer.stop(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 5309619ea..528491abb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -48,7 +48,6 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.TimestampHeaderUtil; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; @@ -90,7 +89,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_MESSAGE_TAG = "message"; private static final String ERROR_TYPE_TAG = "errorType"; - private static final String REACTIVE_TAG = "reactive"; private static final long SLOW_DRAIN_THRESHOLD = 10_000; @@ -128,7 +126,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private final AtomicReference messageSubscription = new AtomicReference<>(); private final Random random = new Random(); - private final boolean useReactive; private final Scheduler reactiveScheduler; private enum StoredMessageState { @@ -142,8 +139,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac AuthenticatedAccount auth, Device device, WebSocketClient client, - ScheduledExecutorService scheduledExecutorService, - boolean useReactive) { + ScheduledExecutorService scheduledExecutorService) { this(receiptSender, messagesManager, @@ -151,7 +147,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac device, client, scheduledExecutorService, - useReactive, Schedulers.boundedElastic()); } @@ -162,7 +157,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac Device device, WebSocketClient client, ScheduledExecutorService scheduledExecutorService, - boolean useReactive, Scheduler reactiveScheduler) { this(receiptSender, @@ -172,7 +166,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac client, DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, scheduledExecutorService, - useReactive, reactiveScheduler); } @@ -184,7 +177,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac WebSocketClient client, int sendFuturesTimeoutMillis, ScheduledExecutorService scheduledExecutorService, - boolean useReactive, Scheduler reactiveScheduler) { this.receiptSender = receiptSender; @@ -194,7 +186,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac this.client = client; this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; this.scheduledExecutorService = scheduledExecutorService; - this.useReactive = useReactive; this.reactiveScheduler = reactiveScheduler; } @@ -249,8 +240,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac final List tags = new ArrayList<>( List.of( Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())), - UserAgentTagUtil.getPlatformTag(client.getUserAgent()), - Tag.of(REACTIVE_TAG, String.valueOf(useReactive)) + UserAgentTagUtil.getPlatformTag(client.getUserAgent()) )); // TODO Remove this once we've identified the cause of message rejections from desktop clients @@ -297,21 +287,11 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac @VisibleForTesting void processStoredMessages() { - if (useReactive) { - processStoredMessages_reactive(); - } else { - processStoredMessage_paged(); - } - } - - private void processStoredMessage_paged() { - assert !useReactive; - if (processStoredMessagesSemaphore.tryAcquire()) { final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY); final CompletableFuture queueCleared = new CompletableFuture<>(); - sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueCleared); + sendMessages(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueCleared); setQueueClearedHandler(state, queueCleared); } @@ -325,8 +305,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { final List tags = List.of( - UserAgentTagUtil.getPlatformTag(client.getUserAgent()), - Tag.of(REACTIVE_TAG, String.valueOf(useReactive)) + UserAgentTagUtil.getPlatformTag(client.getUserAgent()) ); final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get(); @@ -373,54 +352,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac }); } - private void processStoredMessages_reactive() { - assert useReactive; - - if (processStoredMessagesSemaphore.tryAcquire()) { - final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY); - final CompletableFuture queueCleared = new CompletableFuture<>(); - - sendMessagesReactive(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueCleared); - - setQueueClearedHandler(state, queueCleared); - } - } - - private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture queueCleared) { - try { - final Pair, Boolean> messagesAndHasMore = messagesManager.getMessagesForDevice( - auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly); - - final List messages = messagesAndHasMore.first(); - final boolean hasMore = messagesAndHasMore.second(); - - final CompletableFuture[] sendFutures = new CompletableFuture[messages.size()]; - - for (int i = 0; i < messages.size(); i++) { - final Envelope envelope = messages.get(i); - sendFutures[i] = sendMessage(envelope); - } - - // Set a large, non-zero timeout, to prevent any failure to acknowledge receipt from blocking indefinitely - CompletableFuture.allOf(sendFutures) - .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS) - .whenComplete((v, cause) -> { - if (cause == null) { - if (hasMore) { - sendNextMessagePage(cachedMessagesOnly, queueCleared); - } else { - queueCleared.complete(null); - } - } else { - queueCleared.completeExceptionally(cause); - } - }); - } catch (final Exception e) { - queueCleared.completeExceptionally(e); - } - } - - private void sendMessagesReactive(final boolean cachedMessagesOnly, final CompletableFuture queueCleared) { + private void sendMessages(final boolean cachedMessagesOnly, final CompletableFuture queueCleared) { final Publisher messages = messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly); @@ -503,8 +435,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac public void handleDisplacement(final boolean connectedElsewhere) { final Tags tags = Tags.of( UserAgentTagUtil.getPlatformTag(client.getUserAgent()), - Tag.of("connectedElsewhere", String.valueOf(connectedElsewhere)), - Tag.of(REACTIVE_TAG, String.valueOf(useReactive))); + Tag.of("connectedElsewhere", String.valueOf(connectedElsewhere)) + ); Metrics.counter(DISPLACEMENT_COUNTER_NAME, tags).increment(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 668dcb2b1..d47213ba0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -37,10 +37,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; @@ -111,23 +111,18 @@ class WebSocketConnectionIntegrationTest { @ParameterizedTest @CsvSource({ - "207, 173, true", - "207, 173, false", - "323, 0, true", - "323, 0, false", - "0, 221, true", - "0, 221, false", + "207, 173", + "323, 0", + "0, 221", }) - void testProcessStoredMessages(final int persistedMessageCount, final int cachedMessageCount, - final boolean useReactive) { + void testProcessStoredMessages(final int persistedMessageCount, final int cachedMessageCount) { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, - retrySchedulingExecutor, - useReactive); + retrySchedulingExecutor); final List expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); @@ -202,17 +197,15 @@ class WebSocketConnectionIntegrationTest { }); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testProcessStoredMessagesClientClosed(final boolean useReactive) { + @Test + void testProcessStoredMessagesClientClosed() { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, - retrySchedulingExecutor, - useReactive); + retrySchedulingExecutor); final int persistedMessageCount = 207; final int cachedMessageCount = 173; @@ -268,9 +261,8 @@ class WebSocketConnectionIntegrationTest { }); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testProcessStoredMessagesSendFutureTimeout(final boolean useReactive) { + @Test + void testProcessStoredMessagesSendFutureTimeout() { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), @@ -279,7 +271,6 @@ class WebSocketConnectionIntegrationTest { webSocketClient, 100, // use a very short timeout, so that this test completes quickly retrySchedulingExecutor, - useReactive, Schedulers.boundedElastic()); final int persistedMessageCount = 207; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 393526e78..0d04a728f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -31,7 +31,6 @@ import io.lettuce.core.RedisException; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -51,12 +50,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; -import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -116,7 +112,7 @@ class WebSocketConnectionTest { WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, storedMessages, mock(PushNotificationManager.class), mock(ClientPresenceManager.class), - retrySchedulingExecutor, mock(ExperimentEnrollmentManager.class)); + retrySchedulingExecutor); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) @@ -146,9 +142,8 @@ class WebSocketConnectionTest { assertTrue(account.isRequired()); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testOpen(final boolean useReactive) throws Exception { + @Test + void testOpen() { MessagesManager storedMessages = mock(MessagesManager.class); UUID accountUuid = UUID.randomUUID(); @@ -177,13 +172,8 @@ class WebSocketConnectionTest { String userAgent = "user-agent"; - if (useReactive) { - when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) - .thenReturn(Flux.fromIterable(outgoingMessages)); - } else { - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) - .thenReturn(new Pair<>(outgoingMessages, false)); - } + when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) + .thenReturn(Flux.fromIterable(outgoingMessages)); final List> futures = new LinkedList<>(); final WebSocketClient client = mock(WebSocketClient.class); @@ -197,7 +187,7 @@ class WebSocketConnectionTest { }); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, - auth, device, client, retrySchedulingExecutor, useReactive, Schedulers.immediate()); + auth, device, client, retrySchedulingExecutor, Schedulers.immediate()); connection.start(); verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), @@ -221,13 +211,12 @@ class WebSocketConnectionTest { verify(client).close(anyInt(), anyString()); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testOnlineSend(final boolean useReactive) { + @Test + public void testOnlineSend() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); final UUID accountUuid = UUID.randomUUID(); @@ -236,21 +225,11 @@ class WebSocketConnectionTest { when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - if (useReactive) { - when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(Flux.empty()) - .thenReturn(Flux.just(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first"))) - .thenReturn(Flux.just(createMessage(UUID.randomUUID(), UUID.randomUUID(), 2222, "second"))) - .thenReturn(Flux.empty()); - } else { - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(new Pair<>(Collections.emptyList(), false)) - .thenReturn(new Pair<>(List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first")), - false)) - .thenReturn(new Pair<>(List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 2222, "second")), - false)) - .thenReturn(new Pair<>(Collections.emptyList(), false)); - } + when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) + .thenReturn(Flux.empty()) + .thenReturn(Flux.just(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first"))) + .thenReturn(Flux.just(createMessage(UUID.randomUUID(), UUID.randomUUID(), 2222, "second"))) + .thenReturn(Flux.empty()); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); when(successResponse.getStatus()).thenReturn(200); @@ -295,9 +274,8 @@ class WebSocketConnectionTest { verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class)); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testPendingSend(final boolean useReactive) throws Exception { + @Test + void testPendingSend() { MessagesManager storedMessages = mock(MessagesManager.class); final UUID accountUuid = UUID.randomUUID(); @@ -342,13 +320,8 @@ class WebSocketConnectionTest { String userAgent = "user-agent"; - if (useReactive) { - when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) - .thenReturn(Flux.fromIterable(pendingMessages)); - } else { - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) - .thenReturn(new Pair<>(pendingMessages, false)); - } + when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) + .thenReturn(Flux.fromIterable(pendingMessages)); final List> futures = new LinkedList<>(); final WebSocketClient client = mock(WebSocketClient.class); @@ -362,7 +335,7 @@ class WebSocketConnectionTest { }); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, - auth, device, client, retrySchedulingExecutor, useReactive, Schedulers.immediate()); + auth, device, client, retrySchedulingExecutor, Schedulers.immediate()); connection.start(); @@ -382,13 +355,12 @@ class WebSocketConnectionTest { verify(client).close(anyInt(), anyString()); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testProcessStoredMessageConcurrency(final boolean useReactive) { + @Test + void testProcessStoredMessageConcurrency() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); when(account.getNumber()).thenReturn("+18005551234"); when(account.getUuid()).thenReturn(UUID.randomUUID()); @@ -398,41 +370,22 @@ class WebSocketConnectionTest { final AtomicBoolean threadWaiting = new AtomicBoolean(false); final AtomicBoolean returnMessageList = new AtomicBoolean(false); - if (useReactive) { - when( - messagesManager.getMessagesForDeviceReactive(account.getUuid(), 1L, false)) - .thenAnswer(invocation -> { - synchronized (threadWaiting) { - threadWaiting.set(true); - threadWaiting.notifyAll(); - } + when( + messagesManager.getMessagesForDeviceReactive(account.getUuid(), 1L, false)) + .thenAnswer(invocation -> { + synchronized (threadWaiting) { + threadWaiting.set(true); + threadWaiting.notifyAll(); + } - synchronized (returnMessageList) { - while (!returnMessageList.get()) { - returnMessageList.wait(); - } + synchronized (returnMessageList) { + while (!returnMessageList.get()) { + returnMessageList.wait(); } + } - return Flux.empty(); - }); - } else { - when( - messagesManager.getMessagesForDevice(account.getUuid(), 1L, false)) - .thenAnswer(invocation -> { - synchronized (threadWaiting) { - threadWaiting.set(true); - threadWaiting.notifyAll(); - } - - synchronized (returnMessageList) { - while (!returnMessageList.get()) { - returnMessageList.wait(); - } - } - - return new Pair<>(Collections.emptyList(), false); - }); - } + return Flux.empty(); + }); final Thread[] threads = new Thread[10]; final CountDownLatch unblockedThreadsLatch = new CountDownLatch(threads.length - 1); @@ -465,20 +418,15 @@ class WebSocketConnectionTest { } }); - if (useReactive) { - verify(messagesManager).getMessagesForDeviceReactive(any(UUID.class), anyLong(), eq(false)); - } else { - verify(messagesManager).getMessagesForDevice(any(UUID.class), anyLong(), eq(false)); - } + verify(messagesManager).getMessagesForDeviceReactive(any(UUID.class), anyLong(), eq(false)); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testProcessStoredMessagesMultiplePages(final boolean useReactive) { + @Test + void testProcessStoredMessagesMultiplePages() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); when(account.getNumber()).thenReturn("+18005551234"); final UUID accountUuid = UUID.randomUUID(); @@ -493,14 +441,8 @@ class WebSocketConnectionTest { final List secondPageMessages = List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 3333, "third")); - if (useReactive) { - when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), eq(false))) - .thenReturn(Flux.fromStream(Stream.concat(firstPageMessages.stream(), secondPageMessages.stream()))); - } else { - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), eq(false))) - .thenReturn(new Pair<>(firstPageMessages, true)) - .thenReturn(new Pair<>(secondPageMessages, false)); - } + when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), eq(false))) + .thenReturn(Flux.fromStream(Stream.concat(firstPageMessages.stream(), secondPageMessages.stream()))); when(messagesManager.delete(eq(accountUuid), eq(1L), any(), any())) .thenReturn(CompletableFuture.completedFuture(null)); @@ -532,13 +474,12 @@ class WebSocketConnectionTest { verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testProcessStoredMessagesContainsSenderUuid(final boolean useReactive) { + @Test + void testProcessStoredMessagesContainsSenderUuid() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); when(account.getNumber()).thenReturn("+18005551234"); final UUID accountUuid = UUID.randomUUID(); @@ -550,15 +491,9 @@ class WebSocketConnectionTest { final List messages = List.of( createMessage(senderUuid, UUID.randomUUID(), 1111L, "message the first")); - if (useReactive) { - when(messagesManager.getMessagesForDeviceReactive(account.getUuid(), 1L, false)) - .thenReturn(Flux.fromIterable(messages)) - .thenReturn(Flux.empty()); - } else { - when(messagesManager.getMessagesForDevice(account.getUuid(), 1L, false)) - .thenReturn(new Pair<>(messages, false)) - .thenReturn(new Pair<>(Collections.emptyList(), false)); - } + when(messagesManager.getMessagesForDeviceReactive(account.getUuid(), 1L, false)) + .thenReturn(Flux.fromIterable(messages)) + .thenReturn(Flux.empty()); when(messagesManager.delete(eq(accountUuid), eq(1L), any(UUID.class), any())) .thenReturn(CompletableFuture.completedFuture(null)); @@ -602,13 +537,12 @@ class WebSocketConnectionTest { verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testProcessStoredMessagesSingleEmptyCall(final boolean useReactive) { + @Test + void testProcessStoredMessagesSingleEmptyCall() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); final UUID accountUuid = UUID.randomUUID(); @@ -617,13 +551,8 @@ class WebSocketConnectionTest { when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - if (useReactive) { - when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(Flux.empty()); - } else { - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(new Pair<>(Collections.emptyList(), false)); - } + when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) + .thenReturn(Flux.empty()); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); when(successResponse.getStatus()).thenReturn(200); @@ -638,13 +567,12 @@ class WebSocketConnectionTest { verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testRequeryOnStateMismatch(final boolean useReactive) { + @Test + public void testRequeryOnStateMismatch() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); final UUID accountUuid = UUID.randomUUID(); when(account.getNumber()).thenReturn("+18005551234"); @@ -659,17 +587,10 @@ class WebSocketConnectionTest { final List secondPageMessages = List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 3333, "third")); - if (useReactive) { - when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(Flux.fromIterable(firstPageMessages)) - .thenReturn(Flux.fromIterable(secondPageMessages)) - .thenReturn(Flux.empty()); - } else { - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(new Pair<>(firstPageMessages, false)) - .thenReturn(new Pair<>(secondPageMessages, false)) - .thenReturn(new Pair<>(Collections.emptyList(), false)); - } + when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) + .thenReturn(Flux.fromIterable(firstPageMessages)) + .thenReturn(Flux.fromIterable(secondPageMessages)) + .thenReturn(Flux.empty()); when(messagesManager.delete(eq(accountUuid), eq(1L), any(), any())) .thenReturn(CompletableFuture.completedFuture(null)); @@ -703,13 +624,12 @@ class WebSocketConnectionTest { verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testProcessCachedMessagesOnly(final boolean useReactive) { + @Test + void testProcessCachedMessagesOnly() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); final UUID accountUuid = UUID.randomUUID(); @@ -718,13 +638,8 @@ class WebSocketConnectionTest { when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - if (useReactive) { - when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(Flux.empty()); - } else { - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(new Pair<>(Collections.emptyList(), false)); - } + when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) + .thenReturn(Flux.empty()); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); when(successResponse.getStatus()).thenReturn(200); @@ -735,28 +650,19 @@ class WebSocketConnectionTest { // anything. connection.processStoredMessages(); - if (useReactive) { - verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device.getId(), false); - } else { - verify(messagesManager).getMessagesForDevice(account.getUuid(), device.getId(), false); - } + verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device.getId(), false); connection.handleNewMessagesAvailable(); - if (useReactive) { - verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device.getId(), true); - } else { - verify(messagesManager).getMessagesForDevice(account.getUuid(), device.getId(), true); - } + verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device.getId(), true); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testProcessDatabaseMessagesAfterPersist(final boolean useReactive) { + @Test + void testProcessDatabaseMessagesAfterPersist() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); final UUID accountUuid = UUID.randomUUID(); @@ -765,13 +671,8 @@ class WebSocketConnectionTest { when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - if (useReactive) { - when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(Flux.empty()); - } else { - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) - .thenReturn(new Pair<>(Collections.emptyList(), false)); - } + when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(1L), anyBoolean())) + .thenReturn(Flux.empty()); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); when(successResponse.getStatus()).thenReturn(200); @@ -783,16 +684,11 @@ class WebSocketConnectionTest { connection.processStoredMessages(); connection.handleMessagesPersisted(); - if (useReactive) { - verify(messagesManager, times(2)).getMessagesForDeviceReactive(account.getUuid(), device.getId(), false); - } else { - verify(messagesManager, times(2)).getMessagesForDevice(account.getUuid(), device.getId(), false); - } + verify(messagesManager, times(2)).getMessagesForDeviceReactive(account.getUuid(), device.getId(), false); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testRetrieveMessageException(final boolean useReactive) { + @Test + void testRetrieveMessageException() { MessagesManager storedMessages = mock(MessagesManager.class); UUID accountUuid = UUID.randomUUID(); @@ -802,13 +698,8 @@ class WebSocketConnectionTest { when(account.getNumber()).thenReturn("+14152222222"); when(account.getUuid()).thenReturn(accountUuid); - if (useReactive) { - when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) - .thenReturn(Flux.error(new RedisException("OH NO"))); - } else { - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) - .thenThrow(new RedisException("OH NO")); - } + when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) + .thenReturn(Flux.error(new RedisException("OH NO"))); when(retrySchedulingExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer( (Answer>) invocation -> { @@ -820,7 +711,7 @@ class WebSocketConnectionTest { when(client.isOpen()).thenReturn(true); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); connection.start(); verify(retrySchedulingExecutor, times(WebSocketConnection.MAX_CONSECUTIVE_RETRIES)).schedule(any(Runnable.class), @@ -828,9 +719,8 @@ class WebSocketConnectionTest { verify(client).close(eq(1011), anyString()); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testRetrieveMessageExceptionClientDisconnected(final boolean useReactive) { + @Test + void testRetrieveMessageExceptionClientDisconnected() { MessagesManager storedMessages = mock(MessagesManager.class); UUID accountUuid = UUID.randomUUID(); @@ -840,19 +730,14 @@ class WebSocketConnectionTest { when(account.getNumber()).thenReturn("+14152222222"); when(account.getUuid()).thenReturn(accountUuid); - if (useReactive) { - when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) - .thenReturn(Flux.error(new RedisException("OH NO"))); - } else { - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) - .thenThrow(new RedisException("OH NO")); - } + when(storedMessages.getMessagesForDeviceReactive(account.getUuid(), device.getId(), false)) + .thenReturn(Flux.error(new RedisException("OH NO"))); final WebSocketClient client = mock(WebSocketClient.class); when(client.isOpen()).thenReturn(false); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, - retrySchedulingExecutor, useReactive, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); connection.start(); verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any()); @@ -897,7 +782,7 @@ class WebSocketConnectionTest { CompletableFuture.completedFuture(Optional.empty())); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, - retrySchedulingExecutor, true); + retrySchedulingExecutor); connection.start(); @@ -956,7 +841,7 @@ class WebSocketConnectionTest { CompletableFuture.completedFuture(Optional.empty())); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, auth, device, client, - retrySchedulingExecutor, true, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate()); connection.start();