Move operation-mirroring logic to MessagesManager.
This commit is contained in:
parent
8943144b2b
commit
e35e34d2e0
|
@ -30,7 +30,6 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.wavefront.sdk.common.WavefrontSender;
|
|
||||||
import io.dropwizard.Application;
|
import io.dropwizard.Application;
|
||||||
import io.dropwizard.auth.AuthFilter;
|
import io.dropwizard.auth.AuthFilter;
|
||||||
import io.dropwizard.auth.PolymorphicAuthDynamicFeature;
|
import io.dropwizard.auth.PolymorphicAuthDynamicFeature;
|
||||||
|
@ -171,6 +170,8 @@ import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
@ -333,6 +334,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration());
|
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration().getUrls(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getTimeout(), config.getMessageCacheConfiguration().getRedisClusterConfiguration().getCircuitBreakerConfiguration());
|
||||||
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration().getUrls(), config.getMetricsClusterConfiguration().getTimeout(), config.getMetricsClusterConfiguration().getCircuitBreakerConfiguration());
|
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration().getUrls(), config.getMetricsClusterConfiguration().getTimeout(), config.getMetricsClusterConfiguration().getCircuitBreakerConfiguration());
|
||||||
|
|
||||||
|
final ExecutorService messageCacheClusterExperimentExecutor = environment.lifecycle().executorService("messages_cache_experiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
||||||
|
|
||||||
DirectoryManager directory = new DirectoryManager(directoryClient);
|
DirectoryManager directory = new DirectoryManager(directoryClient);
|
||||||
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
|
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
|
||||||
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster);
|
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster);
|
||||||
|
@ -341,9 +344,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||||
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster);
|
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster);
|
||||||
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes(), clusterMessagesCache);
|
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes());
|
||||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
||||||
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, pushLatencyManager);
|
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, clusterMessagesCache, pushLatencyManager, messageCacheClusterExperimentExecutor);
|
||||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||||
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
|
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
|
||||||
|
|
|
@ -66,16 +66,7 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||||
private PushSender pushSender;
|
private PushSender pushSender;
|
||||||
private MessagePersister messagePersister;
|
private MessagePersister messagePersister;
|
||||||
|
|
||||||
private final RedisClusterMessagesCache clusterMessagesCache;
|
public MessagesCache(ReplicatedJedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes) throws IOException {
|
||||||
private final ExecutorService experimentExecutor = new ThreadPoolExecutor(8, 8, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1_000));
|
|
||||||
|
|
||||||
private final Experiment insertExperiment = new Experiment("MessagesCache", "insert");
|
|
||||||
private final Experiment removeByIdExperiment = new Experiment("MessagesCache", "removeById");
|
|
||||||
private final Experiment removeBySenderExperiment = new Experiment("MessagesCache", "removeBySender");
|
|
||||||
private final Experiment removeByUuidExperiment = new Experiment("MessagesCache", "removeByUuid");
|
|
||||||
private final Experiment getMessagesExperiment = new Experiment("MessagesCache", "getMessages");
|
|
||||||
|
|
||||||
public MessagesCache(ReplicatedJedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes, RedisClusterMessagesCache clusterMessagesCache) throws IOException {
|
|
||||||
this.jedisPool = jedisPool;
|
this.jedisPool = jedisPool;
|
||||||
this.database = database;
|
this.database = database;
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
|
@ -84,8 +75,6 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||||
this.insertOperation = new InsertOperation(jedisPool);
|
this.insertOperation = new InsertOperation(jedisPool);
|
||||||
this.removeOperation = new RemoveOperation(jedisPool);
|
this.removeOperation = new RemoveOperation(jedisPool);
|
||||||
this.getOperation = new GetOperation(jedisPool);
|
this.getOperation = new GetOperation(jedisPool);
|
||||||
|
|
||||||
this.clusterMessagesCache = clusterMessagesCache;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -95,10 +84,7 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||||
Timer.Context timer = insertTimer.time();
|
Timer.Context timer = insertTimer.time();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final long messageId = insertOperation.insert(guid, destination, destinationDevice, System.currentTimeMillis(), messageWithGuid);
|
return insertOperation.insert(guid, destination, destinationDevice, System.currentTimeMillis(), messageWithGuid);
|
||||||
insertExperiment.compareSupplierResultAsync(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId), experimentExecutor);
|
|
||||||
|
|
||||||
return messageId;
|
|
||||||
} finally {
|
} finally {
|
||||||
timer.stop();
|
timer.stop();
|
||||||
}
|
}
|
||||||
|
@ -120,11 +106,7 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||||
logger.warn("Failed to parse envelope", e);
|
logger.warn("Failed to parse envelope", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = Optional.ofNullable(removedMessageEntity);
|
return Optional.ofNullable(removedMessageEntity);
|
||||||
|
|
||||||
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, id), experimentExecutor);
|
|
||||||
|
|
||||||
return maybeRemovedMessage;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -144,11 +126,7 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||||
timer.stop();
|
timer.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = Optional.ofNullable(removedMessageEntity);
|
return Optional.ofNullable(removedMessageEntity);
|
||||||
|
|
||||||
removeBySenderExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, sender, timestamp), experimentExecutor);
|
|
||||||
|
|
||||||
return maybeRemovedMessage;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -168,11 +146,7 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||||
timer.stop();
|
timer.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = Optional.ofNullable(removedMessageEntity);
|
return Optional.ofNullable(removedMessageEntity);
|
||||||
|
|
||||||
removeByUuidExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, guid), experimentExecutor);
|
|
||||||
|
|
||||||
return maybeRemovedMessage;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -194,8 +168,6 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
getMessagesExperiment.compareSupplierResultAsync(results, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, limit), experimentExecutor);
|
|
||||||
|
|
||||||
return results;
|
return results;
|
||||||
} finally {
|
} finally {
|
||||||
timer.stop();
|
timer.stop();
|
||||||
|
@ -241,8 +213,6 @@ public class MessagesCache implements Managed, UserMessagesCache {
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
messagePersister.shutdown();
|
messagePersister.shutdown();
|
||||||
logger.info("Message persister shut down...");
|
logger.info("Message persister shut down...");
|
||||||
|
|
||||||
this.experimentExecutor.shutdown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Key {
|
private static class Key {
|
||||||
|
|
|
@ -4,11 +4,10 @@ package org.whispersystems.textsecuregcm.storage;
|
||||||
import com.codahale.metrics.Meter;
|
import com.codahale.metrics.Meter;
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
||||||
|
import org.whispersystems.textsecuregcm.experiment.Experiment;
|
||||||
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
||||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
@ -16,6 +15,7 @@ import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
|
@ -29,19 +29,31 @@ public class MessagesManager {
|
||||||
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
|
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
|
||||||
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
||||||
|
|
||||||
private final Messages messages;
|
private final Messages messages;
|
||||||
private final MessagesCache messagesCache;
|
private final MessagesCache messagesCache;
|
||||||
private final PushLatencyManager pushLatencyManager;
|
private final RedisClusterMessagesCache clusterMessagesCache;
|
||||||
|
private final PushLatencyManager pushLatencyManager;
|
||||||
|
|
||||||
public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) {
|
private final ExecutorService experimentExecutor;
|
||||||
this.messages = messages;
|
private final Experiment insertExperiment = new Experiment("MessagesCache", "insert");
|
||||||
this.messagesCache = messagesCache;
|
private final Experiment removeByIdExperiment = new Experiment("MessagesCache", "removeById");
|
||||||
this.pushLatencyManager = pushLatencyManager;
|
private final Experiment removeBySenderExperiment = new Experiment("MessagesCache", "removeBySender");
|
||||||
|
private final Experiment removeByUuidExperiment = new Experiment("MessagesCache", "removeByUuid");
|
||||||
|
private final Experiment getMessagesExperiment = new Experiment("MessagesCache", "getMessages");
|
||||||
|
|
||||||
|
public MessagesManager(Messages messages, MessagesCache messagesCache, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager, final ExecutorService experimentExecutor) {
|
||||||
|
this.messages = messages;
|
||||||
|
this.messagesCache = messagesCache;
|
||||||
|
this.clusterMessagesCache = clusterMessagesCache;
|
||||||
|
this.pushLatencyManager = pushLatencyManager;
|
||||||
|
this.experimentExecutor = experimentExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||||
UUID guid = UUID.randomUUID();
|
final UUID guid = UUID.randomUUID();
|
||||||
messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message);
|
final long messageId = messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message);
|
||||||
|
|
||||||
|
insertExperiment.compareSupplierResultAsync(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId), experimentExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||||
|
@ -50,7 +62,10 @@ public class MessagesManager {
|
||||||
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
||||||
|
|
||||||
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
||||||
messages.addAll(this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
final List<OutgoingMessageEntity> messagesFromCache = this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size());
|
||||||
|
getMessagesExperiment.compareSupplierResultAsync(messagesFromCache, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()), experimentExecutor);
|
||||||
|
|
||||||
|
messages.addAll(messagesFromCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
||||||
|
@ -69,6 +84,7 @@ public class MessagesManager {
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
||||||
{
|
{
|
||||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
||||||
|
removeBySenderExperiment.compareSupplierResultAsync(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp), experimentExecutor);
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
||||||
|
@ -82,6 +98,7 @@ public class MessagesManager {
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
||||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid);
|
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid);
|
||||||
|
removeByUuidExperiment.compareSupplierResultAsync(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid), experimentExecutor);
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, guid);
|
removed = this.messages.remove(destination, guid);
|
||||||
|
@ -95,7 +112,8 @@ public class MessagesManager {
|
||||||
|
|
||||||
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
||||||
if (cached) {
|
if (cached) {
|
||||||
this.messagesCache.remove(destination, destinationUuid, deviceId, id);
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = this.messagesCache.remove(destination, destinationUuid, deviceId, id);
|
||||||
|
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor);
|
||||||
cacheHitByIdMeter.mark();
|
cacheHitByIdMeter.mark();
|
||||||
} else {
|
} else {
|
||||||
this.messages.remove(destination, id);
|
this.messages.remove(destination, id);
|
||||||
|
|
|
@ -26,7 +26,7 @@ public class MessagesCacheTest extends AbstractMessagesCacheTest {
|
||||||
final RedisClientFactory clientFactory = new RedisClientFactory("message-cache-test", redisUrl, List.of(redisUrl), new CircuitBreakerConfiguration());
|
final RedisClientFactory clientFactory = new RedisClientFactory("message-cache-test", redisUrl, List.of(redisUrl), new CircuitBreakerConfiguration());
|
||||||
final ReplicatedJedisPool jedisPool = clientFactory.getRedisClientPool();
|
final ReplicatedJedisPool jedisPool = clientFactory.getRedisClientPool();
|
||||||
|
|
||||||
messagesCache = new MessagesCache(jedisPool, mock(Messages.class), mock(AccountsManager.class), 60, mock(RedisClusterMessagesCache.class));
|
messagesCache = new MessagesCache(jedisPool, mock(Messages.class), mock(AccountsManager.class), 60);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
Loading…
Reference in New Issue