Optionally send online-only messages via keyspace notifications.

This commit is contained in:
Jon Chambers 2020-09-04 12:10:24 -04:00 committed by Jon Chambers
parent 06754d6158
commit 12fe28d8ab
4 changed files with 56 additions and 24 deletions

View File

@ -300,7 +300,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager); PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration()); APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey()); GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager); WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager, featureFlagsManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster); RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager); AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);

View File

@ -20,19 +20,19 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress; import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import java.util.concurrent.Executor;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
@ -63,38 +63,56 @@ public class WebsocketSender {
private final Meter provisioningOnlineMeter = metricRegistry.meter(name(getClass(), "provisioning_online" )); private final Meter provisioningOnlineMeter = metricRegistry.meter(name(getClass(), "provisioning_online" ));
private final Meter provisioningOfflineMeter = metricRegistry.meter(name(getClass(), "provisioning_offline")); private final Meter provisioningOfflineMeter = metricRegistry.meter(name(getClass(), "provisioning_offline"));
private final Counter ephemeralOnlineCounter = Metrics.counter(name(getClass(), "ephemeral"), "online", "true");
private final Counter ephemeralOfflineCounter = Metrics.counter(name(getClass(), "ephemeral"), "offline", "true");
private final MessagesManager messagesManager; private final MessagesManager messagesManager;
private final PubSubManager pubSubManager; private final PubSubManager pubSubManager;
private final ClientPresenceManager clientPresenceManager; private final ClientPresenceManager clientPresenceManager;
private final FeatureFlagsManager featureFlagsManager;
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager) { private static final String KEYSPACE_DELIVERY_FEATURE_FLAG = "keyspace-delivery-for-online-messages";
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager, final FeatureFlagsManager featureFlagsManager) {
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager; this.pubSubManager = pubSubManager;
this.clientPresenceManager = clientPresenceManager; this.clientPresenceManager = clientPresenceManager;
this.featureFlagsManager = featureFlagsManager;
} }
public DeliveryStatus sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) { public DeliveryStatus sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId()); if (online && featureFlagsManager.isFeatureFlagActive(KEYSPACE_DELIVERY_FEATURE_FLAG)) {
PubSubMessage pubSubMessage = PubSubMessage.newBuilder() if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
.setType(PubSubMessage.Type.DELIVER) ephemeralOnlineCounter.increment();
.setContent(message.toByteString()) messagesManager.insertEphemeral(account.getUuid(), device.getId(), message);
.build(); return new DeliveryStatus(true);
} else {
pubSubManager.publish(address, pubSubMessage); ephemeralOfflineCounter.increment();
return new DeliveryStatus(false);
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) { }
if (channel == Type.APN) apnOnlineMeter.mark();
else if (channel == Type.GCM) gcmOnlineMeter.mark();
else websocketOnlineMeter.mark();
return new DeliveryStatus(true);
} else { } else {
if (channel == Type.APN) apnOfflineMeter.mark(); WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
else if (channel == Type.GCM) gcmOfflineMeter.mark(); PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
else websocketOfflineMeter.mark(); .setType(PubSubMessage.Type.DELIVER)
.setContent(message.toByteString())
.build();
if (!online) queueMessage(account, device, message); pubSubManager.publish(address, pubSubMessage);
return new DeliveryStatus(false);
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
if (channel == Type.APN) apnOnlineMeter.mark();
else if (channel == Type.GCM) gcmOnlineMeter.mark();
else websocketOnlineMeter.mark();
return new DeliveryStatus(true);
} else {
if (channel == Type.APN) apnOfflineMeter.mark();
else if (channel == Type.GCM) gcmOfflineMeter.mark();
else websocketOfflineMeter.mark();
if (!online) queueMessage(account, device, message);
return new DeliveryStatus(false);
}
} }
} }

View File

@ -41,6 +41,14 @@ public class MessagesManager {
messagesCache.insert(UUID.randomUUID(), destinationUuid, destinationDevice, message); messagesCache.insert(UUID.randomUUID(), destinationUuid, destinationDevice, message);
} }
public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, final Envelope message) {
messagesCache.insertEphemeral(UUID.randomUUID(), destinationUuid, destinationDevice, message);
}
public Optional<Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) {
return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice, messageGuid);
}
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) { public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent)); RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));

View File

@ -225,6 +225,12 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
@Override @Override
public void handleEphemeralMessageAvailable(final UUID ephemeralMessageGuid) { public void handleEphemeralMessageAvailable(final UUID ephemeralMessageGuid) {
ephemeralMessageAvailableMeter.mark(); ephemeralMessageAvailableMeter.mark();
final Optional<Envelope> maybeMessage = messagesManager.takeEphemeralMessage(account.getUuid(), device.getId(), ephemeralMessageGuid);
if (maybeMessage.isPresent()) {
sendMessage(maybeMessage.get(), Optional.empty(), false);
}
} }
@Override @Override