Drop pub/sub sending logic from WebsocketSender.

This commit is contained in:
Jon Chambers 2020-09-10 11:37:53 -04:00 committed by Jon Chambers
parent 66a04ed730
commit 17d18b22c7
3 changed files with 3 additions and 52 deletions

View File

@ -305,7 +305,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager); PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration()); APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey()); GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager, featureFlagsManager); WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster); RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager); AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);

View File

@ -80,14 +80,6 @@ public class PushSender implements Managed {
} }
} }
public void sendQueuedNotification(Account account, Device device)
throws NotPushRegisteredException
{
if (device.getGcmId() != null) sendGcmNotification(account, device);
else if (device.getApnId() != null) sendApnNotification(account, device, true);
else if (!device.getFetchesMessages()) throw new NotPushRegisteredException("No notification possible!");
}
public WebsocketSender getWebSocketSender() { public WebsocketSender getWebSocketSender() {
return webSocketSender; return webSocketSender;
} }

View File

@ -26,12 +26,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress; import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@ -50,7 +48,6 @@ public class WebsocketSender {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter websocketRequeueMeter = metricRegistry.meter(name(getClass(), "ws_requeue"));
private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" )); private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" ));
private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" )); private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" ));
@ -69,15 +66,11 @@ public class WebsocketSender {
private final MessagesManager messagesManager; private final MessagesManager messagesManager;
private final PubSubManager pubSubManager; private final PubSubManager pubSubManager;
private final ClientPresenceManager clientPresenceManager; private final ClientPresenceManager clientPresenceManager;
private final FeatureFlagsManager featureFlagsManager;
private static final String KEYSPACE_DELIVERY_FEATURE_FLAG = "keyspace-delivery-for-all-messages"; public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager) {
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager, final FeatureFlagsManager featureFlagsManager) {
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager; this.pubSubManager = pubSubManager;
this.clientPresenceManager = clientPresenceManager; this.clientPresenceManager = clientPresenceManager;
this.featureFlagsManager = featureFlagsManager;
} }
public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) { public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
@ -92,7 +85,7 @@ public class WebsocketSender {
ephemeralOfflineCounter.increment(); ephemeralOfflineCounter.increment();
return false; return false;
} }
} else if (featureFlagsManager.isFeatureFlagActive(KEYSPACE_DELIVERY_FEATURE_FLAG)) { } else {
messagesManager.insert(account.getUuid(), device.getId(), message); messagesManager.insert(account.getUuid(), device.getId(), message);
if (clientPresent) { if (clientPresent) {
@ -108,42 +101,8 @@ public class WebsocketSender {
return false; return false;
} }
} else {
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.DELIVER)
.setContent(message.toByteString())
.build();
pubSubManager.publish(address, pubSubMessage);
if (clientPresent) {
if (channel == Type.APN) apnOnlineMeter.mark();
else if (channel == Type.GCM) gcmOnlineMeter.mark();
else websocketOnlineMeter.mark();
return true;
} else {
if (channel == Type.APN) apnOfflineMeter.mark();
else if (channel == Type.GCM) gcmOfflineMeter.mark();
else websocketOfflineMeter.mark();
queueMessage(account, device, message);
return false;
} }
} }
}
public void queueMessage(Account account, Device device, Envelope message) {
websocketRequeueMeter.mark();
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
messagesManager.insert(account.getUuid(), device.getId(), message);
pubSubManager.publish(address, PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.QUERY_DB)
.build());
}
public boolean sendProvisioningMessage(ProvisioningAddress address, byte[] body) { public boolean sendProvisioningMessage(ProvisioningAddress address, byte[] body) {
PubSubMessage pubSubMessage = PubSubMessage.newBuilder() PubSubMessage pubSubMessage = PubSubMessage.newBuilder()