Move provisioning message-sending to its own manager class.
This commit is contained in:
parent
32bf742709
commit
5e30b0499a
|
@ -103,6 +103,7 @@ import org.whispersystems.textsecuregcm.push.APNSender;
|
||||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||||
import org.whispersystems.textsecuregcm.push.GCMSender;
|
import org.whispersystems.textsecuregcm.push.GCMSender;
|
||||||
|
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
|
||||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||||
import org.whispersystems.textsecuregcm.push.WebsocketSender;
|
import org.whispersystems.textsecuregcm.push.WebsocketSender;
|
||||||
|
@ -312,8 +313,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
||||||
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
|
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
|
||||||
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
|
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
|
||||||
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager);
|
WebsocketSender websocketSender = new WebsocketSender(messagesManager, clientPresenceManager);
|
||||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
|
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
|
||||||
|
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
|
||||||
|
|
||||||
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
||||||
DisabledPermittedAccountAuthenticator disabledPermittedAccountAuthenticator = new DisabledPermittedAccountAuthenticator(accountsManager);
|
DisabledPermittedAccountAuthenticator disabledPermittedAccountAuthenticator = new DisabledPermittedAccountAuthenticator(accountsManager);
|
||||||
|
@ -394,7 +396,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
environment.jersey().register(new AccountController(pendingAccountsManager, accountsManager, usernamesManager, abusiveHostRules, rateLimiters, smsSender, directoryQueue, messagesManager, turnTokenGenerator, config.getTestDevices(), recaptchaClient, gcmSender, apnSender, backupCredentialsGenerator));
|
environment.jersey().register(new AccountController(pendingAccountsManager, accountsManager, usernamesManager, abusiveHostRules, rateLimiters, smsSender, directoryQueue, messagesManager, turnTokenGenerator, config.getTestDevices(), recaptchaClient, gcmSender, apnSender, backupCredentialsGenerator));
|
||||||
environment.jersey().register(new DeviceController(pendingDevicesManager, accountsManager, messagesManager, directoryQueue, rateLimiters, config.getMaxDevices()));
|
environment.jersey().register(new DeviceController(pendingDevicesManager, accountsManager, messagesManager, directoryQueue, rateLimiters, config.getMaxDevices()));
|
||||||
environment.jersey().register(new DirectoryController(rateLimiters, directory, directoryCredentialsGenerator));
|
environment.jersey().register(new DirectoryController(rateLimiters, directory, directoryCredentialsGenerator));
|
||||||
environment.jersey().register(new ProvisioningController(rateLimiters, pushSender));
|
environment.jersey().register(new ProvisioningController(rateLimiters, provisioningManager));
|
||||||
environment.jersey().register(new CertificateController(new CertificateGenerator(config.getDeliveryCertificate().getCertificate(), config.getDeliveryCertificate().getPrivateKey(), config.getDeliveryCertificate().getExpiresDays()), zkAuthOperations, isZkEnabled));
|
environment.jersey().register(new CertificateController(new CertificateGenerator(config.getDeliveryCertificate().getCertificate(), config.getDeliveryCertificate().getPrivateKey(), config.getDeliveryCertificate().getExpiresDays()), zkAuthOperations, isZkEnabled));
|
||||||
environment.jersey().register(new VoiceVerificationController(config.getVoiceVerificationConfiguration().getUrl(), config.getVoiceVerificationConfiguration().getLocales()));
|
environment.jersey().register(new VoiceVerificationController(config.getVoiceVerificationConfiguration().getUrl(), config.getVoiceVerificationConfiguration().getLocales()));
|
||||||
environment.jersey().register(new SecureStorageController(storageCredentialsGenerator));
|
environment.jersey().register(new SecureStorageController(storageCredentialsGenerator));
|
||||||
|
|
|
@ -3,8 +3,7 @@ package org.whispersystems.textsecuregcm.controllers;
|
||||||
import com.codahale.metrics.annotation.Timed;
|
import com.codahale.metrics.annotation.Timed;
|
||||||
import org.whispersystems.textsecuregcm.entities.ProvisioningMessage;
|
import org.whispersystems.textsecuregcm.entities.ProvisioningMessage;
|
||||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
|
||||||
import org.whispersystems.textsecuregcm.push.WebsocketSender;
|
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.util.Base64;
|
import org.whispersystems.textsecuregcm.util.Base64;
|
||||||
import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException;
|
import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException;
|
||||||
|
@ -26,12 +25,12 @@ import io.dropwizard.auth.Auth;
|
||||||
@Path("/v1/provisioning")
|
@Path("/v1/provisioning")
|
||||||
public class ProvisioningController {
|
public class ProvisioningController {
|
||||||
|
|
||||||
private final RateLimiters rateLimiters;
|
private final RateLimiters rateLimiters;
|
||||||
private final WebsocketSender websocketSender;
|
private final ProvisioningManager provisioningManager;
|
||||||
|
|
||||||
public ProvisioningController(RateLimiters rateLimiters, PushSender pushSender) {
|
public ProvisioningController(RateLimiters rateLimiters, ProvisioningManager provisioningManager) {
|
||||||
this.rateLimiters = rateLimiters;
|
this.rateLimiters = rateLimiters;
|
||||||
this.websocketSender = pushSender.getWebSocketSender();
|
this.provisioningManager = provisioningManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Timed
|
@Timed
|
||||||
|
@ -46,8 +45,8 @@ public class ProvisioningController {
|
||||||
{
|
{
|
||||||
rateLimiters.getMessagesLimiter().validate(source.getNumber());
|
rateLimiters.getMessagesLimiter().validate(source.getNumber());
|
||||||
|
|
||||||
if (!websocketSender.sendProvisioningMessage(new ProvisioningAddress(destinationName, 0),
|
if (!provisioningManager.sendProvisioningMessage(new ProvisioningAddress(destinationName, 0),
|
||||||
Base64.decode(message.getBody())))
|
Base64.decode(message.getBody())))
|
||||||
{
|
{
|
||||||
throw new WebApplicationException(Response.Status.NOT_FOUND);
|
throw new WebApplicationException(Response.Status.NOT_FOUND);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
package org.whispersystems.textsecuregcm.push;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import io.micrometer.core.instrument.Counter;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
||||||
|
import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress;
|
||||||
|
|
||||||
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
|
public class ProvisioningManager {
|
||||||
|
private final PubSubManager pubSubManager;
|
||||||
|
|
||||||
|
private final Counter provisioningMessageOnlineCounter = Metrics.counter(name(getClass(), "sendProvisioningMessage"), "online", "true");
|
||||||
|
private final Counter provisioningMessageOfflineCounter = Metrics.counter(name(getClass(), "sendProvisioningMessage"), "online", "false");
|
||||||
|
|
||||||
|
public ProvisioningManager(final PubSubManager pubSubManager) {
|
||||||
|
this.pubSubManager = pubSubManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean sendProvisioningMessage(ProvisioningAddress address, byte[] body) {
|
||||||
|
PubSubProtos.PubSubMessage pubSubMessage = PubSubProtos.PubSubMessage.newBuilder()
|
||||||
|
.setType(PubSubProtos.PubSubMessage.Type.DELIVER)
|
||||||
|
.setContent(ByteString.copyFrom(body))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
if (pubSubManager.publish(address, pubSubMessage)) {
|
||||||
|
provisioningMessageOnlineCounter.increment();
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
provisioningMessageOfflineCounter.increment();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -80,10 +80,6 @@ public class PushSender implements Managed {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public WebsocketSender getWebSocketSender() {
|
|
||||||
return webSocketSender;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) {
|
private void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) {
|
||||||
if (device.getGcmId() != null) sendGcmMessage(account, device, message, online);
|
if (device.getGcmId() != null) sendGcmMessage(account, device, message, online);
|
||||||
else if (device.getApnId() != null) sendApnMessage(account, device, message, online);
|
else if (device.getApnId() != null) sendApnMessage(account, device, message, online);
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.push;
|
||||||
import com.codahale.metrics.Meter;
|
import com.codahale.metrics.Meter;
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
import io.micrometer.core.instrument.Counter;
|
import io.micrometer.core.instrument.Counter;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -27,13 +26,10 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress;
|
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
|
|
||||||
|
|
||||||
public class WebsocketSender {
|
public class WebsocketSender {
|
||||||
|
|
||||||
|
@ -57,19 +53,14 @@ public class WebsocketSender {
|
||||||
private final Meter gcmOnlineMeter = metricRegistry.meter(name(getClass(), "gcm_online" ));
|
private final Meter gcmOnlineMeter = metricRegistry.meter(name(getClass(), "gcm_online" ));
|
||||||
private final Meter gcmOfflineMeter = metricRegistry.meter(name(getClass(), "gcm_offline"));
|
private final Meter gcmOfflineMeter = metricRegistry.meter(name(getClass(), "gcm_offline"));
|
||||||
|
|
||||||
private final Meter provisioningOnlineMeter = metricRegistry.meter(name(getClass(), "provisioning_online" ));
|
|
||||||
private final Meter provisioningOfflineMeter = metricRegistry.meter(name(getClass(), "provisioning_offline"));
|
|
||||||
|
|
||||||
private final Counter ephemeralOnlineCounter = Metrics.counter(name(getClass(), "ephemeral"), "online", "true");
|
private final Counter ephemeralOnlineCounter = Metrics.counter(name(getClass(), "ephemeral"), "online", "true");
|
||||||
private final Counter ephemeralOfflineCounter = Metrics.counter(name(getClass(), "ephemeral"), "offline", "true");
|
private final Counter ephemeralOfflineCounter = Metrics.counter(name(getClass(), "ephemeral"), "offline", "true");
|
||||||
|
|
||||||
private final MessagesManager messagesManager;
|
private final MessagesManager messagesManager;
|
||||||
private final PubSubManager pubSubManager;
|
|
||||||
private final ClientPresenceManager clientPresenceManager;
|
private final ClientPresenceManager clientPresenceManager;
|
||||||
|
|
||||||
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager) {
|
public WebsocketSender(MessagesManager messagesManager, ClientPresenceManager clientPresenceManager) {
|
||||||
this.messagesManager = messagesManager;
|
this.messagesManager = messagesManager;
|
||||||
this.pubSubManager = pubSubManager;
|
|
||||||
this.clientPresenceManager = clientPresenceManager;
|
this.clientPresenceManager = clientPresenceManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,19 +94,4 @@ public class WebsocketSender {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean sendProvisioningMessage(ProvisioningAddress address, byte[] body) {
|
|
||||||
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
|
|
||||||
.setType(PubSubMessage.Type.DELIVER)
|
|
||||||
.setContent(ByteString.copyFrom(body))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
if (pubSubManager.publish(address, pubSubMessage)) {
|
|
||||||
provisioningOnlineMeter.mark();
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
provisioningOfflineMeter.mark();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue