From 6db97f5541c3f7f4f46a959c79affd318be16004 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Mon, 10 Jul 2023 12:51:16 -0400 Subject: [PATCH] Standardize client tag version handling; add client version tags to delivery latency metrics --- .../textsecuregcm/WhisperServerService.java | 4 +- .../dynamic/DynamicConfiguration.java | 8 ++++ .../DynamicDeliveryLatencyConfiguration.java | 14 ++++++ .../DynamicPushLatencyConfiguration.java | 13 +----- .../controllers/MessageController.java | 10 ++++- .../textsecuregcm/metrics/MessageMetrics.java | 23 ++++++++-- .../metrics/UserAgentTagUtil.java | 7 +-- .../push/PushLatencyManager.java | 23 ++-------- .../AuthenticatedConnectListener.java | 10 ++++- .../websocket/WebSocketConnection.java | 16 +++++-- .../controllers/MessageControllerTest.java | 15 ++++++- .../push/PushLatencyManagerTest.java | 2 +- .../WebSocketConnectionIntegrationTest.java | 21 +++++++-- .../websocket/WebSocketConnectionTest.java | 44 ++++++++++++------- 14 files changed, 142 insertions(+), 68 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicDeliveryLatencyConfiguration.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index bf6e61238..20954d98d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -642,7 +642,7 @@ public class WhisperServerService extends Application getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -104,4 +108,8 @@ public class DynamicConfiguration { public DynamicECPreKeyMigrationConfiguration getEcPreKeyMigrationConfiguration() { return ecPreKeyMigration; } + + public DynamicDeliveryLatencyConfiguration getDeliveryLatencyConfiguration() { + return deliveryLatency; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicDeliveryLatencyConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicDeliveryLatencyConfiguration.java new file mode 100644 index 000000000..9ab29fb9f --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicDeliveryLatencyConfiguration.java @@ -0,0 +1,14 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration.dynamic; + +import com.vdurmont.semver4j.Semver; +import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; +import java.util.Map; +import java.util.Set; + +public record DynamicDeliveryLatencyConfiguration(Map> instrumentedVersions) { +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicPushLatencyConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicPushLatencyConfiguration.java index beed2829a..157264c0c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicPushLatencyConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicPushLatencyConfiguration.java @@ -12,16 +12,5 @@ import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; import java.util.Map; import java.util.Set; -public class DynamicPushLatencyConfiguration { - - private final Map> instrumentedVersions; - - @JsonCreator - public DynamicPushLatencyConfiguration(@JsonProperty("instrumentedVersions") final Map> instrumentedVersions) { - this.instrumentedVersions = instrumentedVersions; - } - - public Map> getInstrumentedVersions() { - return instrumentedVersions; - } +public record DynamicPushLatencyConfiguration(Map> instrumentedVersions) { } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index f73c1a446..6d23f1b20 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -66,6 +66,7 @@ import org.whispersystems.textsecuregcm.auth.Anonymous; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.auth.CombinedUnidentifiedSenderAccessKeys; import org.whispersystems.textsecuregcm.auth.OptionalAccess; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.AccountMismatchedDevices; import org.whispersystems.textsecuregcm.entities.AccountStaleDevices; import org.whispersystems.textsecuregcm.entities.IncomingMessage; @@ -95,6 +96,7 @@ import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.DeletedAccounts; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator; @@ -122,6 +124,7 @@ public class MessageController { private final ExecutorService multiRecipientMessageExecutor; private final Scheduler messageDeliveryScheduler; private final ReportSpamTokenProvider reportSpamTokenProvider; + private final DynamicConfigurationManager dynamicConfigurationManager; private static final String REJECT_OVERSIZE_MESSAGE_COUNTER = name(MessageController.class, "rejectOversizeMessage"); private static final String SENT_MESSAGE_COUNTER_NAME = name(MessageController.class, "sentMessages"); @@ -154,7 +157,8 @@ public class MessageController { ReportMessageManager reportMessageManager, @Nonnull ExecutorService multiRecipientMessageExecutor, Scheduler messageDeliveryScheduler, - @Nonnull ReportSpamTokenProvider reportSpamTokenProvider) { + @Nonnull ReportSpamTokenProvider reportSpamTokenProvider, + final DynamicConfigurationManager dynamicConfigurationManager) { this.rateLimiters = rateLimiters; this.messageSender = messageSender; this.receiptSender = receiptSender; @@ -166,6 +170,7 @@ public class MessageController { this.multiRecipientMessageExecutor = Objects.requireNonNull(multiRecipientMessageExecutor); this.messageDeliveryScheduler = messageDeliveryScheduler; this.reportSpamTokenProvider = reportSpamTokenProvider; + this.dynamicConfigurationManager = dynamicConfigurationManager; } @Timed @@ -536,7 +541,8 @@ public class MessageController { .map(OutgoingMessageEntity::fromEnvelope) .peek(outgoingMessageEntity -> { MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(), outgoingMessageEntity); - MessageMetrics.measureOutgoingMessageLatency(outgoingMessageEntity.serverTimestamp(), "rest", userAgent); + MessageMetrics.measureOutgoingMessageLatency(outgoingMessageEntity.serverTimestamp(), "rest", userAgent, + dynamicConfigurationManager.getConfiguration().getDeliveryLatencyConfiguration().instrumentedVersions()); }) .collect(Collectors.toList()), messagesAndHasMore.second()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java index 38695b92d..2b7027b82 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MessageMetrics.java @@ -7,9 +7,14 @@ package org.whispersystems.textsecuregcm.metrics; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; +import com.vdurmont.semver4j.Semver; import io.micrometer.core.instrument.Metrics; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; @@ -18,6 +23,8 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; +import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; public final class MessageMetrics { @@ -54,10 +61,18 @@ public final class MessageMetrics { } } - public static void measureOutgoingMessageLatency(final long serverTimestamp, final String channel, final String userAgent) { - Metrics.timer(DELIVERY_LATENCY_TIMER_NAME, Tags.of( - UserAgentTagUtil.getPlatformTag(userAgent), - Tag.of("channel", channel))) + public static void measureOutgoingMessageLatency(final long serverTimestamp, + final String channel, + final String userAgent, + final Map> taggedVersions) { + + final List tags = new ArrayList<>(3); + tags.add(UserAgentTagUtil.getPlatformTag(userAgent)); + tags.add(Tag.of("channel", channel)); + + UserAgentTagUtil.getClientVersionTag(userAgent, taggedVersions).ifPresent(tags::add); + + Metrics.timer(DELIVERY_LATENCY_TIMER_NAME, tags) .record(Duration.between(Instant.ofEpochMilli(serverTimestamp), Instant.now())); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/UserAgentTagUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/UserAgentTagUtil.java index 1ba6b0046..77253e97b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/UserAgentTagUtil.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/UserAgentTagUtil.java @@ -7,14 +7,15 @@ package org.whispersystems.textsecuregcm.metrics; import com.vdurmont.semver4j.Semver; import io.micrometer.core.instrument.Tag; -import org.whispersystems.textsecuregcm.util.Pair; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; import org.whispersystems.textsecuregcm.util.ua.UserAgent; import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; -import java.util.*; - /** * Utility class for extracting platform/version metrics tags from User-Agent strings. */ diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushLatencyManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushLatencyManager.java index 6b59e6602..742aa7379 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushLatencyManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushLatencyManager.java @@ -8,7 +8,6 @@ package org.whispersystems.textsecuregcm.push; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; -import com.vdurmont.semver4j.Semver; import io.lettuce.core.SetArgs; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; @@ -16,10 +15,8 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; @@ -30,9 +27,6 @@ import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.util.SystemMapper; -import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; -import org.whispersystems.textsecuregcm.util.ua.UserAgent; -import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; /** * Measures and records the latency between sending a push notification to a device and that device draining its queue @@ -105,21 +99,12 @@ public class PushLatencyManager { tags.add(UserAgentTagUtil.getPlatformTag(userAgentString)); tags.add(Tag.of("pushType", pushRecord.pushType().name().toLowerCase())); + UserAgentTagUtil.getClientVersionTag(userAgentString, + dynamicConfigurationManager.getConfiguration().getPushLatencyConfiguration().instrumentedVersions()) + .ifPresent(tags::add); + pushRecord.urgent().ifPresent(urgent -> tags.add(Tag.of("urgent", String.valueOf(urgent)))); - try { - final UserAgent userAgent = UserAgentUtil.parseUserAgentString(userAgentString); - - final Set instrumentedVersions = - dynamicConfigurationManager.getConfiguration().getPushLatencyConfiguration().getInstrumentedVersions() - .getOrDefault(userAgent.getPlatform(), Collections.emptySet()); - - if (instrumentedVersions.contains(userAgent.getVersion())) { - tags.add(Tag.of("clientVersion", userAgent.getVersion().toString())); - } - } catch (UnrecognizedUserAgentException ignored) { - } - Metrics.timer(TIMER_NAME, tags).record(latency); } }); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index 797ffc153..b158f1db6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; @@ -31,6 +32,7 @@ import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; @@ -67,6 +69,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final ClientPresenceManager clientPresenceManager; private final ScheduledExecutorService scheduledExecutorService; private final Scheduler messageDeliveryScheduler; + private final DynamicConfigurationManager dynamicConfigurationManager; private final Map openAuthenticatedWebsocketsByClientPlatform; private final Map openUnauthenticatedWebsocketsByClientPlatform; @@ -83,13 +86,15 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { PushNotificationManager pushNotificationManager, ClientPresenceManager clientPresenceManager, ScheduledExecutorService scheduledExecutorService, - Scheduler messageDeliveryScheduler) { + Scheduler messageDeliveryScheduler, + final DynamicConfigurationManager dynamicConfigurationManager) { this.receiptSender = receiptSender; this.messagesManager = messagesManager; this.pushNotificationManager = pushNotificationManager; this.clientPresenceManager = clientPresenceManager; this.scheduledExecutorService = scheduledExecutorService; this.messageDeliveryScheduler = messageDeliveryScheduler; + this.dynamicConfigurationManager = dynamicConfigurationManager; openAuthenticatedWebsocketsByClientPlatform = new EnumMap<>(ClientPlatform.class); openUnauthenticatedWebsocketsByClientPlatform = new EnumMap<>(ClientPlatform.class); @@ -151,7 +156,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { messagesManager, auth, device, context.getClient(), scheduledExecutorService, - messageDeliveryScheduler); + messageDeliveryScheduler, + dynamicConfigurationManager); openWebsocketAtomicInteger.incrementAndGet(); openWebsocketCounter.inc(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index e5a677709..6b6ba4155 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -37,6 +37,7 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.controllers.MessageController; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; @@ -45,6 +46,7 @@ import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.util.Constants; @@ -128,6 +130,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private final Random random = new Random(); private final Scheduler messageDeliveryScheduler; + private final DynamicConfigurationManager dynamicConfigurationManager; + private enum StoredMessageState { EMPTY, CACHED_NEW_MESSAGES_AVAILABLE, @@ -140,7 +144,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac Device device, WebSocketClient client, ScheduledExecutorService scheduledExecutorService, - Scheduler messageDeliveryScheduler) { + Scheduler messageDeliveryScheduler, + DynamicConfigurationManager dynamicConfigurationManager) { this(receiptSender, messagesManager, @@ -149,7 +154,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac client, DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, scheduledExecutorService, - messageDeliveryScheduler); + messageDeliveryScheduler, dynamicConfigurationManager); } @VisibleForTesting @@ -160,7 +165,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac WebSocketClient client, int sendFuturesTimeoutMillis, ScheduledExecutorService scheduledExecutorService, - Scheduler messageDeliveryScheduler) { + Scheduler messageDeliveryScheduler, + DynamicConfigurationManager dynamicConfigurationManager) { this.receiptSender = receiptSender; this.messagesManager = messagesManager; @@ -170,6 +176,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; this.scheduledExecutorService = scheduledExecutorService; this.messageDeliveryScheduler = messageDeliveryScheduler; + this.dynamicConfigurationManager = dynamicConfigurationManager; } public void start() { @@ -208,7 +215,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac if (throwable != null) { sendFailuresMeter.mark(); } else { - MessageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent()); + MessageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent(), + dynamicConfigurationManager.getConfiguration().getDeliveryLatencyConfiguration().instrumentedVersions()); } }).thenCompose(response -> { final CompletableFuture result; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java index 9e7c14961..617e8dd21 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java @@ -41,6 +41,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; import java.util.Base64; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -74,6 +75,8 @@ import org.signal.libsignal.protocol.ecc.ECKeyPair; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.auth.DisabledPermittedAuthenticatedAccount; import org.whispersystems.textsecuregcm.auth.OptionalAccess; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicDeliveryLatencyConfiguration; import org.whispersystems.textsecuregcm.entities.ECSignedPreKey; import org.whispersystems.textsecuregcm.entities.IncomingMessage; import org.whispersystems.textsecuregcm.entities.IncomingMessageList; @@ -99,6 +102,7 @@ import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.DeletedAccounts; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; import org.whispersystems.textsecuregcm.tests.util.AccountsHelper; @@ -149,6 +153,7 @@ class MessageControllerTest { private static final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class); private static final ExecutorService multiRecipientMessageExecutor = mock(ExecutorService.class); private static final Scheduler messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); + private static final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); private static final ResourceExtension resources = ResourceExtension.builder() .addProperty(ServerProperties.UNWRAP_COMPLETION_STAGE_IN_WRITER_ENABLE, Boolean.TRUE) @@ -161,7 +166,7 @@ class MessageControllerTest { .addResource( new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, deletedAccounts, messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor, - messageDeliveryScheduler, ReportSpamTokenProvider.noop())) + messageDeliveryScheduler, ReportSpamTokenProvider.noop(), dynamicConfigurationManager)) .build(); @BeforeEach @@ -191,6 +196,14 @@ class MessageControllerTest { when(accountsManager.getByPhoneNumberIdentifier(MULTI_DEVICE_PNI)).thenReturn(Optional.of(multiDeviceAccount)); when(accountsManager.getByAccountIdentifier(INTERNATIONAL_UUID)).thenReturn(Optional.of(internationalAccount)); + final DynamicDeliveryLatencyConfiguration deliveryLatencyConfiguration = mock(DynamicDeliveryLatencyConfiguration.class); + when(deliveryLatencyConfiguration.instrumentedVersions()).thenReturn(Collections.emptyMap()); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + when(dynamicConfiguration.getDeliveryLatencyConfiguration()).thenReturn(deliveryLatencyConfiguration); + + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + when(rateLimiters.getMessagesLimiter()).thenReturn(rateLimiter); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushLatencyManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushLatencyManagerTest.java index bb335b1bd..b6d6c8d7b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushLatencyManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushLatencyManagerTest.java @@ -47,7 +47,7 @@ class PushLatencyManagerTest { when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); when(dynamicConfiguration.getPushLatencyConfiguration()).thenReturn(dynamicPushLatencyConfiguration); - when(dynamicPushLatencyConfiguration.getInstrumentedVersions()).thenReturn(Collections.emptyMap()); + when(dynamicPushLatencyConfiguration.instrumentedVersions()).thenReturn(Collections.emptyMap()); } @ParameterizedTest diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 2a7e98e8e..2165b251e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.time.Clock; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -44,12 +45,15 @@ import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicDeliveryLatencyConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.storage.MessagesCache; @@ -79,6 +83,7 @@ class WebSocketConnectionIntegrationTest { private Device device; private WebSocketClient webSocketClient; private Scheduler messageDeliveryScheduler; + private DynamicConfigurationManager dynamicConfigurationManager; private long serialTimestamp = System.currentTimeMillis(); @@ -98,6 +103,15 @@ class WebSocketConnectionIntegrationTest { device = mock(Device.class); webSocketClient = mock(WebSocketClient.class); + final DynamicDeliveryLatencyConfiguration deliveryLatencyConfiguration = mock(DynamicDeliveryLatencyConfiguration.class); + when(deliveryLatencyConfiguration.instrumentedVersions()).thenReturn(Collections.emptyMap()); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + when(dynamicConfiguration.getDeliveryLatencyConfiguration()).thenReturn(deliveryLatencyConfiguration); + + dynamicConfigurationManager = mock(DynamicConfigurationManager.class); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + when(account.getNumber()).thenReturn("+18005551234"); when(account.getUuid()).thenReturn(UUID.randomUUID()); when(device.getId()).thenReturn(1L); @@ -126,7 +140,7 @@ class WebSocketConnectionIntegrationTest { device, webSocketClient, scheduledExecutorService, - messageDeliveryScheduler); + messageDeliveryScheduler, dynamicConfigurationManager); final List expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); @@ -210,7 +224,7 @@ class WebSocketConnectionIntegrationTest { device, webSocketClient, scheduledExecutorService, - messageDeliveryScheduler); + messageDeliveryScheduler, dynamicConfigurationManager); final int persistedMessageCount = 207; final int cachedMessageCount = 173; @@ -276,7 +290,8 @@ class WebSocketConnectionIntegrationTest { webSocketClient, 100, // use a very short timeout, so that this test completes quickly scheduledExecutorService, - messageDeliveryScheduler); + messageDeliveryScheduler, + dynamicConfigurationManager); final int persistedMessageCount = 207; final int cachedMessageCount = 173; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 1e55e3aaa..00f1e5ee4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -33,6 +33,7 @@ import io.lettuce.core.RedisException; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -52,12 +53,15 @@ import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicDeliveryLatencyConfiguration; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.websocket.WebSocketClient; @@ -88,6 +92,7 @@ class WebSocketConnectionTest { private ReceiptSender receiptSender; private ScheduledExecutorService retrySchedulingExecutor; private Scheduler messageDeliveryScheduler; + private DynamicConfigurationManager dynamicConfigurationManager; @BeforeEach void setup() { @@ -101,6 +106,15 @@ class WebSocketConnectionTest { receiptSender = mock(ReceiptSender.class); retrySchedulingExecutor = mock(ScheduledExecutorService.class); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); + + final DynamicDeliveryLatencyConfiguration deliveryLatencyConfiguration = mock(DynamicDeliveryLatencyConfiguration.class); + when(deliveryLatencyConfiguration.instrumentedVersions()).thenReturn(Collections.emptyMap()); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + when(dynamicConfiguration.getDeliveryLatencyConfiguration()).thenReturn(deliveryLatencyConfiguration); + + dynamicConfigurationManager = mock(DynamicConfigurationManager.class); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); } @AfterEach @@ -114,7 +128,7 @@ class WebSocketConnectionTest { WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager, mock(PushNotificationManager.class), mock(ClientPresenceManager.class), - retrySchedulingExecutor, messageDeliveryScheduler); + retrySchedulingExecutor, messageDeliveryScheduler, dynamicConfigurationManager); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) @@ -194,7 +208,7 @@ class WebSocketConnectionTest { }); WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, - auth, device, client, retrySchedulingExecutor, Schedulers.immediate()); + auth, device, client, retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); connection.start(); verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), @@ -222,7 +236,7 @@ class WebSocketConnectionTest { public void testOnlineSend() { final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); final UUID accountUuid = UUID.randomUUID(); @@ -342,7 +356,7 @@ class WebSocketConnectionTest { }); WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, - auth, device, client, retrySchedulingExecutor, Schedulers.immediate()); + auth, device, client, retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); connection.start(); @@ -366,7 +380,7 @@ class WebSocketConnectionTest { void testProcessStoredMessageConcurrency() { final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); when(account.getNumber()).thenReturn("+18005551234"); when(account.getUuid()).thenReturn(UUID.randomUUID()); @@ -431,7 +445,7 @@ class WebSocketConnectionTest { void testProcessStoredMessagesMultiplePages() { final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); when(account.getNumber()).thenReturn("+18005551234"); final UUID accountUuid = UUID.randomUUID(); @@ -483,7 +497,7 @@ class WebSocketConnectionTest { void testProcessStoredMessagesContainsSenderUuid() { final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); when(account.getNumber()).thenReturn("+18005551234"); final UUID accountUuid = UUID.randomUUID(); @@ -545,7 +559,7 @@ class WebSocketConnectionTest { void testProcessStoredMessagesSingleEmptyCall() { final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); final UUID accountUuid = UUID.randomUUID(); @@ -574,7 +588,7 @@ class WebSocketConnectionTest { public void testRequeryOnStateMismatch() { final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); final UUID accountUuid = UUID.randomUUID(); when(account.getNumber()).thenReturn("+18005551234"); @@ -630,7 +644,7 @@ class WebSocketConnectionTest { void testProcessCachedMessagesOnly() { final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); final UUID accountUuid = UUID.randomUUID(); @@ -662,7 +676,7 @@ class WebSocketConnectionTest { void testProcessDatabaseMessagesAfterPersist() { final WebSocketClient client = mock(WebSocketClient.class); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); final UUID accountUuid = UUID.randomUUID(); @@ -709,7 +723,7 @@ class WebSocketConnectionTest { when(client.isOpen()).thenReturn(true); WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); connection.start(); verify(retrySchedulingExecutor, times(WebSocketConnection.MAX_CONSECUTIVE_RETRIES)).schedule(any(Runnable.class), @@ -733,7 +747,7 @@ class WebSocketConnectionTest { when(client.isOpen()).thenReturn(false); WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); connection.start(); verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any()); @@ -767,7 +781,7 @@ class WebSocketConnectionTest { CompletableFuture.completedFuture(Optional.empty())); WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, messageDeliveryScheduler); + retrySchedulingExecutor, messageDeliveryScheduler, dynamicConfigurationManager); connection.start(); @@ -824,7 +838,7 @@ class WebSocketConnectionTest { CompletableFuture.completedFuture(Optional.empty())); WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client, - retrySchedulingExecutor, Schedulers.immediate()); + retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager); connection.start();