Update the push latency manager to use UUIDs and a Redis cluster.
This commit is contained in:
parent
901ba6e87f
commit
f3b644ceb8
|
@ -61,6 +61,10 @@ messageStore: # Postgresql database configuration for message store
|
|||
password:
|
||||
url:
|
||||
|
||||
metricsCluster:
|
||||
urls:
|
||||
- redis://redis.example.com:6379/
|
||||
|
||||
awsAttachments: # AWS S3 configuration
|
||||
accessKey:
|
||||
accessSecret:
|
||||
|
|
|
@ -99,7 +99,7 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
@NotNull
|
||||
@Valid
|
||||
@JsonProperty
|
||||
private RedisConfiguration metricsCache;
|
||||
private RedisClusterConfiguration metricsCluster;
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
|
@ -253,8 +253,8 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
return pubsub;
|
||||
}
|
||||
|
||||
public RedisConfiguration getMetricsCacheConfiguration() {
|
||||
return metricsCache;
|
||||
public RedisClusterConfiguration getMetricsClusterConfiguration() {
|
||||
return metricsCluster;
|
||||
}
|
||||
|
||||
public DirectoryConfiguration getDirectoryConfiguration() {
|
||||
|
|
|
@ -312,19 +312,18 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
||||
RedisClientFactory messagesClientFactory = new RedisClientFactory("message_cache", config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls(), config.getMessageCacheConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
||||
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
|
||||
RedisClientFactory metricsCacheClientFactory = new RedisClientFactory("metrics_cache", config.getMetricsCacheConfiguration().getUrl(), config.getMetricsCacheConfiguration().getReplicaUrls(), config.getMetricsCacheConfiguration().getCircuitBreakerConfiguration());
|
||||
|
||||
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
|
||||
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();
|
||||
ReplicatedJedisPool messagesClient = messagesClientFactory.getRedisClientPool();
|
||||
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
|
||||
ReplicatedJedisPool metricsCacheClient = metricsCacheClientFactory.getRedisClientPool();
|
||||
|
||||
RedisClusterClient cacheClusterClient = RedisClusterClient.create(config.getCacheClusterConfiguration().getUrls().stream().map(RedisURI::create).collect(Collectors.toList()));
|
||||
cacheClusterClient.setDefaultTimeout(config.getCacheClusterConfiguration().getTimeout());
|
||||
|
||||
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration().getUrls(), config.getCacheClusterConfiguration().getTimeout(), config.getCacheClusterConfiguration().getCircuitBreakerConfiguration());
|
||||
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration());
|
||||
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration().getUrls(), config.getMetricsClusterConfiguration().getTimeout(), config.getMetricsClusterConfiguration().getCircuitBreakerConfiguration());
|
||||
|
||||
DirectoryManager directory = new DirectoryManager(directoryClient);
|
||||
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
|
||||
|
@ -335,7 +334,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster);
|
||||
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes(), clusterMessagesCache);
|
||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCacheClient);
|
||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
||||
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, pushLatencyManager);
|
||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||
|
|
|
@ -2,102 +2,68 @@ package org.whispersystems.textsecuregcm.metrics;
|
|||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.lettuce.core.SetArgs;
|
||||
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.Response;
|
||||
import redis.clients.jedis.Transaction;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Measures and records the latency between sending a push notification to a device and that device draining its queue
|
||||
* of messages.
|
||||
* <p>
|
||||
* When the server sends a push notification to a device, the push latency manager creates a Redis key/value pair
|
||||
* mapping the given device to the current timestamp if such a mapping doesn't already exist. When a client connects and
|
||||
* clears its message queue, the push latency manager gets and clears the time of the initial push notification to that
|
||||
* device and records the time elapsed since the push notification timestamp as a latency observation.
|
||||
*/
|
||||
public class PushLatencyManager {
|
||||
private static final Logger logger = LoggerFactory.getLogger(PushLatencyManager.class);
|
||||
private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
|
||||
private static final int TTL = (int)Duration.ofDays(3).toSeconds();
|
||||
private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
|
||||
private static final int TTL = (int)Duration.ofDays(1).toSeconds();
|
||||
|
||||
@VisibleForTesting
|
||||
static final int QUEUE_SIZE = 1_000;
|
||||
private final FaultTolerantRedisCluster redisCluster;
|
||||
|
||||
private final ReplicatedJedisPool jedisPool;
|
||||
private final ThreadPoolExecutor threadPoolExecutor;
|
||||
/**
 * Creates a push latency manager that reads and writes push timestamps through the given Redis cluster.
 *
 * @param redisCluster the cluster used for all push-latency keys; must not be {@code null}
 * @throws NullPointerException if {@code redisCluster} is {@code null}
 */
public PushLatencyManager(final FaultTolerantRedisCluster redisCluster) {
  // Fail at construction time instead of on the first Redis call, where the failure would be far from its cause.
  this.redisCluster = java.util.Objects.requireNonNull(redisCluster, "redisCluster");
}
|
||||
|
||||
public PushLatencyManager(final ReplicatedJedisPool jedisPool) {
|
||||
this(jedisPool, new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE), new ThreadPoolExecutor.DiscardPolicy()));
|
||||
public void recordPushSent(final UUID accountUuid, final long deviceId) {
|
||||
recordPushSent(accountUuid, deviceId, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
PushLatencyManager(final ReplicatedJedisPool jedisPool, final ThreadPoolExecutor threadPoolExecutor) {
|
||||
this.jedisPool = jedisPool;
|
||||
this.threadPoolExecutor = threadPoolExecutor;
|
||||
|
||||
Metrics.gaugeCollectionSize(MetricRegistry.name(getClass(), "queueDepth"), Collections.emptyList(), threadPoolExecutor.getQueue());
|
||||
void recordPushSent(final UUID accountUuid, final long deviceId, final long currentTime) {
|
||||
redisCluster.useWriteCluster(connection ->
|
||||
connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId), String.valueOf(currentTime), SetArgs.Builder.nx().ex(TTL)));
|
||||
}
|
||||
|
||||
public void recordPushSent(final String accountNumber, final long deviceId) {
|
||||
recordPushSent(accountNumber, deviceId, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void recordPushSent(final String accountNumber, final long deviceId, final long currentTime) {
|
||||
final String key = getFirstUnacknowledgedPushKey(accountNumber, deviceId);
|
||||
|
||||
threadPoolExecutor.execute(() -> {
|
||||
try (final Jedis jedis = jedisPool.getWriteResource()) {
|
||||
final Transaction transaction = jedis.multi();
|
||||
transaction.setnx(key, String.valueOf(currentTime));
|
||||
transaction.expire(key, TTL);
|
||||
transaction.exec();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void recordQueueRead(final String accountNumber, final long deviceId, final String userAgent) {
|
||||
threadPoolExecutor.execute(() -> {
|
||||
final Optional<Long> maybeLatency = getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis());
|
||||
|
||||
if (maybeLatency.isPresent()) {
|
||||
Metrics.timer(TIMER_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(maybeLatency.get(), TimeUnit.MILLISECONDS);
|
||||
public void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgent) {
|
||||
getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).thenAccept(latency -> {
|
||||
if (latency != null) {
|
||||
Metrics.timer(TIMER_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(latency, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Optional<Long> getLatencyAndClearTimestamp(final String accountNumber, final long deviceId, final long currentTimeMillis) {
|
||||
final String key = getFirstUnacknowledgedPushKey(accountNumber, deviceId);
|
||||
CompletableFuture<Long> getLatencyAndClearTimestamp(final UUID accountUuid, final long deviceId, final long currentTimeMillis) {
|
||||
final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId);
|
||||
|
||||
try (final Jedis jedis = jedisPool.getWriteResource()) {
|
||||
final Transaction transaction = jedis.multi();
|
||||
transaction.get(key);
|
||||
transaction.del(key);
|
||||
return redisCluster.withWriteCluster(connection -> {
|
||||
final RedisAdvancedClusterAsyncCommands<String, String> commands = connection.async();
|
||||
|
||||
final List<Response<?>> responses = transaction.execGetResponse();
|
||||
final String timestampString = (String)responses.get(0).get();
|
||||
final CompletableFuture<String> getFuture = commands.get(key).toCompletableFuture();
|
||||
commands.del(key);
|
||||
|
||||
if (timestampString != null) {
|
||||
try {
|
||||
return Optional.of(currentTimeMillis - Long.parseLong(timestampString, 10));
|
||||
} catch (final NumberFormatException e) {
|
||||
logger.warn("Failed to parse timestamp: {}", timestampString);
|
||||
}
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
return getFuture.thenApply(timestampString -> timestampString != null ? currentTimeMillis - Long.parseLong(timestampString, 10) : null);
|
||||
});
|
||||
}
|
||||
|
||||
private static String getFirstUnacknowledgedPushKey(final String accountNumber, final long deviceId) {
|
||||
return "push_latency::" + accountNumber + "::" + deviceId;
|
||||
private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) {
|
||||
return "push_latency::" + accountUuid.toString() + "::" + deviceId;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,7 +114,7 @@ public class PushSender implements Managed {
|
|||
|
||||
gcmSender.sendMessage(gcmMessage);
|
||||
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getNumber(), device.getId()));
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId()));
|
||||
}
|
||||
|
||||
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) {
|
||||
|
@ -141,7 +141,7 @@ public class PushSender implements Managed {
|
|||
|
||||
apnSender.sendMessage(apnMessage);
|
||||
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getNumber(), device.getId()));
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId()));
|
||||
}
|
||||
|
||||
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online)
|
||||
|
|
|
@ -29,11 +29,8 @@ public class MessagesManager {
|
|||
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
|
||||
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class);
|
||||
|
||||
private final Messages messages;
|
||||
private final MessagesCache messagesCache;
|
||||
|
||||
private final Messages messages;
|
||||
private final MessagesCache messagesCache;
|
||||
private final PushLatencyManager pushLatencyManager;
|
||||
|
||||
public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) {
|
||||
|
@ -48,7 +45,7 @@ public class MessagesManager {
|
|||
}
|
||||
|
||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destination, destinationDevice, userAgent));
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));
|
||||
|
||||
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
||||
|
||||
|
|
|
@ -4,18 +4,22 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
|
||||
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.embedded.RedisServer;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
|
@ -24,95 +28,24 @@ import static org.mockito.Mockito.times;
|
|||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class PushLatencyManagerTest {
|
||||
|
||||
private PushLatencyManager pushLatencyManager;
|
||||
|
||||
private RedisServer redisServer;
|
||||
private Jedis jedis;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
redisServer = new RedisServer(6379);
|
||||
jedis = new Jedis("localhost", 6379);
|
||||
|
||||
final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class);
|
||||
final ThreadPoolExecutor threadPoolExecutor = mock(ThreadPoolExecutor.class);
|
||||
|
||||
pushLatencyManager = new PushLatencyManager(jedisPool, threadPoolExecutor);
|
||||
|
||||
redisServer.start();
|
||||
|
||||
when(jedisPool.getWriteResource()).thenReturn(jedis);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
invocation.getArgument(0, Runnable.class).run();
|
||||
return null;
|
||||
}).when(threadPoolExecutor).execute(any(Runnable.class));
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
redisServer.stop();
|
||||
}
|
||||
public class PushLatencyManagerTest extends AbstractRedisClusterTest {
|
||||
|
||||
@Test
|
||||
public void testGetLatency() {
|
||||
final String accountNumber = "mock-account";
|
||||
final long deviceId = 1;
|
||||
public void testGetLatency() throws ExecutionException, InterruptedException {
|
||||
final PushLatencyManager pushLatencyManager = new PushLatencyManager(getRedisCluster());
|
||||
final UUID accountUuid = UUID.randomUUID();
|
||||
final long deviceId = 1;
|
||||
final long expectedLatency = 1234;
|
||||
final long pushSentTimestamp = System.currentTimeMillis();
|
||||
final long clearQueueTimestamp = pushSentTimestamp + expectedLatency;
|
||||
|
||||
final long expectedLatency = 1234;
|
||||
final long pushSentTimestamp = System.currentTimeMillis();
|
||||
final long clearQueueTimestamp = pushSentTimestamp + expectedLatency;
|
||||
assertNull(pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).get());
|
||||
|
||||
{
|
||||
assertFalse(pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()).isPresent());
|
||||
}
|
||||
pushLatencyManager.recordPushSent(accountUuid, deviceId, pushSentTimestamp);
|
||||
|
||||
{
|
||||
pushLatencyManager.recordPushSent(accountNumber, deviceId, pushSentTimestamp);
|
||||
|
||||
assertEquals(Optional.of(expectedLatency),
|
||||
pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, clearQueueTimestamp));
|
||||
|
||||
assertFalse(pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()).isPresent());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreadPoolDoesNotBlock() throws InterruptedException {
|
||||
final AtomicBoolean canReturnJedisInstance = new AtomicBoolean(false);
|
||||
final ReplicatedJedisPool blockingJedisPool = mock(ReplicatedJedisPool.class);
|
||||
final PushLatencyManager blockingPushLatencyManager = new PushLatencyManager(blockingJedisPool);
|
||||
|
||||
// One unqueued execution (single-thread pool) plus a full queue
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(PushLatencyManager.QUEUE_SIZE + 1);
|
||||
|
||||
when(blockingJedisPool.getWriteResource()).thenAnswer((Answer<Jedis>)invocationOnMock -> {
|
||||
synchronized (canReturnJedisInstance) {
|
||||
while (!canReturnJedisInstance.get()) {
|
||||
canReturnJedisInstance.wait();
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
return jedis;
|
||||
} finally {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
for (int i = 0; i < PushLatencyManager.QUEUE_SIZE * 2; i++) {
|
||||
blockingPushLatencyManager.recordPushSent("account-" + i, 1);
|
||||
}
|
||||
|
||||
synchronized (canReturnJedisInstance) {
|
||||
canReturnJedisInstance.set(true);
|
||||
canReturnJedisInstance.notifyAll();
|
||||
}
|
||||
|
||||
if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
|
||||
fail("Timed out waiting for countdown latch");
|
||||
assertEquals(expectedLatency, (long)pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, clearQueueTimestamp).get());
|
||||
assertNull(pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,15 +0,0 @@
|
|||
package org.whispersystems.textsecuregcm.tests.util;
|
||||
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
|
||||
public class MatcherHelper {
|
||||
public static Account accountWithNumber(final String accountNumber) {
|
||||
return ArgumentMatchers.argThat(account -> accountNumber.equals(account.getNumber()));
|
||||
}
|
||||
|
||||
public static Device deviceWithId(final long deviceId) {
|
||||
return ArgumentMatchers.argThat(device -> device.getId() == deviceId);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue