diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 997fcbd8b..7009fd581 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -468,7 +468,7 @@ public class WhisperServerService extends Application getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -101,4 +105,8 @@ public class DynamicConfiguration { public DynamicDirectoryReconcilerConfiguration getDirectoryReconcilerConfiguration() { return directoryReconciler; } + + public DynamicPushLatencyConfiguration getPushLatencyConfiguration() { + return pushLatency; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicPushLatencyConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicPushLatencyConfiguration.java new file mode 100644 index 000000000..beed2829a --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicPushLatencyConfiguration.java @@ -0,0 +1,27 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration.dynamic; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.vdurmont.semver4j.Semver; +import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; +import java.util.Map; +import java.util.Set; + +public class DynamicPushLatencyConfiguration { + + private final Map> instrumentedVersions; + + @JsonCreator + public DynamicPushLatencyConfiguration(@JsonProperty("instrumentedVersions") final Map> instrumentedVersions) { + this.instrumentedVersions = instrumentedVersions; + } + + public Map> getInstrumentedVersions() { + return instrumentedVersions; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java index 1a68c38fa..29a01a35e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java @@ -6,17 +6,34 @@ package org.whispersystems.textsecuregcm.metrics; import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; +import com.vdurmont.semver4j.Semver; import io.lettuce.core.SetArgs; -import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tags; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; - +import io.micrometer.core.instrument.Tag; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; +import org.whispersystems.textsecuregcm.util.SystemMapper; +import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; +import org.whispersystems.textsecuregcm.util.ua.UserAgent; +import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; /** * Measures and records the latency between sending a push notification to a device and that device draining its queue @@ -28,48 +45,157 @@ import java.util.concurrent.TimeUnit; * device and records the time elapsed since the push notification timestamp as a latency observation. */ public class PushLatencyManager { - private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency"); - private static final int TTL = (int)Duration.ofDays(1).toSeconds(); - private final FaultTolerantRedisCluster redisCluster; + private final FaultTolerantRedisCluster redisCluster; + private final DynamicConfigurationManager dynamicConfigurationManager; - public PushLatencyManager(final FaultTolerantRedisCluster redisCluster) { - this.redisCluster = redisCluster; + private final Clock clock; + + private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency"); + private static final int TTL = (int) Duration.ofDays(1).toSeconds(); + + private static final Logger log = LoggerFactory.getLogger(PushLatencyManager.class); + + @VisibleForTesting + enum PushType { + STANDARD, + VOIP + } + + @VisibleForTesting + static class PushRecord { + private final Instant timestamp; + + @Nullable + private final PushType pushType; + + @JsonCreator + PushRecord(@JsonProperty("timestamp") final Instant timestamp, + @JsonProperty("pushType") @Nullable final PushType pushType) { + + this.timestamp = timestamp; + this.pushType = pushType; } - public void recordPushSent(final UUID accountUuid, final long deviceId) { - recordPushSent(accountUuid, deviceId, System.currentTimeMillis()); + public Instant getTimestamp() { + return timestamp; } - @VisibleForTesting - void recordPushSent(final UUID accountUuid, final long deviceId, final long currentTime) { - redisCluster.useCluster(connection -> - connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId), String.valueOf(currentTime), SetArgs.Builder.nx().ex(TTL))); + @Nullable + public PushType getPushType() { + return pushType; } + } - public void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgent) { - getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).thenAccept(latency -> { - if (latency != null) { - Metrics.timer(TIMER_NAME, Tags.of(UserAgentTagUtil.getPlatformTag(userAgent))).record(latency, TimeUnit.MILLISECONDS); + public PushLatencyManager(final FaultTolerantRedisCluster redisCluster, + final DynamicConfigurationManager dynamicConfigurationManager) { + + this(redisCluster, dynamicConfigurationManager, Clock.systemUTC()); + } + + @VisibleForTesting + PushLatencyManager(final FaultTolerantRedisCluster redisCluster, + final DynamicConfigurationManager dynamicConfigurationManager, + final Clock clock) { + + this.redisCluster = redisCluster; + this.dynamicConfigurationManager = dynamicConfigurationManager; + this.clock = clock; + } + + public void recordPushSent(final UUID accountUuid, final long deviceId, final boolean isVoip) { + try { + final String recordJson = SystemMapper.getMapper().writeValueAsString( + new PushRecord(Instant.now(clock), isVoip ? PushType.VOIP : PushType.STANDARD)); + + redisCluster.useCluster(connection -> + connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId), + recordJson, + SetArgs.Builder.nx().ex(TTL))); + } catch (final JsonProcessingException e) { + // This should never happen + log.error("Failed to write push latency record JSON", e); + } + } + + public void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgentString) { + takePushRecord(accountUuid, deviceId).thenAccept(pushRecord -> { + if (pushRecord != null) { + final Duration latency = Duration.between(pushRecord.getTimestamp(), Instant.now()); + + final List tags = new ArrayList<>(2); + + tags.add(UserAgentTagUtil.getPlatformTag(userAgentString)); + + if (pushRecord.getPushType() != null) { + tags.add(Tag.of("pushType", pushRecord.getPushType().name().toLowerCase())); + } + + try { + final UserAgent userAgent = UserAgentUtil.parseUserAgentString(userAgentString); + + final Set instrumentedVersions = + dynamicConfigurationManager.getConfiguration().getPushLatencyConfiguration().getInstrumentedVersions() + .getOrDefault(userAgent.getPlatform(), Collections.emptySet()); + + if (instrumentedVersions.contains(userAgent.getVersion())) { + tags.add(Tag.of("clientVersion", userAgent.getVersion().toString())); + } + } catch (UnrecognizedUserAgentException ignored) { + } + + Metrics.timer(TIMER_NAME, tags).record(latency); + } + }); + } + + @VisibleForTesting + CompletableFuture takePushRecord(final UUID accountUuid, final long deviceId) { + final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId); + final String legacyKey = getLegacyFirstUnacknowledgedPushKey(accountUuid, deviceId); + + return redisCluster.withCluster(connection -> { + final CompletableFuture getFuture = connection.async().get(key).toCompletableFuture() + .thenApply(recordJson -> { + if (StringUtils.isNotEmpty(recordJson)) { + try { + return SystemMapper.getMapper().readValue(recordJson, PushRecord.class); + } catch (JsonProcessingException e) { + return null; + } + } else { + return null; } - }); - } + }); - @VisibleForTesting - CompletableFuture getLatencyAndClearTimestamp(final UUID accountUuid, final long deviceId, final long currentTimeMillis) { - final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId); + final CompletableFuture legacyGetFuture = connection.async().get(legacyKey).toCompletableFuture() + .thenApply(timestampString -> { + if (StringUtils.isNotEmpty(timestampString)) { + return new PushRecord(Instant.ofEpochMilli(Long.parseLong(timestampString)), null); + } else { + return null; + } + }); - return redisCluster.withCluster(connection -> { - final RedisAdvancedClusterAsyncCommands commands = connection.async(); + final CompletableFuture pushRecordFuture = getFuture.thenCombine(legacyGetFuture, + (a, b) -> a != null ? a : b); - final CompletableFuture getFuture = commands.get(key).toCompletableFuture(); - commands.del(key); + pushRecordFuture.whenComplete((record, cause) -> { + if (cause == null) { + connection.async().del(key, legacyKey); + } + }); - return getFuture.thenApply(timestampString -> timestampString != null ? currentTimeMillis - Long.parseLong(timestampString, 10) : null); - }); - } + return pushRecordFuture; + }); + } - private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) { - return "push_latency::" + accountUuid.toString() + "::" + deviceId; - } + private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) { + return "push_latency::v2::" + accountUuid.toString() + "::" + deviceId; + } + + @VisibleForTesting + static String getLegacyFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) { + return "push_latency::" + accountUuid.toString() + "::" + deviceId; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java index 57ba089b6..a3b3a719f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java @@ -124,22 +124,24 @@ public class MessageSender implements Managed { gcmSender.sendMessage(gcmMessage); - RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId())); + RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId(), false)); } private void sendApnNotification(Account account, Device device) { ApnMessage apnMessage; - if (!Util.isEmpty(device.getVoipApnId())) { - apnMessage = new ApnMessage(device.getVoipApnId(), account.getUuid(), device.getId(), true, Type.NOTIFICATION, Optional.empty()); + final boolean useVoip = !Util.isEmpty(device.getVoipApnId()); + + if (useVoip) { + apnMessage = new ApnMessage(device.getVoipApnId(), account.getUuid(), device.getId(), useVoip, Type.NOTIFICATION, Optional.empty()); RedisOperation.unchecked(() -> apnFallbackManager.schedule(account, device)); } else { - apnMessage = new ApnMessage(device.getApnId(), account.getUuid(), device.getId(), false, Type.NOTIFICATION, Optional.empty()); + apnMessage = new ApnMessage(device.getApnId(), account.getUuid(), device.getId(), useVoip, Type.NOTIFICATION, Optional.empty()); } apnSender.sendMessage(apnMessage); - RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId())); + RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId(), useVoip)); } @Override diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index cedefac7b..b74275607 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -190,7 +190,7 @@ public class DeleteUserCommand extends EnvironmentCommand dynamicConfigurationManager; - final PushLatencyManager pushLatencyManager = new PushLatencyManager(REDIS_CLUSTER_EXTENSION.getRedisCluster()); + @BeforeEach + void setUp() { + //noinspection unchecked + dynamicConfigurationManager = mock(DynamicConfigurationManager.class); + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + final DynamicPushLatencyConfiguration dynamicPushLatencyConfiguration = mock(DynamicPushLatencyConfiguration.class); + + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + when(dynamicConfiguration.getPushLatencyConfiguration()).thenReturn(dynamicPushLatencyConfiguration); + when(dynamicPushLatencyConfiguration.getInstrumentedVersions()).thenReturn(Collections.emptyMap()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testTakeRecord(final boolean isVoip) throws ExecutionException, InterruptedException { final UUID accountUuid = UUID.randomUUID(); final long deviceId = 1; - final long expectedLatency = 1234; - final long pushSentTimestamp = System.currentTimeMillis(); - final long clearQueueTimestamp = pushSentTimestamp + expectedLatency; - assertNull(pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).get()); + final Instant pushTimestamp = Instant.now(); - { - pushLatencyManager.recordPushSent(accountUuid, deviceId, pushSentTimestamp); + final PushLatencyManager pushLatencyManager = new PushLatencyManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), + dynamicConfigurationManager, Clock.fixed(pushTimestamp, ZoneId.systemDefault())); - assertEquals(expectedLatency, - (long) pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, clearQueueTimestamp).get()); - assertNull( - pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).get()); - } + assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get()); + + pushLatencyManager.recordPushSent(accountUuid, deviceId, isVoip); + + final PushRecord pushRecord = pushLatencyManager.takePushRecord(accountUuid, deviceId).get(); + + assertNotNull(pushRecord); + assertEquals(pushTimestamp, pushRecord.getTimestamp()); + assertEquals(isVoip ? PushType.VOIP : PushType.STANDARD, pushRecord.getPushType()); + + assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get()); + } + + @Test + void testTakeLegacyRecord() throws ExecutionException, InterruptedException { + final UUID accountUuid = UUID.randomUUID(); + final long deviceId = 1; + + final Instant pushTimestamp = Instant.ofEpochMilli(123456789); + + final PushLatencyManager pushLatencyManager = new PushLatencyManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), + dynamicConfigurationManager, Clock.fixed(pushTimestamp, ZoneId.systemDefault())); + + assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get()); + + // Inject a legacy record + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> + connection.sync().set(PushLatencyManager.getLegacyFirstUnacknowledgedPushKey(accountUuid, deviceId), + String.valueOf(pushTimestamp.toEpochMilli()))); + + final PushRecord pushRecord = pushLatencyManager.takePushRecord(accountUuid, deviceId).get(); + + assertNotNull(pushRecord); + assertEquals(pushTimestamp, pushRecord.getTimestamp()); + assertNull(pushRecord.getPushType()); + + assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get()); } }