From 7e14a0bc30ef9733ac5efe29244ff35b51142828 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 9 Sep 2020 19:21:55 -0400 Subject: [PATCH] Drop pub/sub operations from WebsocketConnection. --- .../textsecuregcm/WhisperServerService.java | 6 +- .../controllers/KeepAliveController.java | 19 +-- .../push/ClientPresenceManager.java | 4 + .../AuthenticatedConnectListener.java | 23 +-- .../websocket/WebSocketConnection.java | 73 ++------- .../push/ClientPresenceManagerTest.java | 13 ++ .../WebSocketConnectionIntegrationTest.java | 3 +- .../websocket/WebSocketConnectionTest.java | 148 ++---------------- 8 files changed, 66 insertions(+), 223 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 212969683..b32148f7a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -404,9 +404,9 @@ public class WhisperServerService extends Application webSocketEnvironment = new WebSocketEnvironment<>(environment, config.getWebSocketConfiguration(), 90000); webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator)); - webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(pushSender, receiptSender, messagesManager, pubSubManager, apnFallbackManager, clientPresenceManager)); + webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(receiptSender, messagesManager, apnFallbackManager, clientPresenceManager)); webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET)); - webSocketEnvironment.jersey().register(new KeepAliveController(pubSubManager)); + webSocketEnvironment.jersey().register(new KeepAliveController(clientPresenceManager)); webSocketEnvironment.jersey().register(messageController); webSocketEnvironment.jersey().register(profileController); webSocketEnvironment.jersey().register(attachmentControllerV1); @@ -417,7 +417,7 @@ public class WhisperServerService extends Application provisioningEnvironment = new WebSocketEnvironment<>(environment, webSocketEnvironment.getRequestLog(), 60000); provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(pubSubManager)); provisioningEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET)); - provisioningEnvironment.jersey().register(new KeepAliveController(pubSubManager)); + provisioningEnvironment.jersey().register(new KeepAliveController(clientPresenceManager)); registerCorsFilter(environment); registerExceptionMappers(environment, webSocketEnvironment, provisioningEnvironment); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java index bbc557eed..c3707020f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java @@ -1,11 +1,11 @@ package org.whispersystems.textsecuregcm.controllers; import com.codahale.metrics.annotation.Timed; +import io.dropwizard.auth.Auth; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; import org.whispersystems.websocket.session.WebSocketSession; import org.whispersystems.websocket.session.WebSocketSessionContext; @@ -13,18 +13,16 @@ import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.core.Response; -import io.dropwizard.auth.Auth; - @Path("/v1/keepalive") public class KeepAliveController { private final Logger logger = LoggerFactory.getLogger(KeepAliveController.class); - private final PubSubManager pubSubManager; + private final ClientPresenceManager clientPresenceManager; - public KeepAliveController(PubSubManager pubSubManager) { - this.pubSubManager = pubSubManager; + public KeepAliveController(final ClientPresenceManager clientPresenceManager) { + this.clientPresenceManager = clientPresenceManager; } @Timed @@ -33,11 +31,8 @@ public class KeepAliveController { @WebSocketSession WebSocketSessionContext context) { if (account != null) { - WebsocketAddress address = new WebsocketAddress(account.getNumber(), - account.getAuthenticatedDevice().get().getId()); - - if (!pubSubManager.hasLocalSubscription(address)) { - logger.warn("***** No local subscription found for: " + address); + if (!clientPresenceManager.isLocallyPresent(account.getUuid(), account.getAuthenticatedDevice().get().getId())) { + logger.warn("***** No local subscription found for {}::{}", account.getUuid(), account.getAuthenticatedDevice().get().getId()); context.getClient().close(1000, "OK"); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 602f53208..0e28c7062 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -170,6 +170,10 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter sendMessage(final Envelope message, final Optional storedMessageInfo) { try { String header; @@ -143,20 +114,16 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body).whenComplete((response, throwable) -> { if (throwable == null) { - boolean isReceipt = message.getType() == Envelope.Type.RECEIPT; - - if (isSuccessResponse(response) && !isReceipt) { - messageTime.update(System.currentTimeMillis() - message.getTimestamp()); - } - if (isSuccessResponse(response)) { - if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached); - if (!isReceipt) sendDeliveryReceiptFor(message); - } else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) { - requeueMessage(message); + if (storedMessageInfo.isPresent()) { + messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached); + } + + if (message.getType() != Envelope.Type.RECEIPT) { + messageTime.update(System.currentTimeMillis() - message.getTimestamp()); + sendDeliveryReceiptFor(message); + } } - } else { - if (!storedMessageInfo.isPresent()) requeueMessage(message); } }); } catch (CryptoEncodingException e) { @@ -165,16 +132,6 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability } } - private void requeueMessage(Envelope message) { - pushSender.getWebSocketSender().queueMessage(account, device, message); - - try { - pushSender.sendQueuedNotification(account, device); - } catch (NotPushRegisteredException e) { - logger.warn("requeueMessage", e); - } - } - private void sendDeliveryReceiptFor(Envelope message) { if (!message.hasSource()) return; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java index 3bf7c2dcf..8e2e8dc1c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java @@ -59,6 +59,19 @@ public class ClientPresenceManagerTest extends AbstractRedisClusterTest { assertTrue(clientPresenceManager.isPresent(accountUuid, deviceId)); } + @Test + public void testIsLocallyPresent() { + final UUID accountUuid = UUID.randomUUID(); + final long deviceId = 1; + + assertFalse(clientPresenceManager.isLocallyPresent(accountUuid, deviceId)); + + clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); + getRedisCluster().useCluster(connection -> connection.sync().flushall()); + + assertTrue(clientPresenceManager.isLocallyPresent(accountUuid, deviceId)); + } + @Test public void testLocalDisplacement() { final UUID accountUuid = UUID.randomUUID(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 5dcb567e4..31e38ee6a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -14,7 +14,6 @@ import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; -import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; import org.whispersystems.textsecuregcm.storage.Account; @@ -78,7 +77,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest when(account.getUuid()).thenReturn(UUID.randomUUID()); when(device.getId()).thenReturn(1L); - webSocketConnection = new WebSocketConnection(mock(PushSender.class), + webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), new MessagesManager(messages, messagesCache, mock(PushLatencyManager.class)), account, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 15dcddaaf..733b343d8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -12,15 +12,12 @@ import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; -import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.WebsocketSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; -import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.PubSubProtos; import org.whispersystems.textsecuregcm.util.Base64; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.auth.WebSocketAuthenticator.AuthenticationResult; @@ -68,11 +65,9 @@ public class WebSocketConnectionTest { private static final AccountAuthenticator accountAuthenticator = mock(AccountAuthenticator.class); private static final AccountsManager accountsManager = mock(AccountsManager.class); - private static final PubSubManager pubSubManager = mock(PubSubManager.class ); private static final Account account = mock(Account.class ); private static final Device device = mock(Device.class ); private static final UpgradeRequest upgradeRequest = mock(UpgradeRequest.class ); - private static final PushSender pushSender = mock(PushSender.class); private static final ReceiptSender receiptSender = mock(ReceiptSender.class); private static final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class); @@ -80,7 +75,7 @@ public class WebSocketConnectionTest { public void testCredentials() throws Exception { MessagesManager storedMessages = mock(MessagesManager.class); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); - AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(pushSender, receiptSender, storedMessages, pubSubManager, apnFallbackManager, mock(ClientPresenceManager.class)); + AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, storedMessages, apnFallbackManager, mock(ClientPresenceManager.class)); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) @@ -167,11 +162,10 @@ public class WebSocketConnectionTest { } }); - WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId()); - WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, storedMessages, + WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, account, device, client, "someid"); - connection.onDispatchSubscribed(websocketAddress.serialize()); + connection.start(); verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.>any()); assertTrue(futures.size() == 3); @@ -186,111 +180,15 @@ public class WebSocketConnectionTest { verify(storedMessages, times(1)).delete(eq(account.getNumber()), eq(accountUuid), eq(2L), eq(2L), eq(false)); verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender1"), eq(2222L)); - connection.onDispatchUnsubscribed(websocketAddress.serialize()); - verify(client).close(anyInt(), anyString()); - } - - @Test - public void testOnlineSend() throws Exception { - MessagesManager storedMessages = mock(MessagesManager.class); - WebsocketSender websocketSender = mock(WebsocketSender.class); - - when(pushSender.getWebSocketSender()).thenReturn(websocketSender); - - Envelope firstMessage = Envelope.newBuilder() - .setLegacyMessage(ByteString.copyFrom("first".getBytes())) - .setSource("sender1") - .setTimestamp(System.currentTimeMillis()) - .setSourceDevice(1) - .setType(Envelope.Type.CIPHERTEXT) - .build(); - - Envelope secondMessage = Envelope.newBuilder() - .setLegacyMessage(ByteString.copyFrom("second".getBytes())) - .setSource("sender2") - .setTimestamp(System.currentTimeMillis()) - .setSourceDevice(2) - .setType(Envelope.Type.CIPHERTEXT) - .build(); - - List pendingMessages = new LinkedList<>(); - OutgoingMessageEntityList pendingMessagesList = new OutgoingMessageEntityList(pendingMessages, false); - - when(device.getId()).thenReturn(2L); - when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52])); - - when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device)); - when(account.getNumber()).thenReturn("+14152222222"); - when(account.getUuid()).thenReturn(UUID.randomUUID()); - - final Device sender1device = mock(Device.class); - - Set sender1devices = new HashSet() {{ - add(sender1device); - }}; - - Account sender1 = mock(Account.class); - when(sender1.getDevices()).thenReturn(sender1devices); - - when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1)); - when(accountsManager.get("sender2")).thenReturn(Optional.empty()); - - String userAgent = "user-agent"; - - when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), userAgent, false)) - .thenReturn(pendingMessagesList); - - final List> futures = new LinkedList<>(); - final WebSocketClient client = mock(WebSocketClient.class); - - when(client.getUserAgent()).thenReturn(userAgent); - when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.>any())) - .thenAnswer(new Answer>() { - @Override - public CompletableFuture answer(InvocationOnMock invocationOnMock) throws Throwable { - CompletableFuture future = new CompletableFuture<>(); - futures.add(future); - return future; - } - }); - - WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId()); - WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, storedMessages, - account, device, client, "anotherid"); - - connection.onDispatchSubscribed(websocketAddress.serialize()); - connection.onDispatchMessage(websocketAddress.serialize(), PubSubProtos.PubSubMessage.newBuilder() - .setType(PubSubProtos.PubSubMessage.Type.DELIVER) - .setContent(ByteString.copyFrom(firstMessage.toByteArray())) - .build().toByteArray()); - - connection.onDispatchMessage(websocketAddress.serialize(), PubSubProtos.PubSubMessage.newBuilder() - .setType(PubSubProtos.PubSubMessage.Type.DELIVER) - .setContent(ByteString.copyFrom(secondMessage.toByteArray())) - .build().toByteArray()); - - verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.>any()); - - assertEquals(futures.size(), 2); - - WebSocketResponseMessage response = mock(WebSocketResponseMessage.class); - when(response.getStatus()).thenReturn(200); - futures.get(1).complete(response); - futures.get(0).completeExceptionally(new IOException()); - - verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender2"), eq(secondMessage.getTimestamp())); - verify(websocketSender, times(1)).queueMessage(eq(account), eq(device), any(Envelope.class)); - verify(pushSender, times(1)).sendQueuedNotification(eq(account), eq(device)); - - connection.onDispatchUnsubscribed(websocketAddress.serialize()); + connection.stop(); verify(client).close(anyInt(), anyString()); } @Test(timeout = 5_000L) - public void testOnlineSendViaKeyspaceNotification() throws Exception { + public void testOnlineSend() throws Exception { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); + final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, "concurrency"); final UUID accountUuid = UUID.randomUUID(); @@ -322,7 +220,7 @@ public class WebSocketConnectionTest { // messages, the call to CompletableFuture.allOf(...) in processStoredMessages will produce an instantly-succeeded // future, and the whenComplete method will get called immediately on THIS thread, so we don't need to synchronize // or wait for anything. - connection.onDispatchSubscribed("channel"); + connection.start(); connection.handleNewMessagesAvailable(); @@ -349,11 +247,6 @@ public class WebSocketConnectionTest { MessagesManager storedMessages = mock(MessagesManager.class); WebsocketSender websocketSender = mock(WebsocketSender.class); - reset(websocketSender); - reset(pushSender); - - when(pushSender.getWebSocketSender()).thenReturn(websocketSender); - final Envelope firstMessage = Envelope.newBuilder() .setLegacyMessage(ByteString.copyFrom("first".getBytes())) .setSource("sender1") @@ -423,11 +316,10 @@ public class WebSocketConnectionTest { } }); - WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId()); - WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, storedMessages, + WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, account, device, client, "onemoreid"); - connection.onDispatchSubscribed(websocketAddress.serialize()); + connection.start(); verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.>any()); @@ -440,9 +332,8 @@ public class WebSocketConnectionTest { verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender2"), eq(secondMessage.getTimestamp())); verifyNoMoreInteractions(websocketSender); - verifyNoMoreInteractions(pushSender); - connection.onDispatchUnsubscribed(websocketAddress.serialize()); + connection.stop(); verify(client).close(anyInt(), anyString()); } @@ -450,7 +341,7 @@ public class WebSocketConnectionTest { public void testProcessStoredMessageConcurrency() throws InterruptedException { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); + final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, "concurrency"); when(account.getNumber()).thenReturn("+18005551234"); when(account.getUuid()).thenReturn(UUID.randomUUID()); @@ -511,7 +402,7 @@ public class WebSocketConnectionTest { public void testProcessStoredMessagesMultiplePages() throws InterruptedException { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); + final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, "concurrency"); when(account.getNumber()).thenReturn("+18005551234"); when(account.getUuid()).thenReturn(UUID.randomUUID()); @@ -554,7 +445,7 @@ public class WebSocketConnectionTest { public void testProcessStoredMessagesSingleEmptyCall() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); + final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, "concurrency"); final UUID accountUuid = UUID.randomUUID(); @@ -583,7 +474,7 @@ public class WebSocketConnectionTest { public void testRequeryOnStateMismatch() throws InterruptedException { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); + final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, "concurrency"); final UUID accountUuid = UUID.randomUUID(); when(account.getNumber()).thenReturn("+18005551234"); @@ -609,15 +500,10 @@ public class WebSocketConnectionTest { final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); when(successResponse.getStatus()).thenReturn(200); - final byte[] queryDbMessageBytes = PubSubProtos.PubSubMessage.newBuilder() - .setType(PubSubProtos.PubSubMessage.Type.QUERY_DB) - .build() - .toByteArray(); - final CountDownLatch sendLatch = new CountDownLatch(firstPageMessages.size() + secondPageMessages.size()); when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class))).thenAnswer((Answer>)invocation -> { - connection.onDispatchMessage("channel", queryDbMessageBytes); + connection.handleNewMessagesAvailable(); sendLatch.countDown(); return CompletableFuture.completedFuture(successResponse); @@ -635,7 +521,7 @@ public class WebSocketConnectionTest { public void testProcessCachedMessagesOnly() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); + final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, "concurrency"); final UUID accountUuid = UUID.randomUUID(); @@ -667,7 +553,7 @@ public class WebSocketConnectionTest { public void testProcessDatabaseMessagesAfterPersist() { final MessagesManager messagesManager = mock(MessagesManager.class); final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); + final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, "concurrency"); final UUID accountUuid = UUID.randomUUID();