Move cache-mirroring operations to the calling thread.
This commit is contained in:
parent
c6419a9c61
commit
1dcc491fec
|
@ -290,7 +290,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
|
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
|
||||||
ScheduledExecutorService refreshFeatureFlagsExecutor = environment.lifecycle().scheduledExecutorService("featureFlags").threads(1).build();
|
ScheduledExecutorService refreshFeatureFlagsExecutor = environment.lifecycle().scheduledExecutorService("featureFlags").threads(1).build();
|
||||||
ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
||||||
ExecutorService messageCacheClusterExperimentExecutor = environment.lifecycle().executorService("messages_cache_experiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
|
||||||
ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
|
||||||
|
|
||||||
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor);
|
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(messagesCacheCluster, clientPresenceExecutor);
|
||||||
|
@ -304,7 +303,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster, messageNotificationExecutor);
|
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster, messageNotificationExecutor);
|
||||||
MessagesCache messagesCache = new MessagesCache(messagesClient);
|
MessagesCache messagesCache = new MessagesCache(messagesClient);
|
||||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
||||||
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, clusterMessagesCache, pushLatencyManager, messageCacheClusterExperimentExecutor);
|
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, clusterMessagesCache, pushLatencyManager);
|
||||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||||
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, refreshFeatureFlagsExecutor);
|
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, refreshFeatureFlagsExecutor);
|
||||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||||
|
|
|
@ -15,7 +15,6 @@ import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
|
@ -34,26 +33,24 @@ public class MessagesManager {
|
||||||
private final RedisClusterMessagesCache clusterMessagesCache;
|
private final RedisClusterMessagesCache clusterMessagesCache;
|
||||||
private final PushLatencyManager pushLatencyManager;
|
private final PushLatencyManager pushLatencyManager;
|
||||||
|
|
||||||
private final ExecutorService experimentExecutor;
|
|
||||||
private final Experiment insertExperiment = new Experiment("MessagesCache", "insert");
|
private final Experiment insertExperiment = new Experiment("MessagesCache", "insert");
|
||||||
private final Experiment removeByIdExperiment = new Experiment("MessagesCache", "removeById");
|
private final Experiment removeByIdExperiment = new Experiment("MessagesCache", "removeById");
|
||||||
private final Experiment removeBySenderExperiment = new Experiment("MessagesCache", "removeBySender");
|
private final Experiment removeBySenderExperiment = new Experiment("MessagesCache", "removeBySender");
|
||||||
private final Experiment removeByUuidExperiment = new Experiment("MessagesCache", "removeByUuid");
|
private final Experiment removeByUuidExperiment = new Experiment("MessagesCache", "removeByUuid");
|
||||||
private final Experiment getMessagesExperiment = new Experiment("MessagesCache", "getMessages");
|
private final Experiment getMessagesExperiment = new Experiment("MessagesCache", "getMessages");
|
||||||
|
|
||||||
public MessagesManager(Messages messages, MessagesCache messagesCache, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager, final ExecutorService experimentExecutor) {
|
public MessagesManager(Messages messages, MessagesCache messagesCache, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager) {
|
||||||
this.messages = messages;
|
this.messages = messages;
|
||||||
this.messagesCache = messagesCache;
|
this.messagesCache = messagesCache;
|
||||||
this.clusterMessagesCache = clusterMessagesCache;
|
this.clusterMessagesCache = clusterMessagesCache;
|
||||||
this.pushLatencyManager = pushLatencyManager;
|
this.pushLatencyManager = pushLatencyManager;
|
||||||
this.experimentExecutor = experimentExecutor;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||||
final UUID guid = UUID.randomUUID();
|
final UUID guid = UUID.randomUUID();
|
||||||
final long messageId = messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message);
|
final long messageId = messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message);
|
||||||
|
|
||||||
insertExperiment.compareSupplierResultAsync(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId), experimentExecutor);
|
insertExperiment.compareSupplierResult(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId));
|
||||||
}
|
}
|
||||||
|
|
||||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||||
|
@ -63,7 +60,7 @@ public class MessagesManager {
|
||||||
|
|
||||||
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
||||||
final List<OutgoingMessageEntity> messagesFromCache = this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size());
|
final List<OutgoingMessageEntity> messagesFromCache = this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size());
|
||||||
getMessagesExperiment.compareSupplierResultAsync(messagesFromCache, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()), experimentExecutor);
|
getMessagesExperiment.compareSupplierResult(messagesFromCache, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
||||||
|
|
||||||
messages.addAll(messagesFromCache);
|
messages.addAll(messagesFromCache);
|
||||||
}
|
}
|
||||||
|
@ -86,7 +83,7 @@ public class MessagesManager {
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
||||||
{
|
{
|
||||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
||||||
removeBySenderExperiment.compareSupplierResultAsync(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp), experimentExecutor);
|
removeBySenderExperiment.compareSupplierResult(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp));
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
||||||
|
@ -100,7 +97,7 @@ public class MessagesManager {
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
||||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid);
|
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid);
|
||||||
removeByUuidExperiment.compareSupplierResultAsync(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid), experimentExecutor);
|
removeByUuidExperiment.compareSupplierResult(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid));
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, guid);
|
removed = this.messages.remove(destination, guid);
|
||||||
|
@ -115,7 +112,7 @@ public class MessagesManager {
|
||||||
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
||||||
if (cached) {
|
if (cached) {
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = this.messagesCache.remove(destination, destinationUuid, deviceId, id);
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = this.messagesCache.remove(destination, destinationUuid, deviceId, id);
|
||||||
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor);
|
removeByIdExperiment.compareSupplierResult(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id));
|
||||||
cacheHitByIdMeter.mark();
|
cacheHitByIdMeter.mark();
|
||||||
} else {
|
} else {
|
||||||
this.messages.remove(destination, id);
|
this.messages.remove(destination, id);
|
||||||
|
@ -127,7 +124,7 @@ public class MessagesManager {
|
||||||
messages.store(messageGuid, envelope, destination, deviceId);
|
messages.store(messageGuid, envelope, destination, deviceId);
|
||||||
|
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(destination, destinationUuid, deviceId, id);
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(destination, destinationUuid, deviceId, id);
|
||||||
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor);
|
removeByIdExperiment.compareSupplierResult(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||||
|
|
Loading…
Reference in New Issue