Use the MessagesManager to actually persist messages.

This commit is contained in:
Jon Chambers 2020-08-06 17:51:42 -04:00 committed by Jon Chambers
parent 5fad8f74b1
commit 0fcf28e7e7
3 changed files with 14 additions and 13 deletions

View File

@ -375,7 +375,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesClient, messages, pubSubManager, pushSender, accountsManager,config.getMessageCacheConfiguration().getPersistDelayMinutes(), TimeUnit.MINUTES);
MessagePersister messagePersister = new MessagePersister(messagesClient, messagesManager, pubSubManager, pushSender, accountsManager,config.getMessageCacheConfiguration().getPersistDelayMinutes(), TimeUnit.MINUTES);
RedisClusterMessagePersister clusterMessagePersister = new RedisClusterMessagePersister(clusterMessagesCache, messages, pubSubManager, pushSender, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());

View File

@ -33,7 +33,6 @@ import static com.codahale.metrics.MetricRegistry.name;
public class MessagePersister implements Managed, Runnable {
private final MessagesCache messagesCache;
private final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagesCache.class, "getQueues" ));
@ -47,10 +46,10 @@ public class MessagePersister implements Managed, Runnable {
private final AtomicBoolean running = new AtomicBoolean(true);
private final ReplicatedJedisPool jedisPool;
private final Messages database;
private final long delayTime;
private final TimeUnit delayTimeUnit;
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
private final PushSender pushSender;
private final AccountsManager accountsManager;
@ -59,9 +58,8 @@ public class MessagePersister implements Managed, Runnable {
private boolean finished = false;
public MessagePersister(final MessagesCache messagesCache,
final ReplicatedJedisPool jedisPool,
final Messages database,
public MessagePersister(final ReplicatedJedisPool jedisPool,
final MessagesManager messagesManager,
final PubSubManager pubSubManager,
final PushSender pushSender,
final AccountsManager accountsManager,
@ -69,10 +67,9 @@ public class MessagePersister implements Managed, Runnable {
final TimeUnit delayTimeUnit)
throws IOException
{
this.messagesCache = messagesCache;
this.jedisPool = jedisPool;
this.database = database;
this.jedisPool = jedisPool;
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
this.pushSender = pushSender;
this.accountsManager = accountsManager;
@ -156,16 +153,14 @@ public class MessagePersister implements Managed, Runnable {
private void persistMessage(Key key, UUID destinationUuid, long score, byte[] message) {
try {
MessageProtos.Envelope envelope = MessageProtos.Envelope.parseFrom(message);
UUID guid = envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null;
UUID guid = envelope.hasServerGuid() ? UUID.fromString(envelope.getServerGuid()) : null;
envelope = envelope.toBuilder().clearServerGuid().build();
database.store(guid, envelope, key.getAddress(), key.getDeviceId());
messagesManager.persistMessage(key.getAddress(), destinationUuid, envelope, guid, key.getDeviceId(), score);
} catch (InvalidProtocolBufferException e) {
logger.error("Error parsing envelope", e);
}
messagesCache.remove(key.getAddress(), destinationUuid, key.getDeviceId(), score);
}
private List<byte[]> getQueuesToPersist() {

View File

@ -121,4 +121,10 @@ public class MessagesManager {
}
}
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId, long id) {
messages.store(messageGuid, envelope, destination, deviceId);
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(destination, destinationUuid, deviceId, id);
removeByIdExperiment.compareSupplierResultAsync(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id), experimentExecutor);
}
}