diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index c28affeaf..cc52d1a0d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -161,6 +161,7 @@ import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper import org.whispersystems.textsecuregcm.mappers.RegistrationServiceSenderExceptionMapper; import org.whispersystems.textsecuregcm.mappers.ServerRejectedExceptionMapper; import org.whispersystems.textsecuregcm.mappers.SubscriptionProcessorExceptionMapper; +import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener; import org.whispersystems.textsecuregcm.metrics.MetricsHttpChannelListener; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; @@ -858,6 +859,7 @@ public class WhisperServerService extends Application commonControllers = Lists.newArrayList( new AccountController(accountsManager, rateLimiters, turnTokenGenerator, registrationRecoveryPasswordsManager, usernameHashZkProofVerifier), @@ -968,7 +971,7 @@ public class WhisperServerService extends Application dynamicConfigurationManager; private final ServerSecretParams serverSecretParams; private final SpamChecker spamChecker; + private final MessageMetrics messageMetrics; private final Clock clock; private static final int MAX_FETCH_ACCOUNT_CONCURRENCY = 8; @@ -216,6 +217,7 @@ public class MessageController { final DynamicConfigurationManager dynamicConfigurationManager, final ServerSecretParams serverSecretParams, final SpamChecker spamChecker, + final MessageMetrics messageMetrics, final Clock clock) { this.rateLimiters = rateLimiters; this.messageByteLimitEstimator = messageByteLimitEstimator; @@ -232,6 +234,7 @@ public class MessageController { this.dynamicConfigurationManager = dynamicConfigurationManager; this.serverSecretParams = serverSecretParams; this.spamChecker = spamChecker; + this.messageMetrics = messageMetrics; this.clock = clock; } @@ -732,8 +735,8 @@ public class MessageController { final OutgoingMessageEntityList messages = new OutgoingMessageEntityList(envelopes .map(OutgoingMessageEntity::fromEnvelope) .peek(outgoingMessageEntity -> { - MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(), outgoingMessageEntity); - MessageMetrics.measureOutgoingMessageLatency(outgoingMessageEntity.serverTimestamp(), "rest", userAgent, clientReleaseManager); + messageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(), outgoingMessageEntity); + messageMetrics.measureOutgoingMessageLatency(outgoingMessageEntity.serverTimestamp(), "rest", userAgent, clientReleaseManager); }) .collect(Collectors.toList()), messagesAndHasMore.second()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java index dfe64921e..2c74bb5fe 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java @@ -7,6 +7,8 @@ package org.whispersystems.textsecuregcm.metrics; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; +import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; @@ -30,13 +32,23 @@ public final class MessageMetrics { "mismatchedAccountEnvelopeUuid"); public static final String DELIVERY_LATENCY_TIMER_NAME = name(MessageMetrics.class, "deliveryLatency"); + private final MeterRegistry metricRegistry; - public static void measureAccountOutgoingMessageUuidMismatches(final Account account, + @VisibleForTesting + MessageMetrics(final MeterRegistry metricRegistry) { + this.metricRegistry = metricRegistry; + } + + public MessageMetrics() { + this(Metrics.globalRegistry); + } + + public void measureAccountOutgoingMessageUuidMismatches(final Account account, final OutgoingMessageEntity outgoingMessage) { measureAccountDestinationUuidMismatches(account, outgoingMessage.destinationUuid()); } - public static void measureAccountEnvelopeUuidMismatches(final Account account, + public void measureAccountEnvelopeUuidMismatches(final Account account, final MessageProtos.Envelope envelope) { if (envelope.hasDestinationUuid()) { try { @@ -47,16 +59,16 @@ public final class MessageMetrics { } } - private static void measureAccountDestinationUuidMismatches(final Account account, final ServiceIdentifier destinationIdentifier) { + private void measureAccountDestinationUuidMismatches(final Account account, final ServiceIdentifier destinationIdentifier) { if (!account.isIdentifiedBy(destinationIdentifier)) { // In all cases, this represents a mismatch between the account’s current PNI and its PNI when the message was // sent. This is an expected case, but if this metric changes significantly, it could indicate an issue to // investigate. - Metrics.counter(MISMATCHED_ACCOUNT_ENVELOPE_UUID_COUNTER_NAME).increment(); + metricRegistry.counter(MISMATCHED_ACCOUNT_ENVELOPE_UUID_COUNTER_NAME).increment(); } } - public static void measureOutgoingMessageLatency(final long serverTimestamp, + public void measureOutgoingMessageLatency(final long serverTimestamp, final String channel, final String userAgent, final ClientReleaseManager clientReleaseManager) { @@ -70,7 +82,7 @@ public final class MessageMetrics { Timer.builder(DELIVERY_LATENCY_TIMER_NAME) .publishPercentileHistogram(true) .tags(tags) - .register(Metrics.globalRegistry) + .register(metricRegistry) .record(Duration.between(Instant.ofEpochMilli(serverTimestamp), Instant.now())); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index bfaae44a0..10f943af2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; +import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; @@ -51,6 +52,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final ReceiptSender receiptSender; private final MessagesManager messagesManager; + private final MessageMetrics messageMetrics; private final PushNotificationManager pushNotificationManager; private final ClientPresenceManager clientPresenceManager; private final ScheduledExecutorService scheduledExecutorService; @@ -69,6 +71,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { public AuthenticatedConnectListener(ReceiptSender receiptSender, MessagesManager messagesManager, + MessageMetrics messageMetrics, PushNotificationManager pushNotificationManager, ClientPresenceManager clientPresenceManager, ScheduledExecutorService scheduledExecutorService, @@ -76,6 +79,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { ClientReleaseManager clientReleaseManager) { this.receiptSender = receiptSender; this.messagesManager = messagesManager; + this.messageMetrics = messageMetrics; this.pushNotificationManager = pushNotificationManager; this.clientPresenceManager = clientPresenceManager; this.scheduledExecutorService = scheduledExecutorService; @@ -138,7 +142,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { final Device device = auth.getAuthenticatedDevice(); final Timer.Sample sample = Timer.start(); final WebSocketConnection connection = new WebSocketConnection(receiptSender, - messagesManager, auth, device, + messagesManager, messageMetrics, auth, device, context.getClient(), scheduledExecutorService, messageDeliveryScheduler, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index d341a0bff..44923458f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -109,6 +109,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private final ReceiptSender receiptSender; private final MessagesManager messagesManager; + private final MessageMetrics messageMetrics; private final AuthenticatedAccount auth; private final Device device; @@ -141,6 +142,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac public WebSocketConnection(ReceiptSender receiptSender, MessagesManager messagesManager, + MessageMetrics messageMetrics, AuthenticatedAccount auth, Device device, WebSocketClient client, @@ -150,6 +152,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac this(receiptSender, messagesManager, + messageMetrics, auth, device, client, @@ -162,6 +165,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac @VisibleForTesting WebSocketConnection(ReceiptSender receiptSender, MessagesManager messagesManager, + MessageMetrics messageMetrics, AuthenticatedAccount auth, Device device, WebSocketClient client, @@ -172,6 +176,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac this.receiptSender = receiptSender; this.messagesManager = messagesManager; + this.messageMetrics = messageMetrics; this.auth = auth; this.device = device; this.client = client; @@ -208,7 +213,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac sendMessageCounter.increment(); sentMessageCounter.increment(); bytesSentCounter.increment(body.map(bytes -> bytes.length).orElse(0)); - MessageMetrics.measureAccountEnvelopeUuidMismatches(auth.getAccount(), message); + messageMetrics.measureAccountEnvelopeUuidMismatches(auth.getAccount(), message); // X-Signal-Key: false must be sent until Android stops assuming it missing means true return client.sendRequest("PUT", "/api/v1/message", @@ -217,7 +222,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac if (throwable != null) { sendFailuresCounter.increment(); } else { - MessageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent(), clientReleaseManager); + messageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent(), clientReleaseManager); } }).thenCompose(response -> { final CompletableFuture result; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java index 0b8b716bf..e5f3e8969 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java @@ -119,6 +119,7 @@ import org.whispersystems.textsecuregcm.limits.CardinalityEstimator; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper; +import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider; import org.whispersystems.textsecuregcm.push.MessageSender; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; @@ -212,7 +213,7 @@ class MessageControllerTest { new MessageController(rateLimiters, cardinalityEstimator, messageSender, receiptSender, accountsManager, messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor, messageDeliveryScheduler, ReportSpamTokenProvider.noop(), mock(ClientReleaseManager.class), dynamicConfigurationManager, - serverSecretParams, SpamChecker.noop(), clock)) + serverSecretParams, SpamChecker.noop(), new MessageMetrics(), clock)) .build(); @BeforeEach diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/metrics/MessageMetricsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/metrics/MessageMetricsTest.java index 002ad7d25..308d0f74d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/metrics/MessageMetricsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/metrics/MessageMetricsTest.java @@ -13,11 +13,9 @@ import static org.mockito.Mockito.when; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Meter; -import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.util.Optional; import java.util.UUID; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.entities.MessageProtos; @@ -33,6 +31,7 @@ class MessageMetricsTest { private final UUID aci = UUID.fromString("11111111-1111-1111-1111-111111111111"); private final UUID pni = UUID.fromString("22222222-2222-2222-2222-222222222222"); private final UUID otherUuid = UUID.fromString("99999999-9999-9999-9999-999999999999"); + private MessageMetrics messageMetrics; private SimpleMeterRegistry simpleMeterRegistry; @BeforeEach @@ -42,35 +41,28 @@ class MessageMetricsTest { when(account.isIdentifiedBy(any())).thenReturn(false); when(account.isIdentifiedBy(new AciServiceIdentifier(aci))).thenReturn(true); when(account.isIdentifiedBy(new PniServiceIdentifier(pni))).thenReturn(true); - Metrics.globalRegistry.clear(); simpleMeterRegistry = new SimpleMeterRegistry(); - Metrics.globalRegistry.add(simpleMeterRegistry); - } - - @AfterEach - void teardown() { - Metrics.globalRegistry.remove(simpleMeterRegistry); - Metrics.globalRegistry.clear(); + messageMetrics = new MessageMetrics(simpleMeterRegistry); } @Test void measureAccountOutgoingMessageUuidMismatches() { final OutgoingMessageEntity outgoingMessageToAci = createOutgoingMessageEntity(new AciServiceIdentifier(aci)); - MessageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToAci); + messageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToAci); Optional counter = findCounter(simpleMeterRegistry); assertTrue(counter.isEmpty()); final OutgoingMessageEntity outgoingMessageToPni = createOutgoingMessageEntity(new PniServiceIdentifier(pni)); - MessageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToPni); + messageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToPni); counter = findCounter(simpleMeterRegistry); assertTrue(counter.isEmpty()); final OutgoingMessageEntity outgoingMessageToOtherUuid = createOutgoingMessageEntity(new AciServiceIdentifier(otherUuid)); - MessageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToOtherUuid); + messageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToOtherUuid); counter = findCounter(simpleMeterRegistry); assertEquals(1.0, counter.map(Counter::count).orElse(0.0)); @@ -83,26 +75,26 @@ class MessageMetricsTest { @Test void measureAccountEnvelopeUuidMismatches() { final MessageProtos.Envelope envelopeToAci = createEnvelope(new AciServiceIdentifier(aci)); - MessageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToAci); + messageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToAci); Optional counter = findCounter(simpleMeterRegistry); assertTrue(counter.isEmpty()); final MessageProtos.Envelope envelopeToPni = createEnvelope(new PniServiceIdentifier(pni)); - MessageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToPni); + messageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToPni); counter = findCounter(simpleMeterRegistry); assertTrue(counter.isEmpty()); final MessageProtos.Envelope envelopeToOtherUuid = createEnvelope(new AciServiceIdentifier(otherUuid)); - MessageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToOtherUuid); + messageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToOtherUuid); counter = findCounter(simpleMeterRegistry); assertEquals(1.0, counter.map(Counter::count).orElse(0.0)); final MessageProtos.Envelope envelopeToNull = createEnvelope(null); - MessageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToNull); + messageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToNull); counter = findCounter(simpleMeterRegistry); assertEquals(1.0, counter.map(Counter::count).orElse(0.0)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 8f2be9cdf..532e03960 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -46,6 +46,7 @@ import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; +import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; @@ -124,6 +125,7 @@ class WebSocketConnectionIntegrationTest { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), + new MessageMetrics(), new AuthenticatedAccount(account, device), device, webSocketClient, @@ -209,6 +211,7 @@ class WebSocketConnectionIntegrationTest { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), + new MessageMetrics(), new AuthenticatedAccount(account, device), device, webSocketClient, @@ -275,6 +278,7 @@ class WebSocketConnectionIntegrationTest { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), + new MessageMetrics(), new AuthenticatedAccount(account, device), device, webSocketClient, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 479dde23e..ce7468507 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import java.util.stream.Stream; import org.eclipse.jetty.websocket.api.UpgradeRequest; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -56,6 +57,7 @@ import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; +import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -121,7 +123,7 @@ class WebSocketConnectionTest { WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator, mock(PrincipalSupplier.class)); AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager, - mock(PushNotificationManager.class), mock(ClientPresenceManager.class), + new MessageMetrics(), mock(PushNotificationManager.class), mock(ClientPresenceManager.class), retrySchedulingExecutor, messageDeliveryScheduler, clientReleaseManager); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); @@ -201,8 +203,7 @@ class WebSocketConnectionTest { return future; }); - WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, - auth, device, client, retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + WebSocketConnection connection = webSocketConnection(client); connection.start(); verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), @@ -229,8 +230,7 @@ class WebSocketConnectionTest { @Test public void testOnlineSend() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); final UUID accountUuid = UUID.randomUUID(); @@ -349,8 +349,7 @@ class WebSocketConnectionTest { return future; }); - WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, - auth, device, client, retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + WebSocketConnection connection = webSocketConnection(client); connection.start(); @@ -373,8 +372,7 @@ class WebSocketConnectionTest { @Test void testProcessStoredMessageConcurrency() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); when(account.getNumber()).thenReturn("+18005551234"); when(account.getUuid()).thenReturn(UUID.randomUUID()); @@ -438,8 +436,7 @@ class WebSocketConnectionTest { @Test void testProcessStoredMessagesMultiplePages() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); when(account.getNumber()).thenReturn("+18005551234"); final UUID accountUuid = UUID.randomUUID(); @@ -487,8 +484,7 @@ class WebSocketConnectionTest { @Test void testProcessStoredMessagesMultiplePagesBackpressure() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); when(account.getNumber()).thenReturn("+18005551234"); final UUID accountUuid = UUID.randomUUID(); @@ -571,8 +567,7 @@ class WebSocketConnectionTest { @Test void testProcessStoredMessagesContainsSenderUuid() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); when(account.getNumber()).thenReturn("+18005551234"); final UUID accountUuid = UUID.randomUUID(); @@ -630,11 +625,15 @@ class WebSocketConnectionTest { verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty())); } + private @NotNull WebSocketConnection webSocketConnection(final WebSocketClient client) { + return new WebSocketConnection(receiptSender, messagesManager, new MessageMetrics(), auth, device, client, + retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + } + @Test void testProcessStoredMessagesSingleEmptyCall() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); final UUID accountUuid = UUID.randomUUID(); @@ -662,8 +661,7 @@ class WebSocketConnectionTest { @Test public void testRequeryOnStateMismatch() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); final UUID accountUuid = UUID.randomUUID(); when(account.getNumber()).thenReturn("+18005551234"); @@ -718,8 +716,7 @@ class WebSocketConnectionTest { @Test void testProcessCachedMessagesOnly() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); final UUID accountUuid = UUID.randomUUID(); @@ -750,8 +747,7 @@ class WebSocketConnectionTest { @Test void testProcessDatabaseMessagesAfterPersist() { final WebSocketClient client = mock(WebSocketClient.class); - final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + final WebSocketConnection connection = webSocketConnection(client); final UUID accountUuid = UUID.randomUUID(); @@ -797,8 +793,7 @@ class WebSocketConnectionTest { final WebSocketClient client = mock(WebSocketClient.class); when(client.isOpen()).thenReturn(true); - WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + WebSocketConnection connection = webSocketConnection(client); connection.start(); verify(retrySchedulingExecutor, times(WebSocketConnection.MAX_CONSECUTIVE_RETRIES)).schedule(any(Runnable.class), @@ -821,8 +816,7 @@ class WebSocketConnectionTest { final WebSocketClient client = mock(WebSocketClient.class); when(client.isOpen()).thenReturn(false); - WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + WebSocketConnection connection = webSocketConnection(client); connection.start(); verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any()); @@ -855,8 +849,7 @@ class WebSocketConnectionTest { when(messagesManager.delete(any(), anyByte(), any(), any())).thenReturn( CompletableFuture.completedFuture(Optional.empty())); - WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, messageDeliveryScheduler, clientReleaseManager); + WebSocketConnection connection = webSocketConnection(client); connection.start(); @@ -912,8 +905,7 @@ class WebSocketConnectionTest { when(messagesManager.delete(any(), anyByte(), any(), any())).thenReturn( CompletableFuture.completedFuture(Optional.empty())); - WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); + WebSocketConnection connection = webSocketConnection(client); connection.start();