Optionally send online-only messages via keyspace notifications.
This commit is contained in:
parent
fdef21a871
commit
5e34823a49
|
@ -300,7 +300,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
||||||
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
|
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
|
||||||
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
|
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
|
||||||
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager);
|
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager, featureFlagsManager);
|
||||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
|
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
|
||||||
|
|
||||||
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
||||||
|
|
|
@ -20,10 +20,13 @@ import com.codahale.metrics.Meter;
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import io.micrometer.core.instrument.Counter;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
@ -60,38 +63,56 @@ public class WebsocketSender {
|
||||||
private final Meter provisioningOnlineMeter = metricRegistry.meter(name(getClass(), "provisioning_online" ));
|
private final Meter provisioningOnlineMeter = metricRegistry.meter(name(getClass(), "provisioning_online" ));
|
||||||
private final Meter provisioningOfflineMeter = metricRegistry.meter(name(getClass(), "provisioning_offline"));
|
private final Meter provisioningOfflineMeter = metricRegistry.meter(name(getClass(), "provisioning_offline"));
|
||||||
|
|
||||||
|
private final Counter ephemeralOnlineCounter = Metrics.counter(name(getClass(), "ephemeral"), "online", "true");
|
||||||
|
private final Counter ephemeralOfflineCounter = Metrics.counter(name(getClass(), "ephemeral"), "offline", "true");
|
||||||
|
|
||||||
private final MessagesManager messagesManager;
|
private final MessagesManager messagesManager;
|
||||||
private final PubSubManager pubSubManager;
|
private final PubSubManager pubSubManager;
|
||||||
private final ClientPresenceManager clientPresenceManager;
|
private final ClientPresenceManager clientPresenceManager;
|
||||||
|
private final FeatureFlagsManager featureFlagsManager;
|
||||||
|
|
||||||
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager) {
|
private static final String KEYSPACE_DELIVERY_FEATURE_FLAG = "keyspace-delivery-for-online-messages";
|
||||||
|
|
||||||
|
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager, final FeatureFlagsManager featureFlagsManager) {
|
||||||
this.messagesManager = messagesManager;
|
this.messagesManager = messagesManager;
|
||||||
this.pubSubManager = pubSubManager;
|
this.pubSubManager = pubSubManager;
|
||||||
this.clientPresenceManager = clientPresenceManager;
|
this.clientPresenceManager = clientPresenceManager;
|
||||||
|
this.featureFlagsManager = featureFlagsManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
|
public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
|
||||||
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
if (online && featureFlagsManager.isFeatureFlagActive(KEYSPACE_DELIVERY_FEATURE_FLAG)) {
|
||||||
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
|
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
|
||||||
.setType(PubSubMessage.Type.DELIVER)
|
ephemeralOnlineCounter.increment();
|
||||||
.setContent(message.toByteString())
|
messagesManager.insertEphemeral(account.getUuid(), device.getId(), message);
|
||||||
.build();
|
return true;
|
||||||
|
} else {
|
||||||
pubSubManager.publish(address, pubSubMessage);
|
ephemeralOfflineCounter.increment();
|
||||||
|
return false;
|
||||||
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
|
}
|
||||||
if (channel == Type.APN) apnOnlineMeter.mark();
|
|
||||||
else if (channel == Type.GCM) gcmOnlineMeter.mark();
|
|
||||||
else websocketOnlineMeter.mark();
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} else {
|
} else {
|
||||||
if (channel == Type.APN) apnOfflineMeter.mark();
|
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||||
else if (channel == Type.GCM) gcmOfflineMeter.mark();
|
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
|
||||||
else websocketOfflineMeter.mark();
|
.setType(PubSubMessage.Type.DELIVER)
|
||||||
|
.setContent(message.toByteString())
|
||||||
|
.build();
|
||||||
|
|
||||||
if (!online) queueMessage(account, device, message);
|
pubSubManager.publish(address, pubSubMessage);
|
||||||
return false;
|
|
||||||
|
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
|
||||||
|
if (channel == Type.APN) apnOnlineMeter.mark();
|
||||||
|
else if (channel == Type.GCM) gcmOnlineMeter.mark();
|
||||||
|
else websocketOnlineMeter.mark();
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
if (channel == Type.APN) apnOfflineMeter.mark();
|
||||||
|
else if (channel == Type.GCM) gcmOfflineMeter.mark();
|
||||||
|
else websocketOfflineMeter.mark();
|
||||||
|
|
||||||
|
if (!online) queueMessage(account, device, message);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,14 @@ public class MessagesManager {
|
||||||
messagesCache.insert(UUID.randomUUID(), destinationUuid, destinationDevice, message);
|
messagesCache.insert(UUID.randomUUID(), destinationUuid, destinationDevice, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, final Envelope message) {
|
||||||
|
messagesCache.insertEphemeral(destinationUuid, destinationDevice, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) {
|
||||||
|
return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice);
|
||||||
|
}
|
||||||
|
|
||||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||||
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));
|
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));
|
||||||
|
|
||||||
|
|
|
@ -224,6 +224,12 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
@Override
|
@Override
|
||||||
public void handleNewEphemeralMessageAvailable() {
|
public void handleNewEphemeralMessageAvailable() {
|
||||||
ephemeralMessageAvailableMeter.mark();
|
ephemeralMessageAvailableMeter.mark();
|
||||||
|
|
||||||
|
final Optional<Envelope> maybeMessage = messagesManager.takeEphemeralMessage(account.getUuid(), device.getId());
|
||||||
|
|
||||||
|
if (maybeMessage.isPresent()) {
|
||||||
|
sendMessage(maybeMessage.get(), Optional.empty(), false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue