diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index f71a34444..9cd217526 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -96,6 +96,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private RedisConfiguration pubsub; + @NotNull + @Valid + @JsonProperty + private RedisConfiguration metricsCache; + @NotNull @Valid @JsonProperty @@ -248,6 +253,10 @@ public class WhisperServerConfiguration extends Configuration { return pubsub; } + public RedisConfiguration getMetricsCacheConfiguration() { + return metricsCache; + } + public DirectoryConfiguration getDirectoryConfiguration() { return directory; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 5b78237b3..4ac9347a8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -97,6 +97,7 @@ import org.whispersystems.textsecuregcm.metrics.FreeMemoryGauge; import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener; import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge; import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge; +import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.metrics.TrafficSource; import org.whispersystems.textsecuregcm.providers.RedisClientFactory; import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck; @@ -311,11 +312,13 @@ public class WhisperServerService extends Application(QUEUE_SIZE), new ThreadPoolExecutor.DiscardPolicy())); + } + + @VisibleForTesting + PushLatencyManager(final ReplicatedJedisPool jedisPool, final ThreadPoolExecutor threadPoolExecutor) { + this.jedisPool = jedisPool; + this.threadPoolExecutor = threadPoolExecutor; + + Metrics.gaugeCollectionSize(MetricRegistry.name(getClass(), "queueDepth"), Collections.emptyList(), threadPoolExecutor.getQueue()); + } + + public void recordPushSent(final String accountNumber, final long deviceId) { + recordPushSent(accountNumber, deviceId, System.currentTimeMillis()); + } + + @VisibleForTesting + void recordPushSent(final String accountNumber, final long deviceId, final long currentTime) { + final String key = getFirstUnacknowledgedPushKey(accountNumber, deviceId); + + threadPoolExecutor.execute(() -> { + try (final Jedis jedis = jedisPool.getWriteResource()) { + final Transaction transaction = jedis.multi(); + transaction.setnx(key, String.valueOf(currentTime)); + transaction.expire(key, TTL); + transaction.exec(); + } + }); + } + + public void recordQueueRead(final String accountNumber, final long deviceId, final String userAgent) { + threadPoolExecutor.execute(() -> { + final Optional maybeLatency = getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()); + + if (maybeLatency.isPresent()) { + Metrics.timer(TIMER_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(maybeLatency.get(), TimeUnit.MILLISECONDS); + } + }); + } + + @VisibleForTesting + Optional getLatencyAndClearTimestamp(final String accountNumber, final long deviceId, final long currentTimeMillis) { + final String key = getFirstUnacknowledgedPushKey(accountNumber, deviceId); + + try (final Jedis jedis = jedisPool.getWriteResource()) { + final Transaction transaction = jedis.multi(); + transaction.get(key); + transaction.del(key); + + final List> responses = transaction.execGetResponse(); + final String timestampString = (String)responses.get(0).get(); + + if (timestampString != null) { + try { + return Optional.of(currentTimeMillis - Long.parseLong(timestampString, 10)); + } catch (final NumberFormatException e) { + logger.warn("Failed to parse timestamp: {}", timestampString); + } + } + + return Optional.empty(); + } + } + + private static String getFirstUnacknowledgedPushKey(final String accountNumber, final long deviceId) { + return "push_latency::" + accountNumber + "::" + deviceId; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index cabbf7813..ab7f7c2aa 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -20,6 +20,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.SharedMetricRegistries; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.storage.Account; @@ -46,10 +47,12 @@ public class PushSender implements Managed { private final WebsocketSender webSocketSender; private final BlockingThreadPoolExecutor executor; private final int queueSize; + private final PushLatencyManager pushLatencyManager; public PushSender(ApnFallbackManager apnFallbackManager, GCMSender gcmSender, APNSender apnSender, - WebsocketSender websocketSender, int queueSize) + WebsocketSender websocketSender, int queueSize, + PushLatencyManager pushLatencyManager) { this.apnFallbackManager = apnFallbackManager; this.gcmSender = gcmSender; @@ -57,6 +60,7 @@ public class PushSender implements Managed { this.webSocketSender = websocketSender; this.queueSize = queueSize; this.executor = new BlockingThreadPoolExecutor("pushSender", 50, queueSize); + this.pushLatencyManager = pushLatencyManager; SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME) .register(name(PushSender.class, "send_queue_depth"), @@ -109,6 +113,8 @@ public class PushSender implements Managed { (int)device.getId(), GcmMessage.Type.NOTIFICATION, Optional.empty()); gcmSender.sendMessage(gcmMessage); + + RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getNumber(), device.getId())); } private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) { @@ -134,6 +140,8 @@ public class PushSender implements Managed { } apnSender.sendMessage(apnMessage); + + RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getNumber(), device.getId())); } private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 4a10fe88c..3bc81eb29 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -4,9 +4,13 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; +import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; +import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.util.Constants; import java.util.List; @@ -25,13 +29,17 @@ public class MessagesManager { private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" )); private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid")); + private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class); private final Messages messages; private final MessagesCache messagesCache; - public MessagesManager(Messages messages, MessagesCache messagesCache) { - this.messages = messages; - this.messagesCache = messagesCache; + private final PushLatencyManager pushLatencyManager; + + public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) { + this.messages = messages; + this.messagesCache = messagesCache; + this.pushLatencyManager = pushLatencyManager; } public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) { @@ -39,7 +47,9 @@ public class MessagesManager { messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message); } - public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice) { + public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) { + RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destination, destinationDevice, userAgent)); + List messages = this.messages.load(destination, destinationDevice); if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index fdeee7a5a..97a5345ff 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -172,7 +172,7 @@ public class WebSocketConnection implements DispatchChannel { } private void processStoredMessages() { - OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId()); + OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent()); Iterator iterator = messages.getMessages().iterator(); while (iterator.hasNext()) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java new file mode 100644 index 000000000..d752d42a0 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java @@ -0,0 +1,118 @@ +package org.whispersystems.textsecuregcm.metrics; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; +import redis.clients.jedis.Jedis; +import redis.embedded.RedisServer; + +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PushLatencyManagerTest { + + private PushLatencyManager pushLatencyManager; + + private RedisServer redisServer; + private Jedis jedis; + + @Before + public void setUp() { + redisServer = new RedisServer(6379); + jedis = new Jedis("localhost", 6379); + + final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class); + final ThreadPoolExecutor threadPoolExecutor = mock(ThreadPoolExecutor.class); + + pushLatencyManager = new PushLatencyManager(jedisPool, threadPoolExecutor); + + redisServer.start(); + + when(jedisPool.getWriteResource()).thenReturn(jedis); + + doAnswer(invocation -> { + invocation.getArgument(0, Runnable.class).run(); + return null; + }).when(threadPoolExecutor).execute(any(Runnable.class)); + } + + @After + public void tearDown() { + redisServer.stop(); + } + + @Test + public void testGetLatency() { + final String accountNumber = "mock-account"; + final long deviceId = 1; + + final long expectedLatency = 1234; + final long pushSentTimestamp = System.currentTimeMillis(); + final long clearQueueTimestamp = pushSentTimestamp + expectedLatency; + + { + assertFalse(pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()).isPresent()); + } + + { + pushLatencyManager.recordPushSent(accountNumber, deviceId, pushSentTimestamp); + + assertEquals(Optional.of(expectedLatency), + pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, clearQueueTimestamp)); + + assertFalse(pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()).isPresent()); + } + } + + @Test + public void testThreadPoolDoesNotBlock() throws InterruptedException { + final AtomicBoolean canReturnJedisInstance = new AtomicBoolean(false); + final ReplicatedJedisPool blockingJedisPool = mock(ReplicatedJedisPool.class); + final PushLatencyManager blockingPushLatencyManager = new PushLatencyManager(blockingJedisPool); + + // One unqueued execution (single-thread pool) plus a full queue + final CountDownLatch countDownLatch = new CountDownLatch(PushLatencyManager.QUEUE_SIZE + 1); + + when(blockingJedisPool.getWriteResource()).thenAnswer((Answer)invocationOnMock -> { + synchronized (canReturnJedisInstance) { + while (!canReturnJedisInstance.get()) { + canReturnJedisInstance.wait(); + } + } + + try { + return jedis; + } finally { + countDownLatch.countDown(); + } + }); + + for (int i = 0; i < PushLatencyManager.QUEUE_SIZE * 2; i++) { + blockingPushLatencyManager.recordPushSent("account-" + i, 1); + } + + synchronized (canReturnJedisInstance) { + canReturnJedisInstance.set(true); + canReturnJedisInstance.notifyAll(); + } + + if (!countDownLatch.await(5, TimeUnit.SECONDS)) { + fail("Timed out waiting for countdown latch"); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java index 4a71d6b78..757c6a104 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java @@ -257,7 +257,7 @@ public class MessageControllerTest { OutgoingMessageEntityList messagesList = new OutgoingMessageEntityList(messages, false); - when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(AuthHelper.VALID_UUID), eq(1L))).thenReturn(messagesList); + when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(AuthHelper.VALID_UUID), eq(1L), anyString())).thenReturn(messagesList); OutgoingMessageEntityList response = resources.getJerseyTest().target("/v1/messages/") @@ -294,7 +294,7 @@ public class MessageControllerTest { OutgoingMessageEntityList messagesList = new OutgoingMessageEntityList(messages, false); - when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(AuthHelper.VALID_UUID), eq(1L))).thenReturn(messagesList); + when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(AuthHelper.VALID_UUID), eq(1L), anyString())).thenReturn(messagesList); Response response = resources.getJerseyTest().target("/v1/messages/") diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MatcherHelper.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MatcherHelper.java new file mode 100644 index 000000000..83f6f033a --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/util/MatcherHelper.java @@ -0,0 +1,15 @@ +package org.whispersystems.textsecuregcm.tests.util; + +import org.mockito.ArgumentMatchers; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Device; + +public class MatcherHelper { + public static Account accountWithNumber(final String accountNumber) { + return ArgumentMatchers.argThat(account -> accountNumber.equals(account.getNumber())); + } + + public static Device deviceWithId(final long deviceId) { + return ArgumentMatchers.argThat(device -> device.getId() == deviceId); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java index d50c45af0..1ecd26524 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java @@ -1,6 +1,7 @@ package org.whispersystems.textsecuregcm.tests.websocket; import com.google.protobuf.ByteString; +import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.junit.Test; import org.mockito.ArgumentMatchers; @@ -136,12 +137,15 @@ public class WebSocketConnectionTest { when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1)); when(accountsManager.get("sender2")).thenReturn(Optional.empty()); - when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId())) + String userAgent = "user-agent"; + + when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), userAgent)) .thenReturn(outgoingMessagesList); final List> futures = new LinkedList<>(); final WebSocketClient client = mock(WebSocketClient.class); + when(client.getUserAgent()).thenReturn(userAgent); when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.>any())) .thenAnswer(new Answer>() { @Override @@ -220,12 +224,15 @@ public class WebSocketConnectionTest { when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1)); when(accountsManager.get("sender2")).thenReturn(Optional.empty()); - when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId())) + String userAgent = "user-agent"; + + when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), userAgent)) .thenReturn(pendingMessagesList); final List> futures = new LinkedList<>(); final WebSocketClient client = mock(WebSocketClient.class); + when(client.getUserAgent()).thenReturn(userAgent); when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.>any())) .thenAnswer(new Answer>() { @Override @@ -328,12 +335,15 @@ public class WebSocketConnectionTest { when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1)); when(accountsManager.get("sender2")).thenReturn(Optional.empty()); - when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId())) + String userAgent = "user-agent"; + + when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), userAgent)) .thenReturn(pendingMessagesList); final List> futures = new LinkedList<>(); final WebSocketClient client = mock(WebSocketClient.class); + when(client.getUserAgent()).thenReturn(userAgent); when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.>any())) .thenAnswer(new Answer>() { @Override diff --git a/websocket-resources/src/main/java/org/whispersystems/websocket/WebSocketClient.java b/websocket-resources/src/main/java/org/whispersystems/websocket/WebSocketClient.java index 2de87d708..327987802 100644 --- a/websocket-resources/src/main/java/org/whispersystems/websocket/WebSocketClient.java +++ b/websocket-resources/src/main/java/org/whispersystems/websocket/WebSocketClient.java @@ -16,6 +16,7 @@ */ package org.whispersystems.websocket; +import org.apache.commons.lang3.StringUtils; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketException; @@ -86,6 +87,10 @@ public class WebSocketClient { return future; } + public String getUserAgent() { + return session.getUpgradeRequest().getHeader("User-Agent"); + } + public void close(int code, String message) { session.close(code, message); }