Added a push latency manager.
This commit is contained in:
parent
6e9b70a8d6
commit
901ba6e87f
|
@ -96,6 +96,11 @@ public class WhisperServerConfiguration extends Configuration {
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private RedisConfiguration pubsub;
|
private RedisConfiguration pubsub;
|
||||||
|
|
||||||
|
@NotNull
|
||||||
|
@Valid
|
||||||
|
@JsonProperty
|
||||||
|
private RedisConfiguration metricsCache;
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
@Valid
|
@Valid
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
@ -248,6 +253,10 @@ public class WhisperServerConfiguration extends Configuration {
|
||||||
return pubsub;
|
return pubsub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RedisConfiguration getMetricsCacheConfiguration() {
|
||||||
|
return metricsCache;
|
||||||
|
}
|
||||||
|
|
||||||
public DirectoryConfiguration getDirectoryConfiguration() {
|
public DirectoryConfiguration getDirectoryConfiguration() {
|
||||||
return directory;
|
return directory;
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,6 +97,7 @@ import org.whispersystems.textsecuregcm.metrics.FreeMemoryGauge;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener;
|
import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener;
|
||||||
import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge;
|
import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge;
|
||||||
import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge;
|
import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
||||||
import org.whispersystems.textsecuregcm.metrics.TrafficSource;
|
import org.whispersystems.textsecuregcm.metrics.TrafficSource;
|
||||||
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
|
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
|
||||||
import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck;
|
import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck;
|
||||||
|
@ -311,11 +312,13 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
||||||
RedisClientFactory messagesClientFactory = new RedisClientFactory("message_cache", config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls(), config.getMessageCacheConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
RedisClientFactory messagesClientFactory = new RedisClientFactory("message_cache", config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls(), config.getMessageCacheConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
||||||
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
|
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
|
||||||
|
RedisClientFactory metricsCacheClientFactory = new RedisClientFactory("metrics_cache", config.getMetricsCacheConfiguration().getUrl(), config.getMetricsCacheConfiguration().getReplicaUrls(), config.getMetricsCacheConfiguration().getCircuitBreakerConfiguration());
|
||||||
|
|
||||||
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
|
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
|
||||||
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();
|
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();
|
||||||
ReplicatedJedisPool messagesClient = messagesClientFactory.getRedisClientPool();
|
ReplicatedJedisPool messagesClient = messagesClientFactory.getRedisClientPool();
|
||||||
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
|
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
|
||||||
|
ReplicatedJedisPool metricsCacheClient = metricsCacheClientFactory.getRedisClientPool();
|
||||||
|
|
||||||
RedisClusterClient cacheClusterClient = RedisClusterClient.create(config.getCacheClusterConfiguration().getUrls().stream().map(RedisURI::create).collect(Collectors.toList()));
|
RedisClusterClient cacheClusterClient = RedisClusterClient.create(config.getCacheClusterConfiguration().getUrls().stream().map(RedisURI::create).collect(Collectors.toList()));
|
||||||
cacheClusterClient.setDefaultTimeout(config.getCacheClusterConfiguration().getTimeout());
|
cacheClusterClient.setDefaultTimeout(config.getCacheClusterConfiguration().getTimeout());
|
||||||
|
@ -332,7 +335,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||||
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster);
|
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster);
|
||||||
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes(), clusterMessagesCache);
|
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes(), clusterMessagesCache);
|
||||||
MessagesManager messagesManager = new MessagesManager(messages, messagesCache);
|
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCacheClient);
|
||||||
|
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, pushLatencyManager);
|
||||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||||
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
|
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
|
||||||
|
@ -355,7 +359,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushSchedulerClient, apnSender, accountsManager);
|
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushSchedulerClient, apnSender, accountsManager);
|
||||||
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
|
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
|
||||||
SmsSender smsSender = new SmsSender(twilioSmsSender);
|
SmsSender smsSender = new SmsSender(twilioSmsSender);
|
||||||
PushSender pushSender = new PushSender(apnFallbackManager, gcmSender, apnSender, websocketSender, config.getPushConfiguration().getQueueSize());
|
PushSender pushSender = new PushSender(apnFallbackManager, gcmSender, apnSender, websocketSender, config.getPushConfiguration().getQueueSize(), pushLatencyManager);
|
||||||
ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender);
|
ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender);
|
||||||
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
||||||
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
||||||
|
|
|
@ -181,7 +181,7 @@ public class MessageController {
|
||||||
@Timed
|
@Timed
|
||||||
@GET
|
@GET
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
public OutgoingMessageEntityList getPendingMessages(@Auth Account account) {
|
public OutgoingMessageEntityList getPendingMessages(@Auth Account account, @HeaderParam("User-Agent") String userAgent) {
|
||||||
assert account.getAuthenticatedDevice().isPresent();
|
assert account.getAuthenticatedDevice().isPresent();
|
||||||
|
|
||||||
if (!Util.isEmpty(account.getAuthenticatedDevice().get().getApnId())) {
|
if (!Util.isEmpty(account.getAuthenticatedDevice().get().getApnId())) {
|
||||||
|
@ -190,7 +190,8 @@ public class MessageController {
|
||||||
|
|
||||||
return messagesManager.getMessagesForDevice(account.getNumber(),
|
return messagesManager.getMessagesForDevice(account.getNumber(),
|
||||||
account.getUuid(),
|
account.getUuid(),
|
||||||
account.getAuthenticatedDevice().get().getId());
|
account.getAuthenticatedDevice().get().getId(),
|
||||||
|
userAgent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Timed
|
@Timed
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
package org.whispersystems.textsecuregcm.metrics;
|
||||||
|
|
||||||
|
import com.codahale.metrics.MetricRegistry;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
|
||||||
|
import redis.clients.jedis.Jedis;
|
||||||
|
import redis.clients.jedis.Response;
|
||||||
|
import redis.clients.jedis.Transaction;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Measures and records the latency between sending a push notification to a device and that device draining its queue
|
||||||
|
* of messages.
|
||||||
|
*/
|
||||||
|
public class PushLatencyManager {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(PushLatencyManager.class);
|
||||||
|
private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
|
||||||
|
private static final int TTL = (int)Duration.ofDays(3).toSeconds();
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int QUEUE_SIZE = 1_000;
|
||||||
|
|
||||||
|
private final ReplicatedJedisPool jedisPool;
|
||||||
|
private final ThreadPoolExecutor threadPoolExecutor;
|
||||||
|
|
||||||
|
public PushLatencyManager(final ReplicatedJedisPool jedisPool) {
|
||||||
|
this(jedisPool, new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE), new ThreadPoolExecutor.DiscardPolicy()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
PushLatencyManager(final ReplicatedJedisPool jedisPool, final ThreadPoolExecutor threadPoolExecutor) {
|
||||||
|
this.jedisPool = jedisPool;
|
||||||
|
this.threadPoolExecutor = threadPoolExecutor;
|
||||||
|
|
||||||
|
Metrics.gaugeCollectionSize(MetricRegistry.name(getClass(), "queueDepth"), Collections.emptyList(), threadPoolExecutor.getQueue());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recordPushSent(final String accountNumber, final long deviceId) {
|
||||||
|
recordPushSent(accountNumber, deviceId, System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void recordPushSent(final String accountNumber, final long deviceId, final long currentTime) {
|
||||||
|
final String key = getFirstUnacknowledgedPushKey(accountNumber, deviceId);
|
||||||
|
|
||||||
|
threadPoolExecutor.execute(() -> {
|
||||||
|
try (final Jedis jedis = jedisPool.getWriteResource()) {
|
||||||
|
final Transaction transaction = jedis.multi();
|
||||||
|
transaction.setnx(key, String.valueOf(currentTime));
|
||||||
|
transaction.expire(key, TTL);
|
||||||
|
transaction.exec();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recordQueueRead(final String accountNumber, final long deviceId, final String userAgent) {
|
||||||
|
threadPoolExecutor.execute(() -> {
|
||||||
|
final Optional<Long> maybeLatency = getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis());
|
||||||
|
|
||||||
|
if (maybeLatency.isPresent()) {
|
||||||
|
Metrics.timer(TIMER_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(maybeLatency.get(), TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Optional<Long> getLatencyAndClearTimestamp(final String accountNumber, final long deviceId, final long currentTimeMillis) {
|
||||||
|
final String key = getFirstUnacknowledgedPushKey(accountNumber, deviceId);
|
||||||
|
|
||||||
|
try (final Jedis jedis = jedisPool.getWriteResource()) {
|
||||||
|
final Transaction transaction = jedis.multi();
|
||||||
|
transaction.get(key);
|
||||||
|
transaction.del(key);
|
||||||
|
|
||||||
|
final List<Response<?>> responses = transaction.execGetResponse();
|
||||||
|
final String timestampString = (String)responses.get(0).get();
|
||||||
|
|
||||||
|
if (timestampString != null) {
|
||||||
|
try {
|
||||||
|
return Optional.of(currentTimeMillis - Long.parseLong(timestampString, 10));
|
||||||
|
} catch (final NumberFormatException e) {
|
||||||
|
logger.warn("Failed to parse timestamp: {}", timestampString);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getFirstUnacknowledgedPushKey(final String accountNumber, final long deviceId) {
|
||||||
|
return "push_latency::" + accountNumber + "::" + deviceId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ import com.codahale.metrics.Gauge;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
||||||
import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus;
|
import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus;
|
||||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
|
@ -46,10 +47,12 @@ public class PushSender implements Managed {
|
||||||
private final WebsocketSender webSocketSender;
|
private final WebsocketSender webSocketSender;
|
||||||
private final BlockingThreadPoolExecutor executor;
|
private final BlockingThreadPoolExecutor executor;
|
||||||
private final int queueSize;
|
private final int queueSize;
|
||||||
|
private final PushLatencyManager pushLatencyManager;
|
||||||
|
|
||||||
public PushSender(ApnFallbackManager apnFallbackManager,
|
public PushSender(ApnFallbackManager apnFallbackManager,
|
||||||
GCMSender gcmSender, APNSender apnSender,
|
GCMSender gcmSender, APNSender apnSender,
|
||||||
WebsocketSender websocketSender, int queueSize)
|
WebsocketSender websocketSender, int queueSize,
|
||||||
|
PushLatencyManager pushLatencyManager)
|
||||||
{
|
{
|
||||||
this.apnFallbackManager = apnFallbackManager;
|
this.apnFallbackManager = apnFallbackManager;
|
||||||
this.gcmSender = gcmSender;
|
this.gcmSender = gcmSender;
|
||||||
|
@ -57,6 +60,7 @@ public class PushSender implements Managed {
|
||||||
this.webSocketSender = websocketSender;
|
this.webSocketSender = websocketSender;
|
||||||
this.queueSize = queueSize;
|
this.queueSize = queueSize;
|
||||||
this.executor = new BlockingThreadPoolExecutor("pushSender", 50, queueSize);
|
this.executor = new BlockingThreadPoolExecutor("pushSender", 50, queueSize);
|
||||||
|
this.pushLatencyManager = pushLatencyManager;
|
||||||
|
|
||||||
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
|
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
|
||||||
.register(name(PushSender.class, "send_queue_depth"),
|
.register(name(PushSender.class, "send_queue_depth"),
|
||||||
|
@ -109,6 +113,8 @@ public class PushSender implements Managed {
|
||||||
(int)device.getId(), GcmMessage.Type.NOTIFICATION, Optional.empty());
|
(int)device.getId(), GcmMessage.Type.NOTIFICATION, Optional.empty());
|
||||||
|
|
||||||
gcmSender.sendMessage(gcmMessage);
|
gcmSender.sendMessage(gcmMessage);
|
||||||
|
|
||||||
|
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getNumber(), device.getId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) {
|
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) {
|
||||||
|
@ -134,6 +140,8 @@ public class PushSender implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
apnSender.sendMessage(apnMessage);
|
apnSender.sendMessage(apnMessage);
|
||||||
|
|
||||||
|
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getNumber(), device.getId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online)
|
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online)
|
||||||
|
|
|
@ -4,9 +4,13 @@ package org.whispersystems.textsecuregcm.storage;
|
||||||
import com.codahale.metrics.Meter;
|
import com.codahale.metrics.Meter;
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -25,13 +29,17 @@ public class MessagesManager {
|
||||||
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
|
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
|
||||||
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class);
|
||||||
|
|
||||||
private final Messages messages;
|
private final Messages messages;
|
||||||
private final MessagesCache messagesCache;
|
private final MessagesCache messagesCache;
|
||||||
|
|
||||||
public MessagesManager(Messages messages, MessagesCache messagesCache) {
|
private final PushLatencyManager pushLatencyManager;
|
||||||
this.messages = messages;
|
|
||||||
this.messagesCache = messagesCache;
|
public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) {
|
||||||
|
this.messages = messages;
|
||||||
|
this.messagesCache = messagesCache;
|
||||||
|
this.pushLatencyManager = pushLatencyManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||||
|
@ -39,7 +47,9 @@ public class MessagesManager {
|
||||||
messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message);
|
messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice) {
|
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||||
|
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destination, destinationDevice, userAgent));
|
||||||
|
|
||||||
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
||||||
|
|
||||||
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
||||||
|
|
|
@ -172,7 +172,7 @@ public class WebSocketConnection implements DispatchChannel {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processStoredMessages() {
|
private void processStoredMessages() {
|
||||||
OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId());
|
OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent());
|
||||||
Iterator<OutgoingMessageEntity> iterator = messages.getMessages().iterator();
|
Iterator<OutgoingMessageEntity> iterator = messages.getMessages().iterator();
|
||||||
|
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
package org.whispersystems.textsecuregcm.metrics;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
|
||||||
|
import redis.clients.jedis.Jedis;
|
||||||
|
import redis.embedded.RedisServer;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class PushLatencyManagerTest {
|
||||||
|
|
||||||
|
private PushLatencyManager pushLatencyManager;
|
||||||
|
|
||||||
|
private RedisServer redisServer;
|
||||||
|
private Jedis jedis;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
redisServer = new RedisServer(6379);
|
||||||
|
jedis = new Jedis("localhost", 6379);
|
||||||
|
|
||||||
|
final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class);
|
||||||
|
final ThreadPoolExecutor threadPoolExecutor = mock(ThreadPoolExecutor.class);
|
||||||
|
|
||||||
|
pushLatencyManager = new PushLatencyManager(jedisPool, threadPoolExecutor);
|
||||||
|
|
||||||
|
redisServer.start();
|
||||||
|
|
||||||
|
when(jedisPool.getWriteResource()).thenReturn(jedis);
|
||||||
|
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
invocation.getArgument(0, Runnable.class).run();
|
||||||
|
return null;
|
||||||
|
}).when(threadPoolExecutor).execute(any(Runnable.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
redisServer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLatency() {
|
||||||
|
final String accountNumber = "mock-account";
|
||||||
|
final long deviceId = 1;
|
||||||
|
|
||||||
|
final long expectedLatency = 1234;
|
||||||
|
final long pushSentTimestamp = System.currentTimeMillis();
|
||||||
|
final long clearQueueTimestamp = pushSentTimestamp + expectedLatency;
|
||||||
|
|
||||||
|
{
|
||||||
|
assertFalse(pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()).isPresent());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
pushLatencyManager.recordPushSent(accountNumber, deviceId, pushSentTimestamp);
|
||||||
|
|
||||||
|
assertEquals(Optional.of(expectedLatency),
|
||||||
|
pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, clearQueueTimestamp));
|
||||||
|
|
||||||
|
assertFalse(pushLatencyManager.getLatencyAndClearTimestamp(accountNumber, deviceId, System.currentTimeMillis()).isPresent());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testThreadPoolDoesNotBlock() throws InterruptedException {
|
||||||
|
final AtomicBoolean canReturnJedisInstance = new AtomicBoolean(false);
|
||||||
|
final ReplicatedJedisPool blockingJedisPool = mock(ReplicatedJedisPool.class);
|
||||||
|
final PushLatencyManager blockingPushLatencyManager = new PushLatencyManager(blockingJedisPool);
|
||||||
|
|
||||||
|
// One unqueued execution (single-thread pool) plus a full queue
|
||||||
|
final CountDownLatch countDownLatch = new CountDownLatch(PushLatencyManager.QUEUE_SIZE + 1);
|
||||||
|
|
||||||
|
when(blockingJedisPool.getWriteResource()).thenAnswer((Answer<Jedis>)invocationOnMock -> {
|
||||||
|
synchronized (canReturnJedisInstance) {
|
||||||
|
while (!canReturnJedisInstance.get()) {
|
||||||
|
canReturnJedisInstance.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return jedis;
|
||||||
|
} finally {
|
||||||
|
countDownLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (int i = 0; i < PushLatencyManager.QUEUE_SIZE * 2; i++) {
|
||||||
|
blockingPushLatencyManager.recordPushSent("account-" + i, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized (canReturnJedisInstance) {
|
||||||
|
canReturnJedisInstance.set(true);
|
||||||
|
canReturnJedisInstance.notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
|
||||||
|
fail("Timed out waiting for countdown latch");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -257,7 +257,7 @@ public class MessageControllerTest {
|
||||||
|
|
||||||
OutgoingMessageEntityList messagesList = new OutgoingMessageEntityList(messages, false);
|
OutgoingMessageEntityList messagesList = new OutgoingMessageEntityList(messages, false);
|
||||||
|
|
||||||
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(AuthHelper.VALID_UUID), eq(1L))).thenReturn(messagesList);
|
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(AuthHelper.VALID_UUID), eq(1L), anyString())).thenReturn(messagesList);
|
||||||
|
|
||||||
OutgoingMessageEntityList response =
|
OutgoingMessageEntityList response =
|
||||||
resources.getJerseyTest().target("/v1/messages/")
|
resources.getJerseyTest().target("/v1/messages/")
|
||||||
|
@ -294,7 +294,7 @@ public class MessageControllerTest {
|
||||||
|
|
||||||
OutgoingMessageEntityList messagesList = new OutgoingMessageEntityList(messages, false);
|
OutgoingMessageEntityList messagesList = new OutgoingMessageEntityList(messages, false);
|
||||||
|
|
||||||
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(AuthHelper.VALID_UUID), eq(1L))).thenReturn(messagesList);
|
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(AuthHelper.VALID_UUID), eq(1L), anyString())).thenReturn(messagesList);
|
||||||
|
|
||||||
Response response =
|
Response response =
|
||||||
resources.getJerseyTest().target("/v1/messages/")
|
resources.getJerseyTest().target("/v1/messages/")
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
package org.whispersystems.textsecuregcm.tests.util;
|
||||||
|
|
||||||
|
import org.mockito.ArgumentMatchers;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
|
|
||||||
|
public class MatcherHelper {
|
||||||
|
public static Account accountWithNumber(final String accountNumber) {
|
||||||
|
return ArgumentMatchers.argThat(account -> accountNumber.equals(account.getNumber()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Device deviceWithId(final long deviceId) {
|
||||||
|
return ArgumentMatchers.argThat(device -> device.getId() == deviceId);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package org.whispersystems.textsecuregcm.tests.websocket;
|
package org.whispersystems.textsecuregcm.tests.websocket;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import org.eclipse.jetty.websocket.api.Session;
|
||||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentMatchers;
|
import org.mockito.ArgumentMatchers;
|
||||||
|
@ -136,12 +137,15 @@ public class WebSocketConnectionTest {
|
||||||
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
||||||
when(accountsManager.get("sender2")).thenReturn(Optional.empty());
|
when(accountsManager.get("sender2")).thenReturn(Optional.empty());
|
||||||
|
|
||||||
when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId()))
|
String userAgent = "user-agent";
|
||||||
|
|
||||||
|
when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), userAgent))
|
||||||
.thenReturn(outgoingMessagesList);
|
.thenReturn(outgoingMessagesList);
|
||||||
|
|
||||||
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
|
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
|
||||||
final WebSocketClient client = mock(WebSocketClient.class);
|
final WebSocketClient client = mock(WebSocketClient.class);
|
||||||
|
|
||||||
|
when(client.getUserAgent()).thenReturn(userAgent);
|
||||||
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()))
|
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()))
|
||||||
.thenAnswer(new Answer<CompletableFuture<WebSocketResponseMessage>>() {
|
.thenAnswer(new Answer<CompletableFuture<WebSocketResponseMessage>>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -220,12 +224,15 @@ public class WebSocketConnectionTest {
|
||||||
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
||||||
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>empty());
|
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>empty());
|
||||||
|
|
||||||
when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId()))
|
String userAgent = "user-agent";
|
||||||
|
|
||||||
|
when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), userAgent))
|
||||||
.thenReturn(pendingMessagesList);
|
.thenReturn(pendingMessagesList);
|
||||||
|
|
||||||
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
|
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
|
||||||
final WebSocketClient client = mock(WebSocketClient.class);
|
final WebSocketClient client = mock(WebSocketClient.class);
|
||||||
|
|
||||||
|
when(client.getUserAgent()).thenReturn(userAgent);
|
||||||
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()))
|
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()))
|
||||||
.thenAnswer(new Answer<CompletableFuture<WebSocketResponseMessage>>() {
|
.thenAnswer(new Answer<CompletableFuture<WebSocketResponseMessage>>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -328,12 +335,15 @@ public class WebSocketConnectionTest {
|
||||||
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
||||||
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>empty());
|
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>empty());
|
||||||
|
|
||||||
when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId()))
|
String userAgent = "user-agent";
|
||||||
|
|
||||||
|
when(storedMessages.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), userAgent))
|
||||||
.thenReturn(pendingMessagesList);
|
.thenReturn(pendingMessagesList);
|
||||||
|
|
||||||
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
|
final List<CompletableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
|
||||||
final WebSocketClient client = mock(WebSocketClient.class);
|
final WebSocketClient client = mock(WebSocketClient.class);
|
||||||
|
|
||||||
|
when(client.getUserAgent()).thenReturn(userAgent);
|
||||||
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()))
|
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()))
|
||||||
.thenAnswer(new Answer<CompletableFuture<WebSocketResponseMessage>>() {
|
.thenAnswer(new Answer<CompletableFuture<WebSocketResponseMessage>>() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.whispersystems.websocket;
|
package org.whispersystems.websocket;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
||||||
import org.eclipse.jetty.websocket.api.Session;
|
import org.eclipse.jetty.websocket.api.Session;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||||
|
@ -86,6 +87,10 @@ public class WebSocketClient {
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getUserAgent() {
|
||||||
|
return session.getUpgradeRequest().getHeader("User-Agent");
|
||||||
|
}
|
||||||
|
|
||||||
public void close(int code, String message) {
|
public void close(int code, String message) {
|
||||||
session.close(code, message);
|
session.close(code, message);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue