Retire `PushLatencyManager`
This commit is contained in:
parent
4f10014902
commit
37369929f3
|
@ -189,7 +189,6 @@ import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
|||
import org.whispersystems.textsecuregcm.push.FcmSender;
|
||||
import org.whispersystems.textsecuregcm.push.MessageSender;
|
||||
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
|
||||
import org.whispersystems.textsecuregcm.push.PushLatencyManager;
|
||||
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger;
|
||||
|
@ -596,7 +595,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
recurringJobExecutor,
|
||||
config.getClientReleaseConfiguration().refreshInterval(),
|
||||
Clock.systemUTC());
|
||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, clientReleaseManager);
|
||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
|
||||
config.getReportMessageConfiguration().getCounterTtl());
|
||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager,
|
||||
|
@ -616,8 +614,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, config.getFcmConfiguration().credentials().value());
|
||||
ApnPushNotificationScheduler apnPushNotificationScheduler = new ApnPushNotificationScheduler(pushSchedulerCluster,
|
||||
apnSender, accountsManager, 0);
|
||||
PushNotificationManager pushNotificationManager = new PushNotificationManager(accountsManager, apnSender, fcmSender,
|
||||
apnPushNotificationScheduler, pushLatencyManager);
|
||||
PushNotificationManager pushNotificationManager =
|
||||
new PushNotificationManager(accountsManager, apnSender, fcmSender, apnPushNotificationScheduler);
|
||||
RateLimiters rateLimiters = RateLimiters.createAndValidate(config.getLimitsConfiguration(),
|
||||
dynamicConfigurationManager, rateLimitersCluster);
|
||||
ProvisioningManager provisioningManager = new ProvisioningManager(
|
||||
|
@ -649,9 +647,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
|
||||
final AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
||||
|
||||
final MessageSender messageSender = new MessageSender(clientPresenceManager, messagesManager,
|
||||
pushNotificationManager,
|
||||
pushLatencyManager);
|
||||
final MessageSender messageSender =
|
||||
new MessageSender(clientPresenceManager, messagesManager, pushNotificationManager);
|
||||
final ReceiptSender receiptSender = new ReceiptSender(accountsManager, messageSender, receiptSenderExecutor);
|
||||
final TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(dynamicConfigurationManager,
|
||||
config.getTurnConfiguration().secret().value());
|
||||
|
|
|
@ -19,12 +19,9 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
|
|||
import io.micrometer.core.instrument.config.MeterFilter;
|
||||
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
|
||||
import io.micrometer.statsd.StatsdMeterRegistry;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerVersion;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.push.PushLatencyManager;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
|
@ -34,8 +31,6 @@ public class MetricsUtil {
|
|||
|
||||
private static volatile boolean registeredMetrics = false;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MetricsUtil.class);
|
||||
|
||||
/**
|
||||
* Returns a dot-separated ('.') name for the given class and name parts
|
||||
*/
|
||||
|
@ -113,7 +108,6 @@ public class MetricsUtil {
|
|||
return MeterFilter.super.map(id);
|
||||
}
|
||||
})
|
||||
.meterFilter(MeterFilter.denyNameStartsWith(PushLatencyManager.TIMER_NAME + ".percentile"))
|
||||
.meterFilter(MeterFilter.denyNameStartsWith(MessageMetrics.DELIVERY_LATENCY_TIMER_NAME + ".percentile"));
|
||||
}
|
||||
|
||||
|
|
|
@ -8,8 +8,6 @@ import static com.codahale.metrics.MetricRegistry.name;
|
|||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
|
@ -32,7 +30,6 @@ public class MessageSender {
|
|||
private final ClientPresenceManager clientPresenceManager;
|
||||
private final MessagesManager messagesManager;
|
||||
private final PushNotificationManager pushNotificationManager;
|
||||
private final PushLatencyManager pushLatencyManager;
|
||||
|
||||
private static final String SEND_COUNTER_NAME = name(MessageSender.class, "sendMessage");
|
||||
private static final String CHANNEL_TAG_NAME = "channel";
|
||||
|
@ -42,14 +39,13 @@ public class MessageSender {
|
|||
private static final String STORY_TAG_NAME = "story";
|
||||
private static final String SEALED_SENDER_TAG_NAME = "sealedSender";
|
||||
|
||||
public MessageSender(ClientPresenceManager clientPresenceManager,
|
||||
MessagesManager messagesManager,
|
||||
PushNotificationManager pushNotificationManager,
|
||||
PushLatencyManager pushLatencyManager) {
|
||||
public MessageSender(final ClientPresenceManager clientPresenceManager,
|
||||
final MessagesManager messagesManager,
|
||||
final PushNotificationManager pushNotificationManager) {
|
||||
|
||||
this.clientPresenceManager = clientPresenceManager;
|
||||
this.messagesManager = messagesManager;
|
||||
this.pushNotificationManager = pushNotificationManager;
|
||||
this.pushLatencyManager = pushLatencyManager;
|
||||
}
|
||||
|
||||
public void sendMessage(final Account account, final Device device, final Envelope message, final boolean online) {
|
||||
|
@ -85,9 +81,6 @@ public class MessageSender {
|
|||
if (!clientPresent) {
|
||||
try {
|
||||
pushNotificationManager.sendNewMessageNotification(account, device.getId(), message.getUrgent());
|
||||
|
||||
final boolean useVoip = StringUtils.isNotBlank(device.getVoipApnId());
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId(), useVoip, message.getUrgent()));
|
||||
} catch (final NotPushRegisteredException ignored) {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,147 +0,0 @@
|
|||
/*
|
||||
* Copyright 2013 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.lettuce.core.SetArgs;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
|
||||
/**
|
||||
* Measures and records the latency between sending a push notification to a device and that device draining its queue
|
||||
* of messages.
|
||||
* <p/>
|
||||
* When the server sends a push notification to a device, the push latency manager creates a Redis key/value pair
|
||||
* mapping the current timestamp to the given device if such a mapping doesn't already exist. When a client connects and
|
||||
* clears its message queue, the push latency manager gets and clears the time of the initial push notification to that
|
||||
* device and records the time elapsed since the push notification timestamp as a latency observation.
|
||||
*/
|
||||
public class PushLatencyManager {
|
||||
|
||||
private final FaultTolerantRedisCluster redisCluster;
|
||||
private final ClientReleaseManager clientReleaseManager;
|
||||
|
||||
private final Clock clock;
|
||||
|
||||
public static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
|
||||
private static final int TTL = (int) Duration.ofDays(1).toSeconds();
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PushLatencyManager.class);
|
||||
|
||||
@VisibleForTesting
|
||||
enum PushType {
|
||||
STANDARD,
|
||||
VOIP
|
||||
}
|
||||
|
||||
record PushRecord(Instant timestamp, PushType pushType, Optional<Boolean> urgent) {
|
||||
}
|
||||
|
||||
public PushLatencyManager(final FaultTolerantRedisCluster redisCluster,
|
||||
final ClientReleaseManager clientReleaseManager) {
|
||||
|
||||
this(redisCluster, clientReleaseManager, Clock.systemUTC());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
PushLatencyManager(final FaultTolerantRedisCluster redisCluster,
|
||||
final ClientReleaseManager clientReleaseManager,
|
||||
final Clock clock) {
|
||||
|
||||
this.redisCluster = redisCluster;
|
||||
this.clientReleaseManager = clientReleaseManager;
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
void recordPushSent(final UUID accountUuid, final byte deviceId, final boolean isVoip, final boolean isUrgent) {
|
||||
try {
|
||||
final String recordJson = SystemMapper.jsonMapper().writeValueAsString(
|
||||
new PushRecord(Instant.now(clock), isVoip ? PushType.VOIP : PushType.STANDARD, Optional.of(isUrgent)));
|
||||
|
||||
redisCluster.useCluster(connection ->
|
||||
connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId),
|
||||
recordJson,
|
||||
SetArgs.Builder.nx().ex(TTL)));
|
||||
} catch (final JsonProcessingException e) {
|
||||
// This should never happen
|
||||
log.error("Failed to write push latency record JSON", e);
|
||||
}
|
||||
}
|
||||
|
||||
void recordQueueRead(final UUID accountUuid, final byte deviceId, final String userAgentString) {
|
||||
takePushRecord(accountUuid, deviceId).thenAccept(pushRecord -> {
|
||||
if (pushRecord != null) {
|
||||
final Duration latency = Duration.between(pushRecord.timestamp(), Instant.now());
|
||||
|
||||
final List<Tag> tags = new ArrayList<>(3);
|
||||
|
||||
tags.add(UserAgentTagUtil.getPlatformTag(userAgentString));
|
||||
tags.add(Tag.of("pushType", pushRecord.pushType().name().toLowerCase()));
|
||||
|
||||
UserAgentTagUtil.getClientVersionTag(userAgentString, clientReleaseManager)
|
||||
.ifPresent(tags::add);
|
||||
|
||||
pushRecord.urgent().ifPresent(urgent -> tags.add(Tag.of("urgent", String.valueOf(urgent))));
|
||||
|
||||
Timer.builder(TIMER_NAME)
|
||||
.publishPercentileHistogram(true)
|
||||
.tags(tags)
|
||||
.register(Metrics.globalRegistry)
|
||||
.record(latency);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
CompletableFuture<PushRecord> takePushRecord(final UUID accountUuid, final byte deviceId) {
|
||||
final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId);
|
||||
|
||||
return redisCluster.withCluster(connection -> {
|
||||
final CompletableFuture<PushRecord> getFuture = connection.async().get(key).toCompletableFuture()
|
||||
.thenApply(recordJson -> {
|
||||
if (StringUtils.isNotEmpty(recordJson)) {
|
||||
try {
|
||||
return SystemMapper.jsonMapper().readValue(recordJson, PushRecord.class);
|
||||
} catch (JsonProcessingException e) {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
getFuture.whenComplete((record, cause) -> {
|
||||
if (cause == null) {
|
||||
connection.async().del(key);
|
||||
}
|
||||
});
|
||||
|
||||
return getFuture;
|
||||
});
|
||||
}
|
||||
|
||||
private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final byte deviceId) {
|
||||
return "push_latency::v2::" + accountUuid.toString() + "::" + deviceId;
|
||||
}
|
||||
}
|
|
@ -29,7 +29,6 @@ public class PushNotificationManager {
|
|||
private final APNSender apnSender;
|
||||
private final FcmSender fcmSender;
|
||||
private final ApnPushNotificationScheduler apnPushNotificationScheduler;
|
||||
private final PushLatencyManager pushLatencyManager;
|
||||
|
||||
private static final String SENT_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "sentPushNotification");
|
||||
private static final String FAILED_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "failedPushNotification");
|
||||
|
@ -40,14 +39,12 @@ public class PushNotificationManager {
|
|||
public PushNotificationManager(final AccountsManager accountsManager,
|
||||
final APNSender apnSender,
|
||||
final FcmSender fcmSender,
|
||||
final ApnPushNotificationScheduler apnPushNotificationScheduler,
|
||||
final PushLatencyManager pushLatencyManager) {
|
||||
final ApnPushNotificationScheduler apnPushNotificationScheduler) {
|
||||
|
||||
this.accountsManager = accountsManager;
|
||||
this.apnSender = apnSender;
|
||||
this.fcmSender = fcmSender;
|
||||
this.apnPushNotificationScheduler = apnPushNotificationScheduler;
|
||||
this.pushLatencyManager = pushLatencyManager;
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<SendPushNotificationResult>> sendNewMessageNotification(final Account destination, final byte destinationDeviceId, final boolean urgent) throws NotPushRegisteredException {
|
||||
|
@ -85,7 +82,6 @@ public class PushNotificationManager {
|
|||
}
|
||||
|
||||
public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) {
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(account.getUuid(), device.getId(), userAgent));
|
||||
apnPushNotificationScheduler.cancelScheduledNotifications(account, device).whenComplete(logErrors());
|
||||
}
|
||||
|
||||
|
|
|
@ -54,10 +54,7 @@ class MessageSenderTest {
|
|||
clientPresenceManager = mock(ClientPresenceManager.class);
|
||||
messagesManager = mock(MessagesManager.class);
|
||||
pushNotificationManager = mock(PushNotificationManager.class);
|
||||
messageSender = new MessageSender(clientPresenceManager,
|
||||
messagesManager,
|
||||
pushNotificationManager,
|
||||
mock(PushLatencyManager.class));
|
||||
messageSender = new MessageSender(clientPresenceManager, messagesManager, pushNotificationManager);
|
||||
|
||||
when(account.getUuid()).thenReturn(ACCOUNT_UUID);
|
||||
when(device.getId()).thenReturn(DEVICE_ID);
|
||||
|
|
|
@ -1,67 +0,0 @@
|
|||
/*
|
||||
* Copyright 2013-2022 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Stream;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.whispersystems.textsecuregcm.push.PushLatencyManager.PushRecord;
|
||||
import org.whispersystems.textsecuregcm.push.PushLatencyManager.PushType;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
||||
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
|
||||
|
||||
class PushLatencyManagerTest {
|
||||
|
||||
@RegisterExtension
|
||||
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource
|
||||
void testTakeRecord(final boolean isVoip, final boolean isUrgent) throws ExecutionException, InterruptedException {
|
||||
final UUID accountUuid = UUID.randomUUID();
|
||||
final byte deviceId = 1;
|
||||
|
||||
final Instant pushTimestamp = Instant.now();
|
||||
|
||||
final PushLatencyManager pushLatencyManager = new PushLatencyManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
|
||||
mock(ClientReleaseManager.class), Clock.fixed(pushTimestamp, ZoneId.systemDefault()));
|
||||
|
||||
assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get());
|
||||
|
||||
pushLatencyManager.recordPushSent(accountUuid, deviceId, isVoip, isUrgent);
|
||||
|
||||
final PushRecord pushRecord = pushLatencyManager.takePushRecord(accountUuid, deviceId).get();
|
||||
|
||||
assertNotNull(pushRecord);
|
||||
assertEquals(pushTimestamp, pushRecord.timestamp());
|
||||
assertEquals(isVoip ? PushType.VOIP : PushType.STANDARD, pushRecord.pushType());
|
||||
assertEquals(Optional.of(isUrgent), pushRecord.urgent());
|
||||
|
||||
assertNull(pushLatencyManager.takePushRecord(accountUuid, deviceId).get());
|
||||
}
|
||||
|
||||
private static Stream<Arguments> testTakeRecord() {
|
||||
return Stream.of(
|
||||
Arguments.of(true, true),
|
||||
Arguments.of(true, false),
|
||||
Arguments.of(false, true),
|
||||
Arguments.of(false, false)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -34,7 +34,6 @@ class PushNotificationManagerTest {
|
|||
private APNSender apnSender;
|
||||
private FcmSender fcmSender;
|
||||
private ApnPushNotificationScheduler apnPushNotificationScheduler;
|
||||
private PushLatencyManager pushLatencyManager;
|
||||
|
||||
private PushNotificationManager pushNotificationManager;
|
||||
|
||||
|
@ -44,12 +43,11 @@ class PushNotificationManagerTest {
|
|||
apnSender = mock(APNSender.class);
|
||||
fcmSender = mock(FcmSender.class);
|
||||
apnPushNotificationScheduler = mock(ApnPushNotificationScheduler.class);
|
||||
pushLatencyManager = mock(PushLatencyManager.class);
|
||||
|
||||
AccountsHelper.setupMockUpdate(accountsManager);
|
||||
|
||||
pushNotificationManager = new PushNotificationManager(accountsManager, apnSender, fcmSender,
|
||||
apnPushNotificationScheduler, pushLatencyManager);
|
||||
pushNotificationManager =
|
||||
new PushNotificationManager(accountsManager, apnSender, fcmSender, apnPushNotificationScheduler);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -319,7 +317,6 @@ class PushNotificationManagerTest {
|
|||
|
||||
pushNotificationManager.handleMessagesRetrieved(account, device, userAgent);
|
||||
|
||||
verify(pushLatencyManager).recordQueueRead(accountIdentifier, Device.PRIMARY_ID, userAgent);
|
||||
verify(apnPushNotificationScheduler).cancelScheduledNotifications(account, device);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue