Standardize client tag version handling; add client version tags to delivery latency metrics

This commit is contained in:
Jon Chambers 2023-07-10 12:51:16 -04:00 committed by Jon Chambers
parent adf6c751ee
commit 6db97f5541
14 changed files with 142 additions and 68 deletions

View File

@ -642,7 +642,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator));
webSocketEnvironment.setConnectListener(
new AuthenticatedConnectListener(receiptSender, messagesManager, pushNotificationManager,
clientPresenceManager, websocketScheduledExecutor, messageDeliveryScheduler));
clientPresenceManager, websocketScheduledExecutor, messageDeliveryScheduler, dynamicConfigurationManager));
webSocketEnvironment.jersey()
.register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager));
webSocketEnvironment.jersey().register(new RequestStatisticsFilter(TrafficSource.WEBSOCKET));
@ -719,7 +719,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ReceiptCredentialPresentation::new),
new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, deletedAccounts,
messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor,
messageDeliveryScheduler, reportSpamTokenProvider),
messageDeliveryScheduler, reportSpamTokenProvider, dynamicConfigurationManager),
new PaymentsController(currencyManager, paymentsCredentialsGenerator),
new ProfileController(clock, rateLimiters, accountsManager, profilesManager, dynamicConfigurationManager,
profileBadgeConverter, config.getBadges(), cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner,

View File

@ -59,6 +59,10 @@ public class DynamicConfiguration {
@Valid
DynamicECPreKeyMigrationConfiguration ecPreKeyMigration = new DynamicECPreKeyMigrationConfiguration(true, false);
@JsonProperty
@Valid
DynamicDeliveryLatencyConfiguration deliveryLatency = new DynamicDeliveryLatencyConfiguration(Collections.emptyMap());
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@ -104,4 +108,8 @@ public class DynamicConfiguration {
public DynamicECPreKeyMigrationConfiguration getEcPreKeyMigrationConfiguration() {
return ecPreKeyMigration;
}
public DynamicDeliveryLatencyConfiguration getDeliveryLatencyConfiguration() {
return deliveryLatency;
}
}

View File

@ -0,0 +1,14 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import com.vdurmont.semver4j.Semver;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
import java.util.Map;
import java.util.Set;
public record DynamicDeliveryLatencyConfiguration(Map<ClientPlatform, Set<Semver>> instrumentedVersions) {
}

View File

@ -12,16 +12,5 @@ import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
import java.util.Map;
import java.util.Set;
public class DynamicPushLatencyConfiguration {
private final Map<ClientPlatform, Set<Semver>> instrumentedVersions;
@JsonCreator
public DynamicPushLatencyConfiguration(@JsonProperty("instrumentedVersions") final Map<ClientPlatform, Set<Semver>> instrumentedVersions) {
this.instrumentedVersions = instrumentedVersions;
}
public Map<ClientPlatform, Set<Semver>> getInstrumentedVersions() {
return instrumentedVersions;
}
public record DynamicPushLatencyConfiguration(Map<ClientPlatform, Set<Semver>> instrumentedVersions) {
}

View File

@ -66,6 +66,7 @@ import org.whispersystems.textsecuregcm.auth.Anonymous;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.CombinedUnidentifiedSenderAccessKeys;
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.AccountMismatchedDevices;
import org.whispersystems.textsecuregcm.entities.AccountStaleDevices;
import org.whispersystems.textsecuregcm.entities.IncomingMessage;
@ -95,6 +96,7 @@ import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator;
@ -122,6 +124,7 @@ public class MessageController {
private final ExecutorService multiRecipientMessageExecutor;
private final Scheduler messageDeliveryScheduler;
private final ReportSpamTokenProvider reportSpamTokenProvider;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private static final String REJECT_OVERSIZE_MESSAGE_COUNTER = name(MessageController.class, "rejectOversizeMessage");
private static final String SENT_MESSAGE_COUNTER_NAME = name(MessageController.class, "sentMessages");
@ -154,7 +157,8 @@ public class MessageController {
ReportMessageManager reportMessageManager,
@Nonnull ExecutorService multiRecipientMessageExecutor,
Scheduler messageDeliveryScheduler,
@Nonnull ReportSpamTokenProvider reportSpamTokenProvider) {
@Nonnull ReportSpamTokenProvider reportSpamTokenProvider,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.rateLimiters = rateLimiters;
this.messageSender = messageSender;
this.receiptSender = receiptSender;
@ -166,6 +170,7 @@ public class MessageController {
this.multiRecipientMessageExecutor = Objects.requireNonNull(multiRecipientMessageExecutor);
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.reportSpamTokenProvider = reportSpamTokenProvider;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Timed
@ -536,7 +541,8 @@ public class MessageController {
.map(OutgoingMessageEntity::fromEnvelope)
.peek(outgoingMessageEntity -> {
MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(), outgoingMessageEntity);
MessageMetrics.measureOutgoingMessageLatency(outgoingMessageEntity.serverTimestamp(), "rest", userAgent);
MessageMetrics.measureOutgoingMessageLatency(outgoingMessageEntity.serverTimestamp(), "rest", userAgent,
dynamicConfigurationManager.getConfiguration().getDeliveryLatencyConfiguration().instrumentedVersions());
})
.collect(Collectors.toList()),
messagesAndHasMore.second());

View File

@ -7,9 +7,14 @@ package org.whispersystems.textsecuregcm.metrics;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.vdurmont.semver4j.Semver;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
@ -18,6 +23,8 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
public final class MessageMetrics {
@ -54,10 +61,18 @@ public final class MessageMetrics {
}
}
public static void measureOutgoingMessageLatency(final long serverTimestamp, final String channel, final String userAgent) {
Metrics.timer(DELIVERY_LATENCY_TIMER_NAME, Tags.of(
UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of("channel", channel)))
public static void measureOutgoingMessageLatency(final long serverTimestamp,
final String channel,
final String userAgent,
final Map<ClientPlatform, Set<Semver>> taggedVersions) {
final List<Tag> tags = new ArrayList<>(3);
tags.add(UserAgentTagUtil.getPlatformTag(userAgent));
tags.add(Tag.of("channel", channel));
UserAgentTagUtil.getClientVersionTag(userAgent, taggedVersions).ifPresent(tags::add);
Metrics.timer(DELIVERY_LATENCY_TIMER_NAME, tags)
.record(Duration.between(Instant.ofEpochMilli(serverTimestamp), Instant.now()));
}
}

View File

@ -7,14 +7,15 @@ package org.whispersystems.textsecuregcm.metrics;
import com.vdurmont.semver4j.Semver;
import io.micrometer.core.instrument.Tag;
import org.whispersystems.textsecuregcm.util.Pair;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import java.util.*;
/**
* Utility class for extracting platform/version metrics tags from User-Agent strings.
*/

View File

@ -8,7 +8,6 @@ package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.vdurmont.semver4j.Semver;
import io.lettuce.core.SetArgs;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
@ -16,10 +15,8 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
@ -30,9 +27,6 @@ import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
/**
* Measures and records the latency between sending a push notification to a device and that device draining its queue
@ -105,21 +99,12 @@ public class PushLatencyManager {
tags.add(UserAgentTagUtil.getPlatformTag(userAgentString));
tags.add(Tag.of("pushType", pushRecord.pushType().name().toLowerCase()));
UserAgentTagUtil.getClientVersionTag(userAgentString,
dynamicConfigurationManager.getConfiguration().getPushLatencyConfiguration().instrumentedVersions())
.ifPresent(tags::add);
pushRecord.urgent().ifPresent(urgent -> tags.add(Tag.of("urgent", String.valueOf(urgent))));
try {
final UserAgent userAgent = UserAgentUtil.parseUserAgentString(userAgentString);
final Set<Semver> instrumentedVersions =
dynamicConfigurationManager.getConfiguration().getPushLatencyConfiguration().getInstrumentedVersions()
.getOrDefault(userAgent.getPlatform(), Collections.emptySet());
if (instrumentedVersions.contains(userAgent.getVersion())) {
tags.add(Tag.of("clientVersion", userAgent.getVersion().toString()));
}
} catch (UnrecognizedUserAgentException ignored) {
}
Metrics.timer(TIMER_NAME, tags).record(latency);
}
});

View File

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
@ -31,6 +32,7 @@ import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
@ -67,6 +69,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final ClientPresenceManager clientPresenceManager;
private final ScheduledExecutorService scheduledExecutorService;
private final Scheduler messageDeliveryScheduler;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private final Map<ClientPlatform, AtomicInteger> openAuthenticatedWebsocketsByClientPlatform;
private final Map<ClientPlatform, AtomicInteger> openUnauthenticatedWebsocketsByClientPlatform;
@ -83,13 +86,15 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
PushNotificationManager pushNotificationManager,
ClientPresenceManager clientPresenceManager,
ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler) {
Scheduler messageDeliveryScheduler,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.pushNotificationManager = pushNotificationManager;
this.clientPresenceManager = clientPresenceManager;
this.scheduledExecutorService = scheduledExecutorService;
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.dynamicConfigurationManager = dynamicConfigurationManager;
openAuthenticatedWebsocketsByClientPlatform = new EnumMap<>(ClientPlatform.class);
openUnauthenticatedWebsocketsByClientPlatform = new EnumMap<>(ClientPlatform.class);
@ -151,7 +156,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
messagesManager, auth, device,
context.getClient(),
scheduledExecutorService,
messageDeliveryScheduler);
messageDeliveryScheduler,
dynamicConfigurationManager);
openWebsocketAtomicInteger.incrementAndGet();
openWebsocketCounter.inc();

View File

@ -37,6 +37,7 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
@ -45,6 +46,7 @@ import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Constants;
@ -128,6 +130,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final Random random = new Random();
private final Scheduler messageDeliveryScheduler;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private enum StoredMessageState {
EMPTY,
CACHED_NEW_MESSAGES_AVAILABLE,
@ -140,7 +144,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
Device device,
WebSocketClient client,
ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler) {
Scheduler messageDeliveryScheduler,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this(receiptSender,
messagesManager,
@ -149,7 +154,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
client,
DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS,
scheduledExecutorService,
messageDeliveryScheduler);
messageDeliveryScheduler, dynamicConfigurationManager);
}
@VisibleForTesting
@ -160,7 +165,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
WebSocketClient client,
int sendFuturesTimeoutMillis,
ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler) {
Scheduler messageDeliveryScheduler,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
@ -170,6 +176,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis;
this.scheduledExecutorService = scheduledExecutorService;
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
public void start() {
@ -208,7 +215,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
if (throwable != null) {
sendFailuresMeter.mark();
} else {
MessageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent());
MessageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent(),
dynamicConfigurationManager.getConfiguration().getDeliveryLatencyConfiguration().instrumentedVersions());
}
}).thenCompose(response -> {
final CompletableFuture<Void> result;

View File

@ -41,6 +41,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -74,6 +75,8 @@ import org.signal.libsignal.protocol.ecc.ECKeyPair;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicDeliveryLatencyConfiguration;
import org.whispersystems.textsecuregcm.entities.ECSignedPreKey;
import org.whispersystems.textsecuregcm.entities.IncomingMessage;
import org.whispersystems.textsecuregcm.entities.IncomingMessageList;
@ -99,6 +102,7 @@ import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.tests.util.AccountsHelper;
@ -149,6 +153,7 @@ class MessageControllerTest {
private static final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class);
private static final ExecutorService multiRecipientMessageExecutor = mock(ExecutorService.class);
private static final Scheduler messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
private static final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
private static final ResourceExtension resources = ResourceExtension.builder()
.addProperty(ServerProperties.UNWRAP_COMPLETION_STAGE_IN_WRITER_ENABLE, Boolean.TRUE)
@ -161,7 +166,7 @@ class MessageControllerTest {
.addResource(
new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, deletedAccounts,
messagesManager, pushNotificationManager, reportMessageManager, multiRecipientMessageExecutor,
messageDeliveryScheduler, ReportSpamTokenProvider.noop()))
messageDeliveryScheduler, ReportSpamTokenProvider.noop(), dynamicConfigurationManager))
.build();
@BeforeEach
@ -191,6 +196,14 @@ class MessageControllerTest {
when(accountsManager.getByPhoneNumberIdentifier(MULTI_DEVICE_PNI)).thenReturn(Optional.of(multiDeviceAccount));
when(accountsManager.getByAccountIdentifier(INTERNATIONAL_UUID)).thenReturn(Optional.of(internationalAccount));
final DynamicDeliveryLatencyConfiguration deliveryLatencyConfiguration = mock(DynamicDeliveryLatencyConfiguration.class);
when(deliveryLatencyConfiguration.instrumentedVersions()).thenReturn(Collections.emptyMap());
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfiguration.getDeliveryLatencyConfiguration()).thenReturn(deliveryLatencyConfiguration);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
when(rateLimiters.getMessagesLimiter()).thenReturn(rateLimiter);
}

View File

@ -47,7 +47,7 @@ class PushLatencyManagerTest {
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
when(dynamicConfiguration.getPushLatencyConfiguration()).thenReturn(dynamicPushLatencyConfiguration);
when(dynamicPushLatencyConfiguration.getInstrumentedVersions()).thenReturn(Collections.emptyMap());
when(dynamicPushLatencyConfiguration.instrumentedVersions()).thenReturn(Collections.emptyMap());
}
@ParameterizedTest

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -44,12 +45,15 @@ import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicDeliveryLatencyConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
@ -79,6 +83,7 @@ class WebSocketConnectionIntegrationTest {
private Device device;
private WebSocketClient webSocketClient;
private Scheduler messageDeliveryScheduler;
private DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private long serialTimestamp = System.currentTimeMillis();
@ -98,6 +103,15 @@ class WebSocketConnectionIntegrationTest {
device = mock(Device.class);
webSocketClient = mock(WebSocketClient.class);
final DynamicDeliveryLatencyConfiguration deliveryLatencyConfiguration = mock(DynamicDeliveryLatencyConfiguration.class);
when(deliveryLatencyConfiguration.instrumentedVersions()).thenReturn(Collections.emptyMap());
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfiguration.getDeliveryLatencyConfiguration()).thenReturn(deliveryLatencyConfiguration);
dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID());
when(device.getId()).thenReturn(1L);
@ -126,7 +140,7 @@ class WebSocketConnectionIntegrationTest {
device,
webSocketClient,
scheduledExecutorService,
messageDeliveryScheduler);
messageDeliveryScheduler, dynamicConfigurationManager);
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
@ -210,7 +224,7 @@ class WebSocketConnectionIntegrationTest {
device,
webSocketClient,
scheduledExecutorService,
messageDeliveryScheduler);
messageDeliveryScheduler, dynamicConfigurationManager);
final int persistedMessageCount = 207;
final int cachedMessageCount = 173;
@ -276,7 +290,8 @@ class WebSocketConnectionIntegrationTest {
webSocketClient,
100, // use a very short timeout, so that this test completes quickly
scheduledExecutorService,
messageDeliveryScheduler);
messageDeliveryScheduler,
dynamicConfigurationManager);
final int persistedMessageCount = 207;
final int cachedMessageCount = 173;

View File

@ -33,6 +33,7 @@ import io.lettuce.core.RedisException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -52,12 +53,15 @@ import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicDeliveryLatencyConfiguration;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.websocket.WebSocketClient;
@ -88,6 +92,7 @@ class WebSocketConnectionTest {
private ReceiptSender receiptSender;
private ScheduledExecutorService retrySchedulingExecutor;
private Scheduler messageDeliveryScheduler;
private DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
@BeforeEach
void setup() {
@ -101,6 +106,15 @@ class WebSocketConnectionTest {
receiptSender = mock(ReceiptSender.class);
retrySchedulingExecutor = mock(ScheduledExecutorService.class);
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
final DynamicDeliveryLatencyConfiguration deliveryLatencyConfiguration = mock(DynamicDeliveryLatencyConfiguration.class);
when(deliveryLatencyConfiguration.instrumentedVersions()).thenReturn(Collections.emptyMap());
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfiguration.getDeliveryLatencyConfiguration()).thenReturn(deliveryLatencyConfiguration);
dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
}
@AfterEach
@ -114,7 +128,7 @@ class WebSocketConnectionTest {
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager,
mock(PushNotificationManager.class), mock(ClientPresenceManager.class),
retrySchedulingExecutor, messageDeliveryScheduler);
retrySchedulingExecutor, messageDeliveryScheduler, dynamicConfigurationManager);
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
@ -194,7 +208,7 @@ class WebSocketConnectionTest {
});
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager,
auth, device, client, retrySchedulingExecutor, Schedulers.immediate());
auth, device, client, retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
connection.start();
verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class),
@ -222,7 +236,7 @@ class WebSocketConnectionTest {
public void testOnlineSend() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
final UUID accountUuid = UUID.randomUUID();
@ -342,7 +356,7 @@ class WebSocketConnectionTest {
});
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager,
auth, device, client, retrySchedulingExecutor, Schedulers.immediate());
auth, device, client, retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
connection.start();
@ -366,7 +380,7 @@ class WebSocketConnectionTest {
void testProcessStoredMessageConcurrency() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID());
@ -431,7 +445,7 @@ class WebSocketConnectionTest {
void testProcessStoredMessagesMultiplePages() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
when(account.getNumber()).thenReturn("+18005551234");
final UUID accountUuid = UUID.randomUUID();
@ -483,7 +497,7 @@ class WebSocketConnectionTest {
void testProcessStoredMessagesContainsSenderUuid() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
when(account.getNumber()).thenReturn("+18005551234");
final UUID accountUuid = UUID.randomUUID();
@ -545,7 +559,7 @@ class WebSocketConnectionTest {
void testProcessStoredMessagesSingleEmptyCall() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
final UUID accountUuid = UUID.randomUUID();
@ -574,7 +588,7 @@ class WebSocketConnectionTest {
public void testRequeryOnStateMismatch() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
final UUID accountUuid = UUID.randomUUID();
when(account.getNumber()).thenReturn("+18005551234");
@ -630,7 +644,7 @@ class WebSocketConnectionTest {
void testProcessCachedMessagesOnly() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
final UUID accountUuid = UUID.randomUUID();
@ -662,7 +676,7 @@ class WebSocketConnectionTest {
void testProcessDatabaseMessagesAfterPersist() {
final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
final UUID accountUuid = UUID.randomUUID();
@ -709,7 +723,7 @@ class WebSocketConnectionTest {
when(client.isOpen()).thenReturn(true);
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
connection.start();
verify(retrySchedulingExecutor, times(WebSocketConnection.MAX_CONSECUTIVE_RETRIES)).schedule(any(Runnable.class),
@ -733,7 +747,7 @@ class WebSocketConnectionTest {
when(client.isOpen()).thenReturn(false);
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
connection.start();
verify(retrySchedulingExecutor, never()).schedule(any(Runnable.class), anyLong(), any());
@ -767,7 +781,7 @@ class WebSocketConnectionTest {
CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, messageDeliveryScheduler);
retrySchedulingExecutor, messageDeliveryScheduler, dynamicConfigurationManager);
connection.start();
@ -824,7 +838,7 @@ class WebSocketConnectionTest {
CompletableFuture.completedFuture(Optional.empty()));
WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, auth, device, client,
retrySchedulingExecutor, Schedulers.immediate());
retrySchedulingExecutor, Schedulers.immediate(), dynamicConfigurationManager);
connection.start();