Break down push latency metrics by VOIP/not-VOIP and optionally by client version

This commit is contained in:
Jon Chambers 2021-11-10 10:35:41 -05:00 committed by GitHub
parent b1f56c3324
commit aaa2a6eef1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 277 additions and 57 deletions

View File

@ -468,7 +468,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, Metrics.globalRegistry, config.getReportMessageConfiguration().getCounterTtl());
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,

View File

@ -51,6 +51,10 @@ public class DynamicConfiguration {
@JsonProperty
private DynamicDirectoryReconcilerConfiguration directoryReconciler = new DynamicDirectoryReconcilerConfiguration();
@JsonProperty
@Valid
private DynamicPushLatencyConfiguration pushLatency = new DynamicPushLatencyConfiguration(Collections.emptyMap());
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@ -101,4 +105,8 @@ public class DynamicConfiguration {
/**
 * Returns the directory reconciler section of this dynamic configuration.
 *
 * @return the value of the {@code directoryReconciler} field
 */
public DynamicDirectoryReconcilerConfiguration getDirectoryReconcilerConfiguration() {
return directoryReconciler;
}
/**
 * Returns the push latency section of this dynamic configuration.
 *
 * @return the value of the {@code pushLatency} field
 */
public DynamicPushLatencyConfiguration getPushLatencyConfiguration() {
return pushLatency;
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.vdurmont.semver4j.Semver;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
import java.util.Map;
import java.util.Set;
/**
 * Dynamic configuration for push latency instrumentation: for each client platform, the set of client versions
 * that are considered "instrumented".
 */
public class DynamicPushLatencyConfiguration {

  private final Map<ClientPlatform, Set<Semver>> instrumentedVersions;

  /**
   * Creates a push latency configuration.
   *
   * @param instrumentedVersions instrumented client versions keyed by platform; {@code null} (which is what the
   *                             JSON creator receives when the {@code instrumentedVersions} property is absent or
   *                             explicitly null) is treated as "no instrumented versions"
   */
  @JsonCreator
  public DynamicPushLatencyConfiguration(
      @JsonProperty("instrumentedVersions") final Map<ClientPlatform, Set<Semver>> instrumentedVersions) {

    // Normalize a missing map to an empty one so callers can safely chain map operations
    // (e.g. getOrDefault) on the getter's result without a null check.
    this.instrumentedVersions = instrumentedVersions != null
        ? instrumentedVersions
        : java.util.Collections.<ClientPlatform, Set<Semver>>emptyMap();
  }

  /**
   * Returns the instrumented client versions keyed by platform.
   *
   * @return a never-{@code null} (possibly empty) map from platform to instrumented versions
   */
  public Map<ClientPlatform, Set<Semver>> getInstrumentedVersions() {
    return instrumentedVersions;
  }
}

View File

@ -6,17 +6,34 @@
package org.whispersystems.textsecuregcm.metrics;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.vdurmont.semver4j.Semver;
import io.lettuce.core.SetArgs;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import io.micrometer.core.instrument.Tag;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
/**
* Measures and records the latency between sending a push notification to a device and that device draining its queue
@ -28,48 +45,157 @@ import java.util.concurrent.TimeUnit;
* device and records the time elapsed since the push notification timestamp as a latency observation.
*/
public class PushLatencyManager {
private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
private static final int TTL = (int)Duration.ofDays(1).toSeconds();
private final FaultTolerantRedisCluster redisCluster;
private final FaultTolerantRedisCluster redisCluster;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
public PushLatencyManager(final FaultTolerantRedisCluster redisCluster) {
this.redisCluster = redisCluster;
private final Clock clock;
private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
private static final int TTL = (int) Duration.ofDays(1).toSeconds();
private static final Logger log = LoggerFactory.getLogger(PushLatencyManager.class);
/**
 * The kind of push notification a {@code PushRecord} describes.
 */
@VisibleForTesting
enum PushType {
// Used when a push is recorded with isVoip == false
STANDARD,
// Used when a push is recorded with isVoip == true
VOIP
}
@VisibleForTesting
static class PushRecord {
private final Instant timestamp;
@Nullable
private final PushType pushType;
/**
 * Creates a push record; also serves as the JSON creator for this type.
 *
 * @param timestamp the instant stored in the record (JSON property {@code timestamp})
 * @param pushType  the kind of push (JSON property {@code pushType}); may be {@code null}
 */
@JsonCreator
PushRecord(@JsonProperty("timestamp") final Instant timestamp,
@JsonProperty("pushType") @Nullable final PushType pushType) {
this.timestamp = timestamp;
this.pushType = pushType;
}
public void recordPushSent(final UUID accountUuid, final long deviceId) {
recordPushSent(accountUuid, deviceId, System.currentTimeMillis());
/**
 * Returns the instant stored in this record.
 *
 * @return the record's timestamp
 */
public Instant getTimestamp() {
return timestamp;
}
@VisibleForTesting
void recordPushSent(final UUID accountUuid, final long deviceId, final long currentTime) {
redisCluster.useCluster(connection ->
connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId), String.valueOf(currentTime), SetArgs.Builder.nx().ex(TTL)));
/**
 * Returns the kind of push this record describes.
 *
 * @return the push type, or {@code null} if the record was created without one
 */
@Nullable
public PushType getPushType() {
return pushType;
}
}
public void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgent) {
getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).thenAccept(latency -> {
if (latency != null) {
Metrics.timer(TIMER_NAME, Tags.of(UserAgentTagUtil.getPlatformTag(userAgent))).record(latency, TimeUnit.MILLISECONDS);
/**
 * Creates a push latency manager that uses the system UTC clock.
 *
 * @param redisCluster                the Redis cluster handed to the three-argument constructor
 * @param dynamicConfigurationManager the dynamic configuration source handed to the three-argument constructor
 */
public PushLatencyManager(final FaultTolerantRedisCluster redisCluster,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
// Delegate to the test-visible constructor with the real wall clock
this(redisCluster, dynamicConfigurationManager, Clock.systemUTC());
}
/**
 * Creates a push latency manager with an explicit clock, so tests can supply a fixed time source.
 *
 * @param redisCluster                the Redis cluster stored for later use by this manager
 * @param dynamicConfigurationManager the dynamic configuration source stored for later use by this manager
 * @param clock                       the clock stored for later use by this manager
 */
@VisibleForTesting
PushLatencyManager(final FaultTolerantRedisCluster redisCluster,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final Clock clock) {
this.redisCluster = redisCluster;
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.clock = clock;
}
/**
 * Stores a JSON-serialized {@code PushRecord} (current time from this manager's clock plus the push type) under
 * the device's "first unacknowledged push" key. The write uses NX semantics with a TTL, so an existing record for
 * the device is left untouched.
 *
 * @param accountUuid the account the push was sent to
 * @param deviceId    the device the push was sent to
 * @param isVoip      {@code true} to record the push as {@code VOIP}, {@code false} to record it as
 *                    {@code STANDARD}
 */
public void recordPushSent(final UUID accountUuid, final long deviceId, final boolean isVoip) {
  final String serializedRecord;

  try {
    final PushType pushType = isVoip ? PushType.VOIP : PushType.STANDARD;
    serializedRecord = SystemMapper.getMapper().writeValueAsString(new PushRecord(Instant.now(clock), pushType));
  } catch (final JsonProcessingException e) {
    // This should never happen
    log.error("Failed to write push latency record JSON", e);
    return;
  }

  redisCluster.useCluster(connection -> connection.async().set(
      getFirstUnacknowledgedPushKey(accountUuid, deviceId),
      serializedRecord,
      SetArgs.Builder.nx().ex(TTL)));
}
/**
 * Takes (reads and clears) the pending push record for a device and, if one exists, records the time elapsed
 * since that record's timestamp as a latency observation on the push latency timer.
 * <p>
 * The observation is tagged with the platform derived from the user agent, with the push type when the record
 * carries one, and with the client version when that version is listed as instrumented for the client's platform
 * in the dynamic push latency configuration.
 *
 * @param accountUuid     the account whose device read its queue
 * @param deviceId        the device that read its queue
 * @param userAgentString the raw user agent string presented by the device
 */
public void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgentString) {
  takePushRecord(accountUuid, deviceId).thenAccept(pushRecord -> {
    if (pushRecord == null) {
      // No push is pending for this device, so there is nothing to measure
      return;
    }

    // Measure against the injected clock: push timestamps are written with the same clock, so both ends of the
    // interval must come from one time source (this matters when a non-system clock is supplied).
    final Duration latency = Duration.between(pushRecord.getTimestamp(), Instant.now(clock));

    // At most three tags: platform, push type, client version
    final List<Tag> tags = new ArrayList<>(3);
    tags.add(UserAgentTagUtil.getPlatformTag(userAgentString));

    if (pushRecord.getPushType() != null) {
      // Locale.ROOT keeps the tag value stable regardless of the JVM's default locale (a Turkish default locale
      // would otherwise lower-case "VOIP" with a dotless i).
      tags.add(Tag.of("pushType", pushRecord.getPushType().name().toLowerCase(java.util.Locale.ROOT)));
    }

    try {
      final UserAgent userAgent = UserAgentUtil.parseUserAgentString(userAgentString);

      final Set<Semver> instrumentedVersions =
          dynamicConfigurationManager.getConfiguration().getPushLatencyConfiguration().getInstrumentedVersions()
              .getOrDefault(userAgent.getPlatform(), Collections.emptySet());

      if (instrumentedVersions.contains(userAgent.getVersion())) {
        tags.add(Tag.of("clientVersion", userAgent.getVersion().toString()));
      }
    } catch (final UnrecognizedUserAgentException ignored) {
      // Deliberately ignored: without a parseable user agent the observation is simply recorded without a
      // clientVersion tag.
    }

    Metrics.timer(TIMER_NAME, tags).record(latency);
  });
}
@VisibleForTesting
CompletableFuture<PushRecord> takePushRecord(final UUID accountUuid, final long deviceId) {
final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId);
final String legacyKey = getLegacyFirstUnacknowledgedPushKey(accountUuid, deviceId);
return redisCluster.withCluster(connection -> {
final CompletableFuture<PushRecord> getFuture = connection.async().get(key).toCompletableFuture()
.thenApply(recordJson -> {
if (StringUtils.isNotEmpty(recordJson)) {
try {
return SystemMapper.getMapper().readValue(recordJson, PushRecord.class);
} catch (JsonProcessingException e) {
return null;
}
} else {
return null;
}
});
}
});
@VisibleForTesting
CompletableFuture<Long> getLatencyAndClearTimestamp(final UUID accountUuid, final long deviceId, final long currentTimeMillis) {
final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId);
final CompletableFuture<PushRecord> legacyGetFuture = connection.async().get(legacyKey).toCompletableFuture()
.thenApply(timestampString -> {
if (StringUtils.isNotEmpty(timestampString)) {
return new PushRecord(Instant.ofEpochMilli(Long.parseLong(timestampString)), null);
} else {
return null;
}
});
return redisCluster.withCluster(connection -> {
final RedisAdvancedClusterAsyncCommands<String, String> commands = connection.async();
final CompletableFuture<PushRecord> pushRecordFuture = getFuture.thenCombine(legacyGetFuture,
(a, b) -> a != null ? a : b);
final CompletableFuture<String> getFuture = commands.get(key).toCompletableFuture();
commands.del(key);
pushRecordFuture.whenComplete((record, cause) -> {
if (cause == null) {
connection.async().del(key, legacyKey);
}
});
return getFuture.thenApply(timestampString -> timestampString != null ? currentTimeMillis - Long.parseLong(timestampString, 10) : null);
});
}
return pushRecordFuture;
});
}
/**
 * Builds the Redis key {@code push_latency::<accountUuid>::<deviceId>} for a device.
 *
 * @param accountUuid the account identifier embedded in the key
 * @param deviceId    the device identifier embedded in the key
 * @return the key string
 */
private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) {
  final StringBuilder keyBuilder = new StringBuilder("push_latency::");
  keyBuilder.append(accountUuid.toString());
  keyBuilder.append("::");
  keyBuilder.append(deviceId);

  return keyBuilder.toString();
}
/**
 * Builds the Redis key {@code push_latency::v2::<accountUuid>::<deviceId>} for a device.
 *
 * @param accountUuid the account identifier embedded in the key
 * @param deviceId    the device identifier embedded in the key
 * @return the key string
 */
private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) {
  final StringBuilder keyBuilder = new StringBuilder("push_latency::v2::");
  keyBuilder.append(accountUuid.toString());
  keyBuilder.append("::");
  keyBuilder.append(deviceId);

  return keyBuilder.toString();
}
/**
 * Builds the legacy (un-versioned) Redis key {@code push_latency::<accountUuid>::<deviceId>} for a device.
 *
 * @param accountUuid the account identifier embedded in the key
 * @param deviceId    the device identifier embedded in the key
 * @return the key string
 */
@VisibleForTesting
static String getLegacyFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) {
  final StringBuilder keyBuilder = new StringBuilder("push_latency::");
  keyBuilder.append(accountUuid.toString());
  keyBuilder.append("::");
  keyBuilder.append(deviceId);

  return keyBuilder.toString();
}
}

View File

@ -124,22 +124,24 @@ public class MessageSender implements Managed {
gcmSender.sendMessage(gcmMessage);
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId()));
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId(), false));
}
private void sendApnNotification(Account account, Device device) {
ApnMessage apnMessage;
if (!Util.isEmpty(device.getVoipApnId())) {
apnMessage = new ApnMessage(device.getVoipApnId(), account.getUuid(), device.getId(), true, Type.NOTIFICATION, Optional.empty());
final boolean useVoip = !Util.isEmpty(device.getVoipApnId());
if (useVoip) {
apnMessage = new ApnMessage(device.getVoipApnId(), account.getUuid(), device.getId(), useVoip, Type.NOTIFICATION, Optional.empty());
RedisOperation.unchecked(() -> apnFallbackManager.schedule(account, device));
} else {
apnMessage = new ApnMessage(device.getApnId(), account.getUuid(), device.getId(), false, Type.NOTIFICATION, Optional.empty());
apnMessage = new ApnMessage(device.getApnId(), account.getUuid(), device.getId(), useVoip, Type.NOTIFICATION, Optional.empty());
}
apnSender.sendMessage(apnMessage);
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId()));
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId(), useVoip));
}
@Override

View File

@ -190,7 +190,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration());
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);

View File

@ -193,7 +193,7 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration());
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);

View File

@ -6,38 +6,95 @@
package org.whispersystems.textsecuregcm.metrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicPushLatencyConfiguration;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager.PushRecord;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager.PushType;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
class PushLatencyManagerTest {
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
@Test
void testGetLatency() throws ExecutionException, InterruptedException {
private DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
final PushLatencyManager pushLatencyManager = new PushLatencyManager(REDIS_CLUSTER_EXTENSION.getRedisCluster());
/**
 * Builds a mocked dynamic configuration chain whose push latency configuration reports no instrumented versions.
 */
@BeforeEach
void setUp() {
  //noinspection unchecked
  dynamicConfigurationManager = mock(DynamicConfigurationManager.class);

  final DynamicConfiguration configuration = mock(DynamicConfiguration.class);
  final DynamicPushLatencyConfiguration pushLatencyConfiguration = mock(DynamicPushLatencyConfiguration.class);

  // Stub from the leaf upwards: push latency config -> dynamic config -> manager
  when(pushLatencyConfiguration.getInstrumentedVersions()).thenReturn(Collections.emptyMap());
  when(configuration.getPushLatencyConfiguration()).thenReturn(pushLatencyConfiguration);
  when(dynamicConfigurationManager.getConfiguration()).thenReturn(configuration);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testTakeRecord(final boolean isVoip) throws ExecutionException, InterruptedException {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
final long expectedLatency = 1234;
final long pushSentTimestamp = System.currentTimeMillis();
final long clearQueueTimestamp = pushSentTimestamp + expectedLatency;
assertNull(pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).get());
final Instant pushTimestamp = Instant.now();
{
pushLatencyManager.recordPushSent(accountUuid, deviceId, pushSentTimestamp);
final PushLatencyManager pushLatencyManager = new PushLatencyManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
dynamicConfigurationManager, Clock.fixed(pushTimestamp, ZoneId.systemDefault()));
assertEquals(expectedLatency,
(long) pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, clearQueueTimestamp).get());
assertNull(
pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).get());
}
assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get());
pushLatencyManager.recordPushSent(accountUuid, deviceId, isVoip);
final PushRecord pushRecord = pushLatencyManager.takePushRecord(accountUuid, deviceId).get();
assertNotNull(pushRecord);
assertEquals(pushTimestamp, pushRecord.getTimestamp());
assertEquals(isVoip ? PushType.VOIP : PushType.STANDARD, pushRecord.getPushType());
assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get());
}
/**
 * Verifies that a bare epoch-millisecond timestamp stored under the legacy key is returned as a push record with
 * that timestamp and no push type, and that taking the record clears it.
 */
@Test
void testTakeLegacyRecord() throws ExecutionException, InterruptedException {
  final UUID accountUuid = UUID.randomUUID();
  final long deviceId = 1;

  final Instant pushTimestamp = Instant.ofEpochMilli(123456789);
  final Clock fixedClock = Clock.fixed(pushTimestamp, ZoneId.systemDefault());

  final PushLatencyManager pushLatencyManager =
      new PushLatencyManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), dynamicConfigurationManager, fixedClock);

  // Nothing has been recorded for this device yet
  assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get());

  // Inject a legacy record
  final String legacyKey = PushLatencyManager.getLegacyFirstUnacknowledgedPushKey(accountUuid, deviceId);
  final String legacyValue = String.valueOf(pushTimestamp.toEpochMilli());

  REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> connection.sync().set(legacyKey, legacyValue));

  final PushRecord takenRecord = pushLatencyManager.takePushRecord(accountUuid, deviceId).get();

  assertNotNull(takenRecord);
  assertEquals(pushTimestamp, takenRecord.getTimestamp());
  assertNull(takenRecord.getPushType());

  // Taking the record must have removed it
  assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get());
}
}