Don't explicitly notify clients when messages get persisted.
This commit is contained in:
parent
7e14a0bc30
commit
66a04ed730
|
@ -326,7 +326,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
||||||
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
||||||
|
|
||||||
MessagePersister clusterMessagePersister = new MessagePersister(messagesCache, messagesManager, pubSubManager, pushSender, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
MessagePersister clusterMessagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
||||||
|
|
||||||
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
||||||
|
|
||||||
|
|
|
@ -9,11 +9,8 @@ import io.dropwizard.lifecycle.Managed;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||||
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
|
||||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
import org.whispersystems.textsecuregcm.util.Util;
|
import org.whispersystems.textsecuregcm.util.Util;
|
||||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
@ -27,8 +24,6 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
private final MessagesCache messagesCache;
|
private final MessagesCache messagesCache;
|
||||||
private final MessagesManager messagesManager;
|
private final MessagesManager messagesManager;
|
||||||
private final PubSubManager pubSubManager;
|
|
||||||
private final PushSender pushSender;
|
|
||||||
private final AccountsManager accountsManager;
|
private final AccountsManager accountsManager;
|
||||||
|
|
||||||
private final Duration persistDelay;
|
private final Duration persistDelay;
|
||||||
|
@ -39,7 +34,6 @@ public class MessagePersister implements Managed {
|
||||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
||||||
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
||||||
private final Timer notifySubscribersTimer = metricRegistry.timer(name(MessagePersister.class, "notifySubscribers"));
|
|
||||||
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
||||||
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
||||||
|
|
||||||
|
@ -50,11 +44,9 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
||||||
|
|
||||||
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final Duration persistDelay) {
|
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) {
|
||||||
this.messagesCache = messagesCache;
|
this.messagesCache = messagesCache;
|
||||||
this.messagesManager = messagesManager;
|
this.messagesManager = messagesManager;
|
||||||
this.pubSubManager = pubSubManager;
|
|
||||||
this.pushSender = pushSender;
|
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
this.persistDelay = persistDelay;
|
this.persistDelay = persistDelay;
|
||||||
}
|
}
|
||||||
|
@ -102,7 +94,6 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
for (final String queue : queuesToPersist) {
|
for (final String queue : queuesToPersist) {
|
||||||
persistQueue(queue);
|
persistQueue(queue);
|
||||||
notifyClients(MessagesCache.getAccountUuidFromQueueName(queue), MessagesCache.getDeviceIdFromQueueName(queue));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
queuesPersisted += queuesToPersist.size();
|
queuesPersisted += queuesToPersist.size();
|
||||||
|
@ -149,40 +140,4 @@ public class MessagePersister implements Managed {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void notifyClients(final UUID accountUuid, final long deviceId) {
|
|
||||||
try (final Timer.Context ignored = notifySubscribersTimer.time()) {
|
|
||||||
final Optional<Account> maybeAccount = accountsManager.get(accountUuid);
|
|
||||||
|
|
||||||
final String address;
|
|
||||||
|
|
||||||
if (maybeAccount.isPresent()) {
|
|
||||||
address = maybeAccount.get().getNumber();
|
|
||||||
} else {
|
|
||||||
logger.error("No account record found for account {}", accountUuid);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final boolean notified = pubSubManager.publish(new WebsocketAddress(address, deviceId),
|
|
||||||
PubSubProtos.PubSubMessage.newBuilder()
|
|
||||||
.setType(PubSubProtos.PubSubMessage.Type.QUERY_DB)
|
|
||||||
.build());
|
|
||||||
|
|
||||||
if (!notified) {
|
|
||||||
Optional<Account> account = accountsManager.get(address);
|
|
||||||
|
|
||||||
if (account.isPresent()) {
|
|
||||||
Optional<Device> device = account.get().getDevice(deviceId);
|
|
||||||
|
|
||||||
if (device.isPresent()) {
|
|
||||||
try {
|
|
||||||
pushSender.sendQueuedNotification(account.get(), device.get());
|
|
||||||
} catch (final NotPushRegisteredException e) {
|
|
||||||
logger.warn("After message persistence, no longer push registered!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
|
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -34,7 +33,6 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
private ExecutorService notificationExecutorService;
|
private ExecutorService notificationExecutorService;
|
||||||
private MessagesCache messagesCache;
|
private MessagesCache messagesCache;
|
||||||
private Messages messagesDatabase;
|
private Messages messagesDatabase;
|
||||||
private PubSubManager pubSubManager;
|
|
||||||
private MessagePersister messagePersister;
|
private MessagePersister messagePersister;
|
||||||
private AccountsManager accountsManager;
|
private AccountsManager accountsManager;
|
||||||
|
|
||||||
|
@ -55,7 +53,6 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
messagesDatabase = mock(Messages.class);
|
messagesDatabase = mock(Messages.class);
|
||||||
accountsManager = mock(AccountsManager.class);
|
accountsManager = mock(AccountsManager.class);
|
||||||
pubSubManager = mock(PubSubManager.class);
|
|
||||||
|
|
||||||
final Account account = mock(Account.class);
|
final Account account = mock(Account.class);
|
||||||
|
|
||||||
|
@ -64,7 +61,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
|
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
|
||||||
messagePersister = new MessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY);
|
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY);
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
final String destination = invocation.getArgument(0, String.class);
|
final String destination = invocation.getArgument(0, String.class);
|
||||||
|
@ -148,7 +145,6 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay()));
|
messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay()));
|
||||||
|
|
||||||
verify(pubSubManager, times(queueCount)).publish(any(), any());
|
|
||||||
verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong());
|
verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue