diff --git a/service/config/sample.yml b/service/config/sample.yml
index dc86f1da0..00b6e95bd 100644
--- a/service/config/sample.yml
+++ b/service/config/sample.yml
@@ -61,6 +61,10 @@ messageStore: # Postgresql database configuration for message store
password:
url:
+metricsCluster:
+ urls:
+ - redis://redis.example.com:6379/
+
awsAttachments: # AWS S3 configuration
accessKey:
accessSecret:
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
index 9cd217526..a3141391c 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
@@ -99,7 +99,7 @@ public class WhisperServerConfiguration extends Configuration {
@NotNull
@Valid
@JsonProperty
- private RedisConfiguration metricsCache;
+ private RedisClusterConfiguration metricsCluster;
@NotNull
@Valid
@@ -253,8 +253,8 @@ public class WhisperServerConfiguration extends Configuration {
return pubsub;
}
- public RedisConfiguration getMetricsCacheConfiguration() {
- return metricsCache;
+ public RedisClusterConfiguration getMetricsClusterConfiguration() {
+ return metricsCluster;
}
public DirectoryConfiguration getDirectoryConfiguration() {
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index 4ac9347a8..a51c75e9f 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -312,19 +312,18 @@ public class WhisperServerService extends Application
+ * When the server sends a push notification to a device, the push latency manager creates a Redis key/value pair
+ * mapping the current timestamp to the given device if such a mapping doesn't already exist. When a client connects and
+ * clears its message queue, the push latency manager gets and clears the time of the initial push notification to that
+ * device and records the time elapsed since the push notification timestamp as a latency observation.
*/
public class PushLatencyManager {
- private static final Logger logger = LoggerFactory.getLogger(PushLatencyManager.class);
- private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
- private static final int TTL = (int)Duration.ofDays(3).toSeconds();
+ private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
+ private static final int TTL = (int)Duration.ofDays(1).toSeconds();
- @VisibleForTesting
- static final int QUEUE_SIZE = 1_000;
+ private final FaultTolerantRedisCluster redisCluster;
- private final ReplicatedJedisPool jedisPool;
- private final ThreadPoolExecutor threadPoolExecutor;
+ public PushLatencyManager(final FaultTolerantRedisCluster redisCluster) {
+ this.redisCluster = redisCluster;
+ }
- public PushLatencyManager(final ReplicatedJedisPool jedisPool) {
- this(jedisPool, new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE), new ThreadPoolExecutor.DiscardPolicy()));
+ public void recordPushSent(final UUID accountUuid, final long deviceId) {
+ recordPushSent(accountUuid, deviceId, System.currentTimeMillis());
}
@VisibleForTesting
- PushLatencyManager(final ReplicatedJedisPool jedisPool, final ThreadPoolExecutor threadPoolExecutor) {
- this.jedisPool = jedisPool;
- this.threadPoolExecutor = threadPoolExecutor;
-
- Metrics.gaugeCollectionSize(MetricRegistry.name(getClass(), "queueDepth"), Collections.emptyList(), threadPoolExecutor.getQueue());
+ void recordPushSent(final UUID accountUuid, final long deviceId, final long currentTime) {
+ redisCluster.useWriteCluster(connection ->
+ connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId), String.valueOf(currentTime), SetArgs.Builder.nx().ex(TTL)));
}
- public void recordPushSent(final String accountNumber, final long deviceId) {
- recordPushSent(accountNumber, deviceId, System.currentTimeMillis());
- }
-
- @VisibleForTesting
- void recordPushSent(final String accountNumber, final long deviceId, final long currentTime) {
- final String key = getFirstUnacknowledgedPushKey(accountNumber, deviceId);
-
- threadPoolExecutor.execute(() -> {
- try (final Jedis jedis = jedisPool.getWriteResource()) {
- final Transaction transaction = jedis.multi();
- transaction.setnx(key, String.valueOf(currentTime));
- transaction.expire(key, TTL);
- transaction.exec();
- }
- });
- }
-
- public void recordQueueRead(final String accountNumber, final long deviceId, final String userAgent) {
- threadPoolExecutor.execute(() -> {
- final Optional maybeLatency = getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis());
-
- if (maybeLatency.isPresent()) {
- Metrics.timer(TIMER_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(maybeLatency.get(), TimeUnit.MILLISECONDS);
+ public void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgent) {
+ getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).thenAccept(latency -> {
+ if (latency != null) {
+ Metrics.timer(TIMER_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(latency, TimeUnit.MILLISECONDS);
}
});
}
@VisibleForTesting
- Optional getLatencyAndClearTimestamp(final String accountNumber, final long deviceId, final long currentTimeMillis) {
- final String key = getFirstUnacknowledgedPushKey(accountNumber, deviceId);
+ CompletableFuture getLatencyAndClearTimestamp(final UUID accountUuid, final long deviceId, final long currentTimeMillis) {
+ final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId);
- try (final Jedis jedis = jedisPool.getWriteResource()) {
- final Transaction transaction = jedis.multi();
- transaction.get(key);
- transaction.del(key);
+ return redisCluster.withWriteCluster(connection -> {
+ final RedisAdvancedClusterAsyncCommands commands = connection.async();
- final List> responses = transaction.execGetResponse();
- final String timestampString = (String)responses.get(0).get();
+ final CompletableFuture getFuture = commands.get(key).toCompletableFuture();
+ commands.del(key);
- if (timestampString != null) {
- try {
- return Optional.of(currentTimeMillis - Long.parseLong(timestampString, 10));
- } catch (final NumberFormatException e) {
- logger.warn("Failed to parse timestamp: {}", timestampString);
- }
- }
-
- return Optional.empty();
- }
+ return getFuture.thenApply(timestampString -> timestampString != null ? currentTimeMillis - Long.parseLong(timestampString, 10) : null);
+ });
}
- private static String getFirstUnacknowledgedPushKey(final String accountNumber, final long deviceId) {
- return "push_latency::" + accountNumber + "::" + deviceId;
+ private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) {
+ return "push_latency::" + accountUuid.toString() + "::" + deviceId;
}
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java
index ab7f7c2aa..c8652291b 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java
@@ -114,7 +114,7 @@ public class PushSender implements Managed {
gcmSender.sendMessage(gcmMessage);
- RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getNumber(), device.getId()));
+ RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId()));
}
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) {
@@ -141,7 +141,7 @@ public class PushSender implements Managed {
apnSender.sendMessage(apnMessage);
- RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getNumber(), device.getId()));
+ RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId()));
}
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online)
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java
index 3bc81eb29..493c8d5ea 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java
@@ -29,11 +29,8 @@ public class MessagesManager {
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
- private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class);
-
- private final Messages messages;
- private final MessagesCache messagesCache;
-
+ private final Messages messages;
+ private final MessagesCache messagesCache;
private final PushLatencyManager pushLatencyManager;
public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) {
@@ -48,7 +45,7 @@ public class MessagesManager {
}
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
- RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destination, destinationDevice, userAgent));
+ RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));
List messages = this.messages.load(destination, destinationDevice);
diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java
index d752d42a0..4a9214a05 100644
--- a/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java
+++ b/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java
@@ -4,18 +4,22 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
+import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import redis.clients.jedis.Jedis;
import redis.embedded.RedisServer;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -24,95 +28,24 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class PushLatencyManagerTest {
-
- private PushLatencyManager pushLatencyManager;
-
- private RedisServer redisServer;
- private Jedis jedis;
-
- @Before
- public void setUp() {
- redisServer = new RedisServer(6379);
- jedis = new Jedis("localhost", 6379);
-
- final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class);
- final ThreadPoolExecutor threadPoolExecutor = mock(ThreadPoolExecutor.class);
-
- pushLatencyManager = new PushLatencyManager(jedisPool, threadPoolExecutor);
-
- redisServer.start();
-
- when(jedisPool.getWriteResource()).thenReturn(jedis);
-
- doAnswer(invocation -> {
- invocation.getArgument(0, Runnable.class).run();
- return null;
- }).when(threadPoolExecutor).execute(any(Runnable.class));
- }
-
- @After
- public void tearDown() {
- redisServer.stop();
- }
+public class PushLatencyManagerTest extends AbstractRedisClusterTest {
@Test
- public void testGetLatency() {
- final String accountNumber = "mock-account";
- final long deviceId = 1;
+ public void testGetLatency() throws ExecutionException, InterruptedException {
+ final PushLatencyManager pushLatencyManager = new PushLatencyManager(getRedisCluster());
+ final UUID accountUuid = UUID.randomUUID();
+ final long deviceId = 1;
+ final long expectedLatency = 1234;
+ final long pushSentTimestamp = System.currentTimeMillis();
+ final long clearQueueTimestamp = pushSentTimestamp + expectedLatency;
- final long expectedLatency = 1234;
- final long pushSentTimestamp = System.currentTimeMillis();
- final long clearQueueTimestamp = pushSentTimestamp + expectedLatency;
+ assertNull(pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).get());
{
- assertFalse(pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()).isPresent());
- }
+ pushLatencyManager.recordPushSent(accountUuid, deviceId, pushSentTimestamp);
- {
- pushLatencyManager.recordPushSent(accountNumber, deviceId, pushSentTimestamp);
-
- assertEquals(Optional.of(expectedLatency),
- pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, clearQueueTimestamp));
-
- assertFalse(pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()).isPresent());
- }
- }
-
- @Test
- public void testThreadPoolDoesNotBlock() throws InterruptedException {
- final AtomicBoolean canReturnJedisInstance = new AtomicBoolean(false);
- final ReplicatedJedisPool blockingJedisPool = mock(ReplicatedJedisPool.class);
- final PushLatencyManager blockingPushLatencyManager = new PushLatencyManager(blockingJedisPool);
-
- // One unqueued execution (single-thread pool) plus a full queue
- final CountDownLatch countDownLatch = new CountDownLatch(PushLatencyManager.QUEUE_SIZE + 1);
-
- when(blockingJedisPool.getWriteResource()).thenAnswer((Answer)invocationOnMock -> {
- synchronized (canReturnJedisInstance) {
- while (!canReturnJedisInstance.get()) {
- canReturnJedisInstance.wait();
- }
- }
-
- try {
- return jedis;
- } finally {
- countDownLatch.countDown();
- }
- });
-
- for (int i = 0; i < PushLatencyManager.QUEUE_SIZE * 2; i++) {
- blockingPushLatencyManager.recordPushSent("account-" + i, 1);
- }
-
- synchronized (canReturnJedisInstance) {
- canReturnJedisInstance.set(true);
- canReturnJedisInstance.notifyAll();
- }
-
- if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
- fail("Timed out waiting for countdown latch");
+ assertEquals(expectedLatency, (long)pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, clearQueueTimestamp).get());
+ assertNull(pushLatencyManager.getLatencyAndClearTimestamp(accountUuid, deviceId, System.currentTimeMillis()).get());
}
}
}
diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MatcherHelper.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MatcherHelper.java
deleted file mode 100644
index 83f6f033a..000000000
--- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MatcherHelper.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.whispersystems.textsecuregcm.tests.util;
-
-import org.mockito.ArgumentMatchers;
-import org.whispersystems.textsecuregcm.storage.Account;
-import org.whispersystems.textsecuregcm.storage.Device;
-
-public class MatcherHelper {
- public static Account accountWithNumber(final String accountNumber) {
- return ArgumentMatchers.argThat(account -> accountNumber.equals(account.getNumber()));
- }
-
- public static Device deviceWithId(final long deviceId) {
- return ArgumentMatchers.argThat(device -> device.getId() == deviceId);
- }
-}