Fix flaky MessageMetricsTest

Make the MeterRegistry in MessageMetrics configurable
This commit is contained in:
Ravi Khadiwala 2024-05-16 12:53:22 -05:00 committed by ravi-signal
parent a80c020146
commit 40639f70f4
9 changed files with 78 additions and 62 deletions

View File

@ -161,6 +161,7 @@ import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper
import org.whispersystems.textsecuregcm.mappers.RegistrationServiceSenderExceptionMapper;
import org.whispersystems.textsecuregcm.mappers.ServerRejectedExceptionMapper;
import org.whispersystems.textsecuregcm.mappers.SubscriptionProcessorExceptionMapper;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener;
import org.whispersystems.textsecuregcm.metrics.MetricsHttpChannelListener;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
@ -858,6 +859,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final MetricsHttpChannelListener metricsHttpChannelListener = new MetricsHttpChannelListener(clientReleaseManager,
Set.of(websocketServletPath, provisioningWebsocketServletPath, "/health-check"));
metricsHttpChannelListener.configure(environment);
final MessageMetrics messageMetrics = new MessageMetrics();
environment.jersey().register(new BufferingInterceptor());
environment.jersey().register(new VirtualExecutorServiceProvider("managed-async-virtual-thread-"));
@ -874,7 +876,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.jersey().register(new VirtualExecutorServiceProvider("managed-async-websocket-virtual-thread-"));
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator, new AccountPrincipalSupplier(accountsManager)));
webSocketEnvironment.setConnectListener(
new AuthenticatedConnectListener(receiptSender, messagesManager, pushNotificationManager,
new AuthenticatedConnectListener(receiptSender, messagesManager, messageMetrics, pushNotificationManager,
clientPresenceManager, websocketScheduledExecutor, messageDeliveryScheduler, clientReleaseManager));
webSocketEnvironment.jersey()
.register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager));
@ -939,6 +941,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
log.info("Registered spam filter: {}", filter.getClass().getName());
});
final List<Object> commonControllers = Lists.newArrayList(
new AccountController(accountsManager, rateLimiters, turnTokenGenerator, registrationRecoveryPasswordsManager,
usernameHashZkProofVerifier),
@ -968,7 +971,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
new MessageController(rateLimiters, messageByteLimitCardinalityEstimator, messageSender, receiptSender,
accountsManager, messagesManager, pushNotificationManager, reportMessageManager,
multiRecipientMessageExecutor, messageDeliveryScheduler, reportSpamTokenProvider, clientReleaseManager,
dynamicConfigurationManager, zkSecretParams, spamChecker, Clock.systemUTC()),
dynamicConfigurationManager, zkSecretParams, spamChecker, messageMetrics, Clock.systemUTC()),
new PaymentsController(currencyManager, paymentsCredentialsGenerator),
new ProfileController(clock, rateLimiters, accountsManager, profilesManager, dynamicConfigurationManager,
profileBadgeConverter, config.getBadges(), cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner,

View File

@ -167,6 +167,7 @@ public class MessageController {
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private final ServerSecretParams serverSecretParams;
private final SpamChecker spamChecker;
private final MessageMetrics messageMetrics;
private final Clock clock;
private static final int MAX_FETCH_ACCOUNT_CONCURRENCY = 8;
@ -216,6 +217,7 @@ public class MessageController {
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final ServerSecretParams serverSecretParams,
final SpamChecker spamChecker,
final MessageMetrics messageMetrics,
final Clock clock) {
this.rateLimiters = rateLimiters;
this.messageByteLimitEstimator = messageByteLimitEstimator;
@ -232,6 +234,7 @@ public class MessageController {
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.serverSecretParams = serverSecretParams;
this.spamChecker = spamChecker;
this.messageMetrics = messageMetrics;
this.clock = clock;
}
@ -732,8 +735,8 @@ public class MessageController {
final OutgoingMessageEntityList messages = new OutgoingMessageEntityList(envelopes
.map(OutgoingMessageEntity::fromEnvelope)
.peek(outgoingMessageEntity -> {
MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(), outgoingMessageEntity);
MessageMetrics.measureOutgoingMessageLatency(outgoingMessageEntity.serverTimestamp(), "rest", userAgent, clientReleaseManager);
messageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(), outgoingMessageEntity);
messageMetrics.measureOutgoingMessageLatency(outgoingMessageEntity.serverTimestamp(), "rest", userAgent, clientReleaseManager);
})
.collect(Collectors.toList()),
messagesAndHasMore.second());

View File

@ -7,6 +7,8 @@ package org.whispersystems.textsecuregcm.metrics;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
@ -30,13 +32,23 @@ public final class MessageMetrics {
"mismatchedAccountEnvelopeUuid");
public static final String DELIVERY_LATENCY_TIMER_NAME = name(MessageMetrics.class, "deliveryLatency");
private final MeterRegistry metricRegistry;
public static void measureAccountOutgoingMessageUuidMismatches(final Account account,
@VisibleForTesting
MessageMetrics(final MeterRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
}
public MessageMetrics() {
this(Metrics.globalRegistry);
}
public void measureAccountOutgoingMessageUuidMismatches(final Account account,
final OutgoingMessageEntity outgoingMessage) {
measureAccountDestinationUuidMismatches(account, outgoingMessage.destinationUuid());
}
public static void measureAccountEnvelopeUuidMismatches(final Account account,
public void measureAccountEnvelopeUuidMismatches(final Account account,
final MessageProtos.Envelope envelope) {
if (envelope.hasDestinationUuid()) {
try {
@ -47,16 +59,16 @@ public final class MessageMetrics {
}
}
private static void measureAccountDestinationUuidMismatches(final Account account, final ServiceIdentifier destinationIdentifier) {
private void measureAccountDestinationUuidMismatches(final Account account, final ServiceIdentifier destinationIdentifier) {
if (!account.isIdentifiedBy(destinationIdentifier)) {
// In all cases, this represents a mismatch between the accounts current PNI and its PNI when the message was
// sent. This is an expected case, but if this metric changes significantly, it could indicate an issue to
// investigate.
Metrics.counter(MISMATCHED_ACCOUNT_ENVELOPE_UUID_COUNTER_NAME).increment();
metricRegistry.counter(MISMATCHED_ACCOUNT_ENVELOPE_UUID_COUNTER_NAME).increment();
}
}
public static void measureOutgoingMessageLatency(final long serverTimestamp,
public void measureOutgoingMessageLatency(final long serverTimestamp,
final String channel,
final String userAgent,
final ClientReleaseManager clientReleaseManager) {
@ -70,7 +82,7 @@ public final class MessageMetrics {
Timer.builder(DELIVERY_LATENCY_TIMER_NAME)
.publishPercentileHistogram(true)
.tags(tags)
.register(Metrics.globalRegistry)
.register(metricRegistry)
.record(Duration.between(Instant.ofEpochMilli(serverTimestamp), Instant.now()));
}
}

View File

@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
@ -51,6 +52,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final ReceiptSender receiptSender;
private final MessagesManager messagesManager;
private final MessageMetrics messageMetrics;
private final PushNotificationManager pushNotificationManager;
private final ClientPresenceManager clientPresenceManager;
private final ScheduledExecutorService scheduledExecutorService;
@ -69,6 +71,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
public AuthenticatedConnectListener(ReceiptSender receiptSender,
MessagesManager messagesManager,
MessageMetrics messageMetrics,
PushNotificationManager pushNotificationManager,
ClientPresenceManager clientPresenceManager,
ScheduledExecutorService scheduledExecutorService,
@ -76,6 +79,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
ClientReleaseManager clientReleaseManager) {
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.messageMetrics = messageMetrics;
this.pushNotificationManager = pushNotificationManager;
this.clientPresenceManager = clientPresenceManager;
this.scheduledExecutorService = scheduledExecutorService;
@ -138,7 +142,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
final Device device = auth.getAuthenticatedDevice();
final Timer.Sample sample = Timer.start();
final WebSocketConnection connection = new WebSocketConnection(receiptSender,
messagesManager, auth, device,
messagesManager, messageMetrics, auth, device,
context.getClient(),
scheduledExecutorService,
messageDeliveryScheduler,

View File

@ -109,6 +109,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final ReceiptSender receiptSender;
private final MessagesManager messagesManager;
private final MessageMetrics messageMetrics;
private final AuthenticatedAccount auth;
private final Device device;
@ -141,6 +142,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
public WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager,
MessageMetrics messageMetrics,
AuthenticatedAccount auth,
Device device,
WebSocketClient client,
@ -150,6 +152,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
this(receiptSender,
messagesManager,
messageMetrics,
auth,
device,
client,
@ -162,6 +165,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
@VisibleForTesting
WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager,
MessageMetrics messageMetrics,
AuthenticatedAccount auth,
Device device,
WebSocketClient client,
@ -172,6 +176,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.messageMetrics = messageMetrics;
this.auth = auth;
this.device = device;
this.client = client;
@ -208,7 +213,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
sendMessageCounter.increment();
sentMessageCounter.increment();
bytesSentCounter.increment(body.map(bytes -> bytes.length).orElse(0));
MessageMetrics.measureAccountEnvelopeUuidMismatches(auth.getAccount(), message);
messageMetrics.measureAccountEnvelopeUuidMismatches(auth.getAccount(), message);
// X-Signal-Key: false must be sent until Android stops assuming it missing means true
return client.sendRequest("PUT", "/api/v1/message",
@ -217,7 +222,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
if (throwable != null) {
sendFailuresCounter.increment();
} else {
MessageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent(), clientReleaseManager);
messageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent(), clientReleaseManager);
}
}).thenCompose(response -> {
final CompletableFuture<Void> result;

View File

@ -119,6 +119,7 @@ import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
import org.whispersystems.textsecuregcm.push.MessageSender;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
@ -212,7 +213,7 @@ class MessageControllerTest {
new MessageController(rateLimiters, cardinalityEstimator, messageSender, receiptSender, accountsManager,
messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor,
messageDeliveryScheduler, ReportSpamTokenProvider.noop(), mock(ClientReleaseManager.class), dynamicConfigurationManager,
serverSecretParams, SpamChecker.noop(), clock))
serverSecretParams, SpamChecker.noop(), new MessageMetrics(), clock))
.build();
@BeforeEach

View File

@ -13,11 +13,9 @@ import static org.mockito.Mockito.when;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
@ -33,6 +31,7 @@ class MessageMetricsTest {
private final UUID aci = UUID.fromString("11111111-1111-1111-1111-111111111111");
private final UUID pni = UUID.fromString("22222222-2222-2222-2222-222222222222");
private final UUID otherUuid = UUID.fromString("99999999-9999-9999-9999-999999999999");
private MessageMetrics messageMetrics;
private SimpleMeterRegistry simpleMeterRegistry;
@BeforeEach
@ -42,35 +41,28 @@ class MessageMetricsTest {
when(account.isIdentifiedBy(any())).thenReturn(false);
when(account.isIdentifiedBy(new AciServiceIdentifier(aci))).thenReturn(true);
when(account.isIdentifiedBy(new PniServiceIdentifier(pni))).thenReturn(true);
Metrics.globalRegistry.clear();
simpleMeterRegistry = new SimpleMeterRegistry();
Metrics.globalRegistry.add(simpleMeterRegistry);
}
@AfterEach
void teardown() {
Metrics.globalRegistry.remove(simpleMeterRegistry);
Metrics.globalRegistry.clear();
messageMetrics = new MessageMetrics(simpleMeterRegistry);
}
@Test
void measureAccountOutgoingMessageUuidMismatches() {
final OutgoingMessageEntity outgoingMessageToAci = createOutgoingMessageEntity(new AciServiceIdentifier(aci));
MessageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToAci);
messageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToAci);
Optional<Counter> counter = findCounter(simpleMeterRegistry);
assertTrue(counter.isEmpty());
final OutgoingMessageEntity outgoingMessageToPni = createOutgoingMessageEntity(new PniServiceIdentifier(pni));
MessageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToPni);
messageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToPni);
counter = findCounter(simpleMeterRegistry);
assertTrue(counter.isEmpty());
final OutgoingMessageEntity outgoingMessageToOtherUuid = createOutgoingMessageEntity(new AciServiceIdentifier(otherUuid));
MessageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToOtherUuid);
messageMetrics.measureAccountOutgoingMessageUuidMismatches(account, outgoingMessageToOtherUuid);
counter = findCounter(simpleMeterRegistry);
assertEquals(1.0, counter.map(Counter::count).orElse(0.0));
@ -83,26 +75,26 @@ class MessageMetricsTest {
@Test
void measureAccountEnvelopeUuidMismatches() {
final MessageProtos.Envelope envelopeToAci = createEnvelope(new AciServiceIdentifier(aci));
MessageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToAci);
messageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToAci);
Optional<Counter> counter = findCounter(simpleMeterRegistry);
assertTrue(counter.isEmpty());
final MessageProtos.Envelope envelopeToPni = createEnvelope(new PniServiceIdentifier(pni));
MessageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToPni);
messageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToPni);
counter = findCounter(simpleMeterRegistry);
assertTrue(counter.isEmpty());
final MessageProtos.Envelope envelopeToOtherUuid = createEnvelope(new AciServiceIdentifier(otherUuid));
MessageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToOtherUuid);
messageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToOtherUuid);
counter = findCounter(simpleMeterRegistry);
assertEquals(1.0, counter.map(Counter::count).orElse(0.0));
final MessageProtos.Envelope envelopeToNull = createEnvelope(null);
MessageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToNull);
messageMetrics.measureAccountEnvelopeUuidMismatches(account, envelopeToNull);
counter = findCounter(simpleMeterRegistry);
assertEquals(1.0, counter.map(Counter::count).orElse(0.0));

View File

@ -46,6 +46,7 @@ import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.storage.Account;
@ -124,6 +125,7 @@ class WebSocketConnectionIntegrationTest {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new MessageMetrics(),
new AuthenticatedAccount(account, device),
device,
webSocketClient,
@ -209,6 +211,7 @@ class WebSocketConnectionIntegrationTest {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new MessageMetrics(),
new AuthenticatedAccount(account, device),
device,
webSocketClient,
@ -275,6 +278,7 @@ class WebSocketConnectionIntegrationTest {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new MessageMetrics(),
new AuthenticatedAccount(account, device),
device,
webSocketClient,

View File

@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -56,6 +57,7 @@ import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
@ -121,7 +123,7 @@ class WebSocketConnectionTest {
WebSocketAccountAuthenticator webSocketAuthenticator =
new WebSocketAccountAuthenticator(accountAuthenticator, mock(PrincipalSupplier.class));
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager,
mock(PushNotificationManager.class), mock(ClientPresenceManager.class),
new MessageMetrics(), mock(PushNotificationManager.class), mock(ClientPresenceManager.class),
retrySchedulingExecutor, messageDeliveryScheduler, clientReleaseManager);
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
@ -201,8 +203,7 @@ class WebSocketConnectionTest {
return future;
});
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager,
auth, device, client, retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
WebSocketConnection connection = webSocketConnection(client);
connection.start();
verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class),
@ -229,8 +230,7 @@ class WebSocketConnectionTest {
@Test
public void testOnlineSend() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
final UUID accountUuid = UUID.randomUUID();
@ -349,8 +349,7 @@ class WebSocketConnectionTest {
return future;
});
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager,
auth, device, client, retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
WebSocketConnection connection = webSocketConnection(client);
connection.start();
@ -373,8 +372,7 @@ class WebSocketConnectionTest {
@Test
void testProcessStoredMessageConcurrency() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID());
@ -438,8 +436,7 @@ class WebSocketConnectionTest {
@Test
void testProcessStoredMessagesMultiplePages() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
when(account.getNumber()).thenReturn("+18005551234");
final UUID accountUuid = UUID.randomUUID();
@ -487,8 +484,7 @@ class WebSocketConnectionTest {
@Test
void testProcessStoredMessagesMultiplePagesBackpressure() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
when(account.getNumber()).thenReturn("+18005551234");
final UUID accountUuid = UUID.randomUUID();
@ -571,8 +567,7 @@ class WebSocketConnectionTest {
@Test
void testProcessStoredMessagesContainsSenderUuid() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
when(account.getNumber()).thenReturn("+18005551234");
final UUID accountUuid = UUID.randomUUID();
@ -630,11 +625,15 @@ class WebSocketConnectionTest {
verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
}
private @NotNull WebSocketConnection webSocketConnection(final WebSocketClient client) {
return new WebSocketConnection(receiptSender, messagesManager, new MessageMetrics(), auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
}
@Test
void testProcessStoredMessagesSingleEmptyCall() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
final UUID accountUuid = UUID.randomUUID();
@ -662,8 +661,7 @@ class WebSocketConnectionTest {
@Test
public void testRequeryOnStateMismatch() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
final UUID accountUuid = UUID.randomUUID();
when(account.getNumber()).thenReturn("+18005551234");
@ -718,8 +716,7 @@ class WebSocketConnectionTest {
@Test
void testProcessCachedMessagesOnly() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
final UUID accountUuid = UUID.randomUUID();
@ -750,8 +747,7 @@ class WebSocketConnectionTest {
@Test
void testProcessDatabaseMessagesAfterPersist() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
final WebSocketConnection connection = webSocketConnection(client);
final UUID accountUuid = UUID.randomUUID();
@ -797,8 +793,7 @@ class WebSocketConnectionTest {
final WebSocketClient client = mock(WebSocketClient.class);
when(client.isOpen()).thenReturn(true);
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
WebSocketConnection connection = webSocketConnection(client);
connection.start();
verify(retrySchedulingExecutor, times(WebSocketConnection.MAX_CONSECUTIVE_RETRIES)).schedule(any(Runnable.class),
@ -821,8 +816,7 @@ class WebSocketConnectionTest {
final WebSocketClient client = mock(WebSocketClient.class);
when(client.isOpen()).thenReturn(false);
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
WebSocketConnection connection = webSocketConnection(client);
connection.start();
verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any());
@ -855,8 +849,7 @@ class WebSocketConnectionTest {
when(messagesManager.delete(any(), anyByte(), any(), any())).thenReturn(
CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, messageDeliveryScheduler, clientReleaseManager);
WebSocketConnection connection = webSocketConnection(client);
connection.start();
@ -912,8 +905,7 @@ class WebSocketConnectionTest {
when(messagesManager.delete(any(), anyByte(), any(), any())).thenReturn(
CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
WebSocketConnection connection = webSocketConnection(client);
connection.start();