Rely solely on the clustered message cache.
This commit is contained in:
parent
39c4117409
commit
81e8143a43
|
@ -125,9 +125,7 @@ import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||||
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
|
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
|
||||||
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
|
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.Keys;
|
import org.whispersystems.textsecuregcm.storage.Keys;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagePersister;
|
|
||||||
import org.whispersystems.textsecuregcm.storage.Messages;
|
import org.whispersystems.textsecuregcm.storage.Messages;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesCache;
|
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
|
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
|
||||||
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
|
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
|
||||||
|
@ -151,7 +149,6 @@ import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
|
||||||
import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener;
|
import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener;
|
||||||
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
|
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
|
||||||
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
|
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.ClearMessagesCacheClusterCommand;
|
|
||||||
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.ScourMessageCacheCommand;
|
import org.whispersystems.textsecuregcm.workers.ScourMessageCacheCommand;
|
||||||
import org.whispersystems.textsecuregcm.workers.VacuumCommand;
|
import org.whispersystems.textsecuregcm.workers.VacuumCommand;
|
||||||
|
@ -186,7 +183,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
bootstrap.addCommand(new DeleteUserCommand());
|
bootstrap.addCommand(new DeleteUserCommand());
|
||||||
bootstrap.addCommand(new CertificateCommand());
|
bootstrap.addCommand(new CertificateCommand());
|
||||||
bootstrap.addCommand(new ZkParamsCommand());
|
bootstrap.addCommand(new ZkParamsCommand());
|
||||||
bootstrap.addCommand(new ClearMessagesCacheClusterCommand());
|
|
||||||
bootstrap.addCommand(new ScourMessageCacheCommand());
|
bootstrap.addCommand(new ScourMessageCacheCommand());
|
||||||
|
|
||||||
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") {
|
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") {
|
||||||
|
@ -275,12 +271,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
|
|
||||||
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
|
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
|
||||||
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
||||||
RedisClientFactory messagesClientFactory = new RedisClientFactory("message_cache", config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls(), config.getMessageCacheConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
|
|
||||||
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
|
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
|
||||||
|
|
||||||
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
|
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
|
||||||
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();
|
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();
|
||||||
ReplicatedJedisPool messagesClient = messagesClientFactory.getRedisClientPool();
|
|
||||||
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
|
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
|
||||||
|
|
||||||
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration());
|
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration());
|
||||||
|
@ -301,9 +295,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||||
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster, messageNotificationExecutor);
|
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster, messageNotificationExecutor);
|
||||||
MessagesCache messagesCache = new MessagesCache(messagesClient);
|
|
||||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
||||||
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, clusterMessagesCache, pushLatencyManager);
|
MessagesManager messagesManager = new MessagesManager(messages, clusterMessagesCache, pushLatencyManager);
|
||||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||||
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, refreshFeatureFlagsExecutor);
|
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, refreshFeatureFlagsExecutor);
|
||||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||||
|
@ -332,8 +325,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
||||||
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
||||||
|
|
||||||
MessagePersister messagePersister = new MessagePersister(messagesClient, messagesManager, pubSubManager, pushSender, accountsManager, featureFlagsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes(), TimeUnit.MINUTES);
|
RedisClusterMessagePersister clusterMessagePersister = new RedisClusterMessagePersister(clusterMessagesCache, messagesManager, pubSubManager, pushSender, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
||||||
RedisClusterMessagePersister clusterMessagePersister = new RedisClusterMessagePersister(clusterMessagesCache, messagesManager, pubSubManager, pushSender, accountsManager, featureFlagsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
|
||||||
|
|
||||||
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
||||||
|
|
||||||
|
@ -351,7 +343,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
environment.lifecycle().manage(apnFallbackManager);
|
environment.lifecycle().manage(apnFallbackManager);
|
||||||
environment.lifecycle().manage(pubSubManager);
|
environment.lifecycle().manage(pubSubManager);
|
||||||
environment.lifecycle().manage(pushSender);
|
environment.lifecycle().manage(pushSender);
|
||||||
environment.lifecycle().manage(messagePersister);
|
|
||||||
environment.lifecycle().manage(accountDatabaseCrawler);
|
environment.lifecycle().manage(accountDatabaseCrawler);
|
||||||
environment.lifecycle().manage(remoteConfigsManager);
|
environment.lifecycle().manage(remoteConfigsManager);
|
||||||
environment.lifecycle().manage(clusterMessagesCache);
|
environment.lifecycle().manage(clusterMessagesCache);
|
||||||
|
|
|
@ -1,221 +0,0 @@
|
||||||
package org.whispersystems.textsecuregcm.storage;
|
|
||||||
|
|
||||||
import com.codahale.metrics.Histogram;
|
|
||||||
import com.codahale.metrics.MetricRegistry;
|
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
|
||||||
import com.codahale.metrics.Timer;
|
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
|
||||||
import io.dropwizard.lifecycle.Managed;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
|
||||||
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
|
||||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.LuaScript;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Util;
|
|
||||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
|
||||||
import redis.clients.jedis.Jedis;
|
|
||||||
import redis.clients.jedis.Tuple;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
|
||||||
|
|
||||||
public class MessagePersister implements Managed, Runnable {
|
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
|
||||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
|
||||||
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagesCache.class, "getQueues" ));
|
|
||||||
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagesCache.class, "persistQueue"));
|
|
||||||
private final Timer notifyTimer = metricRegistry.timer(name(MessagesCache.class, "notifyUser" ));
|
|
||||||
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagesCache.class, "persistQueueSize" ));
|
|
||||||
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagesCache.class, "persistQueueCount"));
|
|
||||||
|
|
||||||
private static final int CHUNK_SIZE = 100;
|
|
||||||
|
|
||||||
private final AtomicBoolean running = new AtomicBoolean(true);
|
|
||||||
|
|
||||||
private final ReplicatedJedisPool jedisPool;
|
|
||||||
private final long delayTime;
|
|
||||||
private final TimeUnit delayTimeUnit;
|
|
||||||
|
|
||||||
private final MessagesManager messagesManager;
|
|
||||||
private final PubSubManager pubSubManager;
|
|
||||||
private final PushSender pushSender;
|
|
||||||
private final AccountsManager accountsManager;
|
|
||||||
private final FeatureFlagsManager featureFlagsManager;
|
|
||||||
|
|
||||||
private final LuaScript getQueuesScript;
|
|
||||||
|
|
||||||
private boolean finished = false;
|
|
||||||
|
|
||||||
private static final String DISABLE_PERSISTENCE_FLAG = "disable-singleton-persister";
|
|
||||||
|
|
||||||
public MessagePersister(final ReplicatedJedisPool jedisPool,
|
|
||||||
final MessagesManager messagesManager,
|
|
||||||
final PubSubManager pubSubManager,
|
|
||||||
final PushSender pushSender,
|
|
||||||
final AccountsManager accountsManager,
|
|
||||||
final FeatureFlagsManager featureFlagsManager,
|
|
||||||
final long delayTime,
|
|
||||||
final TimeUnit delayTimeUnit)
|
|
||||||
throws IOException
|
|
||||||
{
|
|
||||||
this.jedisPool = jedisPool;
|
|
||||||
|
|
||||||
this.messagesManager = messagesManager;
|
|
||||||
this.pubSubManager = pubSubManager;
|
|
||||||
this.pushSender = pushSender;
|
|
||||||
this.accountsManager = accountsManager;
|
|
||||||
this.featureFlagsManager = featureFlagsManager;
|
|
||||||
|
|
||||||
this.delayTime = delayTime;
|
|
||||||
this.delayTimeUnit = delayTimeUnit;
|
|
||||||
this.getQueuesScript = LuaScript.fromResource(jedisPool, "lua/get_queues_to_persist.lua");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
new Thread(this, getClass().getSimpleName()).start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
while (running.get()) {
|
|
||||||
if (!featureFlagsManager.isFeatureFlagActive(DISABLE_PERSISTENCE_FLAG)) {
|
|
||||||
try {
|
|
||||||
List<byte[]> queuesToPersist = getQueuesToPersist();
|
|
||||||
queueCountHistogram.update(queuesToPersist.size());
|
|
||||||
|
|
||||||
for (byte[] queue : queuesToPersist) {
|
|
||||||
Key key = Key.fromUserMessageQueue(queue);
|
|
||||||
|
|
||||||
persistQueue(jedisPool, key);
|
|
||||||
notifyClients(accountsManager, pubSubManager, pushSender, key);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queuesToPersist.isEmpty()) {
|
|
||||||
//noinspection BusyWait
|
|
||||||
Thread.sleep(10_000);
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
|
||||||
logger.error("Exception while persisting: ", t);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
Thread.sleep(10_000);
|
|
||||||
} catch (final InterruptedException ignored) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (this) {
|
|
||||||
finished = true;
|
|
||||||
notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void stop() {
|
|
||||||
running.set(false);
|
|
||||||
while (!finished) Util.wait(this);
|
|
||||||
|
|
||||||
logger.info("Message persister shut down...");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void persistQueue(ReplicatedJedisPool jedisPool, Key key) {
|
|
||||||
Timer.Context timer = persistQueueTimer.time();
|
|
||||||
|
|
||||||
int messagesPersistedCount = 0;
|
|
||||||
|
|
||||||
UUID destinationUuid = accountsManager.get(key.getAddress()).map(Account::getUuid).orElse(null);
|
|
||||||
|
|
||||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
|
||||||
while (true) {
|
|
||||||
jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes());
|
|
||||||
|
|
||||||
Set<Tuple> messages = jedis.zrangeWithScores(key.getUserMessageQueue(), 0, CHUNK_SIZE);
|
|
||||||
|
|
||||||
for (Tuple message : messages) {
|
|
||||||
persistMessage(key, destinationUuid, (long)message.getScore(), message.getBinaryElement());
|
|
||||||
messagesPersistedCount++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (messages.size() < CHUNK_SIZE) {
|
|
||||||
jedis.del(key.getUserMessageQueuePersistInProgress());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
timer.stop();
|
|
||||||
queueSizeHistogram.update(messagesPersistedCount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void persistMessage(Key key, UUID destinationUuid, long score, byte[] message) {
|
|
||||||
try {
|
|
||||||
MessageProtos.Envelope envelope = MessageProtos.Envelope.parseFrom(message);
|
|
||||||
UUID guid = envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null;
|
|
||||||
|
|
||||||
envelope = envelope.toBuilder().clearServerGuid().build();
|
|
||||||
|
|
||||||
messagesManager.persistMessage(key.getAddress(), destinationUuid, envelope, guid, key.getDeviceId());
|
|
||||||
} catch (InvalidProtocolBufferException e) {
|
|
||||||
logger.error("Error parsing envelope", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<byte[]> getQueuesToPersist() {
|
|
||||||
Timer.Context timer = getQueuesTimer.time();
|
|
||||||
try {
|
|
||||||
long maxTime = System.currentTimeMillis() - delayTimeUnit.toMillis(delayTime);
|
|
||||||
List<byte[]> keys = Collections.singletonList(Key.getUserMessageQueueIndex());
|
|
||||||
List<byte[]> args = Arrays.asList(String.valueOf(maxTime).getBytes(), String.valueOf(100).getBytes());
|
|
||||||
|
|
||||||
//noinspection unchecked
|
|
||||||
return (List<byte[]>)getQueuesScript.execute(keys, args);
|
|
||||||
} finally {
|
|
||||||
timer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void notifyClients(AccountsManager accountsManager, PubSubManager pubSubManager, PushSender pushSender, Key key) {
|
|
||||||
Timer.Context timer = notifyTimer.time();
|
|
||||||
|
|
||||||
try {
|
|
||||||
boolean notified = pubSubManager.publish(new WebsocketAddress(key.getAddress(), key.getDeviceId()),
|
|
||||||
PubSubProtos.PubSubMessage.newBuilder()
|
|
||||||
.setType(PubSubProtos.PubSubMessage.Type.QUERY_DB)
|
|
||||||
.build());
|
|
||||||
|
|
||||||
if (!notified) {
|
|
||||||
Optional<Account> account = accountsManager.get(key.getAddress());
|
|
||||||
|
|
||||||
if (account.isPresent()) {
|
|
||||||
Optional<Device> device = account.get().getDevice(key.getDeviceId());
|
|
||||||
|
|
||||||
if (device.isPresent()) {
|
|
||||||
try {
|
|
||||||
pushSender.sendQueuedNotification(account.get(), device.get());
|
|
||||||
} catch (NotPushRegisteredException e) {
|
|
||||||
logger.warn("After message persistence, no longer push registered!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
timer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -7,7 +7,6 @@ import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
||||||
import org.whispersystems.textsecuregcm.experiment.Experiment;
|
|
||||||
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
||||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
@ -29,28 +28,17 @@ public class MessagesManager {
|
||||||
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
||||||
|
|
||||||
private final Messages messages;
|
private final Messages messages;
|
||||||
private final MessagesCache messagesCache;
|
|
||||||
private final RedisClusterMessagesCache clusterMessagesCache;
|
private final RedisClusterMessagesCache clusterMessagesCache;
|
||||||
private final PushLatencyManager pushLatencyManager;
|
private final PushLatencyManager pushLatencyManager;
|
||||||
|
|
||||||
private final Experiment insertExperiment = new Experiment("MessagesCache", "insert");
|
public MessagesManager(Messages messages, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager) {
|
||||||
private final Experiment removeByIdExperiment = new Experiment("MessagesCache", "removeById");
|
|
||||||
private final Experiment removeBySenderExperiment = new Experiment("MessagesCache", "removeBySender");
|
|
||||||
private final Experiment removeByUuidExperiment = new Experiment("MessagesCache", "removeByUuid");
|
|
||||||
private final Experiment getMessagesExperiment = new Experiment("MessagesCache", "getMessages");
|
|
||||||
|
|
||||||
public MessagesManager(Messages messages, MessagesCache messagesCache, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager) {
|
|
||||||
this.messages = messages;
|
this.messages = messages;
|
||||||
this.messagesCache = messagesCache;
|
|
||||||
this.clusterMessagesCache = clusterMessagesCache;
|
this.clusterMessagesCache = clusterMessagesCache;
|
||||||
this.pushLatencyManager = pushLatencyManager;
|
this.pushLatencyManager = pushLatencyManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||||
final UUID guid = UUID.randomUUID();
|
clusterMessagesCache.insert(UUID.randomUUID(), destination, destinationUuid, destinationDevice, message);
|
||||||
final long messageId = messagesCache.insert(guid, destination, destinationUuid, destinationDevice, message);
|
|
||||||
|
|
||||||
insertExperiment.compareSupplierResult(messageId, () -> clusterMessagesCache.insert(guid, destination, destinationUuid, destinationDevice, message, messageId));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||||
|
@ -59,31 +47,25 @@ public class MessagesManager {
|
||||||
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
||||||
|
|
||||||
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
||||||
final List<OutgoingMessageEntity> messagesFromCache = this.messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size());
|
messages.addAll(clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
||||||
getMessagesExperiment.compareSupplierResult(messagesFromCache, () -> clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
|
||||||
|
|
||||||
messages.addAll(messagesFromCache);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear(String destination, UUID destinationUuid) {
|
public void clear(String destination, UUID destinationUuid) {
|
||||||
this.messagesCache.clear(destination, destinationUuid);
|
|
||||||
this.clusterMessagesCache.clear(destination, destinationUuid);
|
this.clusterMessagesCache.clear(destination, destinationUuid);
|
||||||
this.messages.clear(destination);
|
this.messages.clear(destination);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear(String destination, UUID destinationUuid, long deviceId) {
|
public void clear(String destination, UUID destinationUuid, long deviceId) {
|
||||||
this.messagesCache.clear(destination, destinationUuid, deviceId);
|
|
||||||
this.clusterMessagesCache.clear(destination, destinationUuid, deviceId);
|
this.clusterMessagesCache.clear(destination, destinationUuid, deviceId);
|
||||||
this.messages.clear(destination, deviceId);
|
this.messages.clear(destination, deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
||||||
{
|
{
|
||||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
Optional<OutgoingMessageEntity> removed = clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
||||||
removeBySenderExperiment.compareSupplierResult(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp));
|
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
||||||
|
@ -96,8 +78,7 @@ public class MessagesManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
||||||
Optional<OutgoingMessageEntity> removed = this.messagesCache.remove(destination, destinationUuid, deviceId, guid);
|
Optional<OutgoingMessageEntity> removed = clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid);
|
||||||
removeByUuidExperiment.compareSupplierResult(removed, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid));
|
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, guid);
|
removed = this.messages.remove(destination, guid);
|
||||||
|
@ -111,8 +92,7 @@ public class MessagesManager {
|
||||||
|
|
||||||
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
||||||
if (cached) {
|
if (cached) {
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = this.messagesCache.remove(destination, destinationUuid, deviceId, id);
|
clusterMessagesCache.remove(destination, destinationUuid, deviceId, id);
|
||||||
removeByIdExperiment.compareSupplierResult(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id));
|
|
||||||
cacheHitByIdMeter.mark();
|
cacheHitByIdMeter.mark();
|
||||||
} else {
|
} else {
|
||||||
this.messages.remove(destination, id);
|
this.messages.remove(destination, id);
|
||||||
|
@ -122,7 +102,7 @@ public class MessagesManager {
|
||||||
|
|
||||||
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) {
|
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) {
|
||||||
messages.store(messageGuid, envelope, destination, deviceId);
|
messages.store(messageGuid, envelope, destination, deviceId);
|
||||||
delete(destination, destinationUuid, deviceId, messageGuid);
|
clusterMessagesCache.remove(destination, destinationUuid, deviceId, messageGuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||||
|
|
|
@ -30,7 +30,6 @@ public class RedisClusterMessagePersister implements Managed {
|
||||||
private final PubSubManager pubSubManager;
|
private final PubSubManager pubSubManager;
|
||||||
private final PushSender pushSender;
|
private final PushSender pushSender;
|
||||||
private final AccountsManager accountsManager;
|
private final AccountsManager accountsManager;
|
||||||
private final FeatureFlagsManager featureFlagsManager;
|
|
||||||
|
|
||||||
private final Duration persistDelay;
|
private final Duration persistDelay;
|
||||||
|
|
||||||
|
@ -51,13 +50,12 @@ public class RedisClusterMessagePersister implements Managed {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagePersister.class);
|
private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagePersister.class);
|
||||||
|
|
||||||
public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) {
|
public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final Duration persistDelay) {
|
||||||
this.messagesCache = messagesCache;
|
this.messagesCache = messagesCache;
|
||||||
this.messagesManager = messagesManager;
|
this.messagesManager = messagesManager;
|
||||||
this.pubSubManager = pubSubManager;
|
this.pubSubManager = pubSubManager;
|
||||||
this.pushSender = pushSender;
|
this.pushSender = pushSender;
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
this.featureFlagsManager = featureFlagsManager;
|
|
||||||
this.persistDelay = persistDelay;
|
this.persistDelay = persistDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,10 +70,7 @@ public class RedisClusterMessagePersister implements Managed {
|
||||||
|
|
||||||
workerThread = new Thread(() -> {
|
workerThread = new Thread(() -> {
|
||||||
while (running) {
|
while (running) {
|
||||||
if (featureFlagsManager.isFeatureFlagActive(ENABLE_PERSISTENCE_FLAG)) {
|
persistNextQueues(Instant.now());
|
||||||
persistNextQueues(Instant.now());
|
|
||||||
}
|
|
||||||
|
|
||||||
Util.sleep(100);
|
Util.sleep(100);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -138,21 +138,6 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
|
||||||
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
||||||
}
|
}
|
||||||
|
|
||||||
public long insert(final UUID guid, final String destination, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message, final long messageId) {
|
|
||||||
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
|
||||||
final String sender = message.hasSource() ? (message.getSource() + "::" + message.getTimestamp()) : "nil";
|
|
||||||
|
|
||||||
return (long)insertTimer.record(() ->
|
|
||||||
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
|
||||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
|
||||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
|
||||||
List.of(messageWithGuid.toByteArray(),
|
|
||||||
String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
|
|
||||||
sender.getBytes(StandardCharsets.UTF_8),
|
|
||||||
guid.toString().getBytes(StandardCharsets.UTF_8),
|
|
||||||
String.valueOf(messageId).getBytes(StandardCharsets.UTF_8))));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<OutgoingMessageEntity> remove(final String destination, final UUID destinationUuid, final long destinationDevice, final long id) {
|
public Optional<OutgoingMessageEntity> remove(final String destination, final UUID destinationUuid, final long destinationDevice, final long id) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -1,20 +0,0 @@
|
||||||
package org.whispersystems.textsecuregcm.workers;
|
|
||||||
|
|
||||||
import io.dropwizard.cli.ConfiguredCommand;
|
|
||||||
import io.dropwizard.setup.Bootstrap;
|
|
||||||
import net.sourceforge.argparse4j.inf.Namespace;
|
|
||||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
|
||||||
|
|
||||||
public class ClearMessagesCacheClusterCommand extends ConfiguredCommand<WhisperServerConfiguration> {
|
|
||||||
|
|
||||||
public ClearMessagesCacheClusterCommand() {
|
|
||||||
super("clearmessagescluster", "remove all keys from messages cache cluster");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void run(final Bootstrap<WhisperServerConfiguration> bootstrap, final Namespace namespace, final WhisperServerConfiguration config) {
|
|
||||||
final FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration());
|
|
||||||
messagesCacheCluster.useCluster(connection -> connection.sync().masters().commands().flushallAsync());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,15 +1,7 @@
|
||||||
-- keys: queue_key [1], queue_metadata_key [2], queue_total_index [3]
|
-- keys: queue_key [1], queue_metadata_key [2], queue_total_index [3]
|
||||||
-- argv: message [1], current_time [2], sender (possibly null) [3], guid [4], messageId (possibly null) [5]
|
-- argv: message [1], current_time [2], sender (possibly null) [3], guid [4], messageId (possibly null) [5]
|
||||||
|
|
||||||
local messageId
|
local messageId = redis.call("HINCRBY", KEYS[2], "counter", 1)
|
||||||
|
|
||||||
if ARGV[5] ~= nil then
|
|
||||||
-- TODO: Remove this branch (and ARGV[5]) once the migration to a clustered message cache is finished
|
|
||||||
messageId = tonumber(ARGV[5])
|
|
||||||
redis.call("HSET", KEYS[2], "counter", messageId)
|
|
||||||
else
|
|
||||||
messageId = redis.call("HINCRBY", KEYS[2], "counter", 1)
|
|
||||||
end
|
|
||||||
|
|
||||||
redis.call("ZADD", KEYS[1], "NX", messageId, ARGV[1])
|
redis.call("ZADD", KEYS[1], "NX", messageId, ARGV[1])
|
||||||
|
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
|
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
|
||||||
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, featureFlagsManager, PERSIST_DELAY);
|
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY);
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
final String destination = invocation.getArgument(0, String.class);
|
final String destination = invocation.getArgument(0, String.class);
|
||||||
|
|
|
@ -54,17 +54,6 @@ public class RedisClusterMessagesCacheTest extends AbstractMessagesCacheTest {
|
||||||
return messagesCache;
|
return messagesCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
@Parameters({"true", "false"})
|
|
||||||
public void testInsertWithPrescribedId(final boolean sealedSender) {
|
|
||||||
final UUID firstMessageGuid = UUID.randomUUID();
|
|
||||||
final UUID secondMessageGuid = UUID.randomUUID();
|
|
||||||
final long messageId = 74;
|
|
||||||
|
|
||||||
assertEquals(messageId, messagesCache.insert(firstMessageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(firstMessageGuid, sealedSender), messageId));
|
|
||||||
assertEquals(messageId + 1, messagesCache.insert(secondMessageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, generateRandomMessage(secondMessageGuid, sealedSender)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClearNullUuid() {
|
public void testClearNullUuid() {
|
||||||
// We're happy as long as this doesn't throw an exception
|
// We're happy as long as this doesn't throw an exception
|
||||||
|
|
Loading…
Reference in New Issue