Use push microservice instead of doing push directly.

// FREEBIE
This commit is contained in:
Moxie Marlinspike 2014-12-01 11:23:29 -08:00
parent 675b6f4b5e
commit 3452ea29b8
11 changed files with 241 additions and 715 deletions

View File

@ -53,6 +53,12 @@
<artifactId>dropwizard-metrics-graphite</artifactId>
<version>${dropwizard.version}</version>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-client</artifactId>
<version>${dropwizard.version}</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>

View File

@ -24,6 +24,7 @@ import org.whispersystems.textsecuregcm.configuration.GraphiteConfiguration;
import org.whispersystems.textsecuregcm.configuration.MemcacheConfiguration;
import org.whispersystems.textsecuregcm.configuration.MetricsConfiguration;
import org.whispersystems.textsecuregcm.configuration.NexmoConfiguration;
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedPhoneConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisConfiguration;
@ -35,6 +36,7 @@ import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import io.dropwizard.Configuration;
import io.dropwizard.client.JerseyClientConfiguration;
import io.dropwizard.db.DataSourceFactory;
public class WhisperServerConfiguration extends Configuration {
@ -50,7 +52,7 @@ public class WhisperServerConfiguration extends Configuration {
@NotNull
@Valid
@JsonProperty
private GcmConfiguration gcm;
private PushConfiguration push;
@NotNull
@Valid
@ -67,9 +69,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private RedisConfiguration redis;
@JsonProperty
private ApnConfiguration apn = new ApnConfiguration();
@Valid
@JsonProperty
private FederationConfiguration federation = new FederationConfiguration();
@ -99,6 +98,12 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private RedPhoneConfiguration redphone = new RedPhoneConfiguration();
@Valid
@NotNull
@JsonProperty
private JerseyClientConfiguration httpClient = new JerseyClientConfiguration();
public WebsocketConfiguration getWebsocketConfiguration() {
return websocket;
}
@ -111,12 +116,12 @@ public class WhisperServerConfiguration extends Configuration {
return nexmo;
}
public GcmConfiguration getGcmConfiguration() {
return gcm;
public PushConfiguration getPushConfiguration() {
return push;
}
public ApnConfiguration getApnConfiguration() {
return apn;
public JerseyClientConfiguration getJerseyClientConfiguration() {
return httpClient;
}
public S3Configuration getS3Configuration() {

View File

@ -20,6 +20,7 @@ import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.graphite.GraphiteReporter;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.google.common.base.Optional;
import com.sun.jersey.api.client.Client;
import net.spy.memcached.MemcachedClient;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.eclipse.jetty.servlets.CrossOriginFilter;
@ -53,9 +54,8 @@ import org.whispersystems.textsecuregcm.providers.MemcachedClientFactory;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.providers.RedisHealthCheck;
import org.whispersystems.textsecuregcm.providers.TimeProvider;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.PushServiceClient;
import org.whispersystems.textsecuregcm.push.WebsocketSender;
import org.whispersystems.textsecuregcm.sms.NexmoSmsSender;
import org.whispersystems.textsecuregcm.sms.SmsSender;
@ -89,6 +89,7 @@ import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.Application;
import io.dropwizard.client.JerseyClientBuilder;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jdbi.DBIFactory;
import io.dropwizard.metrics.graphite.GraphiteReporterFactory;
@ -137,6 +138,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MemcachedClient memcachedClient = new MemcachedClientFactory(config.getMemcacheConfiguration()).getClient();
JedisPool redisClient = new RedisClientFactory(config.getRedisConfiguration()).getRedisClientPool();
Client httpClient = new JerseyClientBuilder(environment).using(config.getJerseyClientConfiguration())
.build(getName());
DirectoryManager directory = new DirectoryManager(redisClient);
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, memcachedClient);
@ -145,28 +148,16 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FederatedClientManager federatedClientManager = new FederatedClientManager(config.getFederationConfiguration());
StoredMessages storedMessages = new StoredMessages(redisClient);
PubSubManager pubSubManager = new PubSubManager(redisClient);
APNSender apnSender = new APNSender(accountsManager, pubSubManager, storedMessages, memcachedClient,
config.getApnConfiguration().getCertificate(),
config.getApnConfiguration().getKey());
GCMSender gcmSender = new GCMSender(accountsManager,
config.getGcmConfiguration().getSenderId(),
config.getGcmConfiguration().getApiKey());
WebsocketSender websocketSender = new WebsocketSender(storedMessages, pubSubManager);
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(gcmSender);
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), memcachedClient);
PushServiceClient pushServiceClient = new PushServiceClient(httpClient, config.getPushConfiguration());
WebsocketSender websocketSender = new WebsocketSender(storedMessages, pubSubManager);
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), memcachedClient);
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
Optional<NexmoSmsSender> nexmoSmsSender = initializeNexmoSmsSender(config.getNexmoConfiguration());
SmsSender smsSender = new SmsSender(twilioSmsSender, nexmoSmsSender, config.getTwilioConfiguration().isInternational());
UrlSigner urlSigner = new UrlSigner(config.getS3Configuration());
PushSender pushSender = new PushSender(gcmSender, apnSender, websocketSender);
PushSender pushSender = new PushSender(pushServiceClient, websocketSender);
Optional<byte[]> authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey();
AttachmentController attachmentController = new AttachmentController(rateLimiters, federatedClientManager, urlSigner);

View File

@ -0,0 +1,40 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.hibernate.validator.constraints.NotEmpty;
import javax.validation.constraints.Min;
public class PushConfiguration {
@JsonProperty
@NotEmpty
private String host;
@JsonProperty
@Min(1)
private int port;
@JsonProperty
@NotEmpty
private String username;
@JsonProperty
@NotEmpty
private String password;
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
}

View File

@ -0,0 +1,34 @@
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.hibernate.validator.constraints.NotEmpty;
import javax.validation.constraints.Min;
public class ApnMessage {
@JsonProperty
@NotEmpty
private String apnId;
@JsonProperty
@NotEmpty
private String number;
@JsonProperty
@Min(1)
private int deviceId;
@JsonProperty
@NotEmpty
private String message;
public ApnMessage() {}
public ApnMessage(String apnId, String number, int deviceId, String message) {
this.apnId = apnId;
this.number = number;
this.deviceId = deviceId;
this.message = message;
}
}

View File

@ -0,0 +1,39 @@
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.hibernate.validator.constraints.NotEmpty;
import javax.validation.constraints.Min;
public class GcmMessage {
@JsonProperty
@NotEmpty
private String gcmId;
@JsonProperty
@NotEmpty
private String number;
@JsonProperty
@Min(1)
private int deviceId;
@JsonProperty
@NotEmpty
private String message;
@JsonProperty
private boolean receipt;
public GcmMessage() {}
public GcmMessage(String gcmId, String number, int deviceId, String message, boolean receipt) {
this.gcmId = gcmId;
this.number = number;
this.deviceId = deviceId;
this.message = message;
this.receipt = receipt;
}
}

View File

@ -1,241 +0,0 @@
/**
* Copyright (C) 2013 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.notnoop.apns.APNS;
import com.notnoop.apns.ApnsService;
import com.notnoop.exceptions.NetworkIOException;
import net.spy.memcached.MemcachedClient;
import org.bouncycastle.openssl.PEMReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.PendingMessage;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.PubSubMessage;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
public class APNSender implements Managed {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter websocketMeter = metricRegistry.meter(name(getClass(), "websocket"));
private final Meter pushMeter = metricRegistry.meter(name(getClass(), "push"));
private final Meter failureMeter = metricRegistry.meter(name(getClass(), "failure"));
private final Logger logger = LoggerFactory.getLogger(APNSender.class);
private static final String PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"alert\":{\"loc-key\":\"APN_Message\"},\"content-available\":1,\"category\":\"Signal_Message\"}}";
private static final ObjectMapper mapper = SystemMapper.getMapper();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final AccountsManager accounts;
private final PubSubManager pubSubManager;
private final StoredMessages storedMessages;
private final MemcachedClient memcachedClient;
private final String apnCertificate;
private final String apnKey;
private Optional<ApnsService> apnService;
public APNSender(AccountsManager accounts,
PubSubManager pubSubManager,
StoredMessages storedMessages,
MemcachedClient memcachedClient,
String apnCertificate, String apnKey)
{
this.accounts = accounts;
this.pubSubManager = pubSubManager;
this.storedMessages = storedMessages;
this.apnCertificate = apnCertificate;
this.apnKey = apnKey;
this.memcachedClient = memcachedClient;
}
public void sendMessage(Account account, Device device,
String registrationId, PendingMessage message)
throws TransientPushFailureException
{
try {
String serializedPendingMessage = mapper.writeValueAsString(message);
WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId());
if (pubSubManager.publish(websocketAddress, new PubSubMessage(PubSubMessage.TYPE_DELIVER,
serializedPendingMessage)))
{
websocketMeter.mark();
} else {
memcacheSet(registrationId, account.getNumber());
storedMessages.insert(websocketAddress, message);
if (!message.isReceipt()) {
sendPush(registrationId);
}
}
} catch (IOException e) {
throw new TransientPushFailureException(e);
}
}
private void sendPush(String registrationId)
throws TransientPushFailureException
{
try {
if (!apnService.isPresent()) {
failureMeter.mark();
throw new TransientPushFailureException("APN access not configured!");
}
apnService.get().push(registrationId, PAYLOAD);
pushMeter.mark();
} catch (NetworkIOException nioe) {
logger.warn("Network Error", nioe);
failureMeter.mark();
throw new TransientPushFailureException(nioe);
}
}
private static byte[] initializeKeyStore(String pemCertificate, String pemKey)
throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException
{
PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemCertificate.getBytes())));
X509Certificate certificate = (X509Certificate) reader.readObject();
Certificate[] certificateChain = {certificate};
reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemKey.getBytes())));
KeyPair keyPair = (KeyPair) reader.readObject();
KeyStore keyStore = KeyStore.getInstance("pkcs12");
keyStore.load(null);
keyStore.setEntry("apn",
new KeyStore.PrivateKeyEntry(keyPair.getPrivate(), certificateChain),
new KeyStore.PasswordProtection("insecure".toCharArray()));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
keyStore.store(baos, "insecure".toCharArray());
return baos.toByteArray();
}
@Override
public void start() throws Exception {
if (!Util.isEmpty(apnCertificate) && !Util.isEmpty(apnKey)) {
byte[] keyStore = initializeKeyStore(apnCertificate, apnKey);
this.apnService = Optional.of(APNS.newService()
.withCert(new ByteArrayInputStream(keyStore), "insecure")
.asQueued()
.withProductionDestination().build());
this.executor.scheduleAtFixedRate(new FeedbackRunnable(), 0, 1, TimeUnit.HOURS);
} else {
this.apnService = Optional.absent();
}
}
@Override
public void stop() throws Exception {
if (apnService.isPresent()) {
apnService.get().stop();
}
}
private void memcacheSet(String registrationId, String number) {
if (memcachedClient != null) {
memcachedClient.set("APN-" + registrationId, 60 * 60 * 24, number);
}
}
private Optional<String> memcacheGet(String registrationId) {
if (memcachedClient != null) {
return Optional.fromNullable((String)memcachedClient.get("APN-" + registrationId));
} else {
return Optional.absent();
}
}
private class FeedbackRunnable implements Runnable {
private void updateAccount(Account account, String registrationId) {
boolean needsUpdate = false;
for (Device device : account.getDevices()) {
if (registrationId.equals(device.getApnId())) {
needsUpdate = true;
device.setApnId(null);
}
}
if (needsUpdate) {
accounts.update(account);
}
}
@Override
public void run() {
if (apnService.isPresent()) {
Map<String, Date> inactiveDevices = apnService.get().getInactiveDevices();
for (String registrationId : inactiveDevices.keySet()) {
Optional<String> number = memcacheGet(registrationId);
if (number.isPresent()) {
Optional<Account> account = accounts.get(number.get());
if (account.isPresent()) {
updateAccount(account.get(), registrationId);
}
} else {
logger.warn("APN unregister event received for uncached ID: " + registrationId);
}
}
}
}
}
}

View File

@ -1,424 +0,0 @@
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.base.Optional;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.DefaultPacketExtension;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.PacketExtension;
import org.jivesoftware.smack.provider.PacketExtensionProvider;
import org.jivesoftware.smack.provider.ProviderManager;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smack.util.StringUtils;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.PendingMessage;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Util;
import org.xmlpull.v1.XmlPullParser;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
public class GCMSender implements Managed, PacketListener {
private final Logger logger = LoggerFactory.getLogger(GCMSender.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(org.whispersystems.textsecuregcm.util.Constants.METRICS_NAME);
private final Meter success = metricRegistry.meter(name(getClass(), "sent", "success"));
private final Meter failure = metricRegistry.meter(name(getClass(), "sent", "failure"));
private final Meter unregistered = metricRegistry.meter(name(getClass(), "sent", "unregistered"));
private static final String GCM_SERVER = "gcm.googleapis.com";
private static final int GCM_PORT = 5235;
private static final String GCM_ELEMENT_NAME = "gcm";
private static final String GCM_NAMESPACE = "google:mobile:data";
private final Map<String, UnacknowledgedMessage> pendingMessages = new ConcurrentHashMap<>();
private final long senderId;
private final String apiKey;
private final AccountsManager accounts;
private XMPPTCPConnection connection;
public GCMSender(AccountsManager accounts, long senderId, String apiKey) {
this.accounts = accounts;
this.senderId = senderId;
this.apiKey = apiKey;
ProviderManager.addExtensionProvider(GCM_ELEMENT_NAME, GCM_NAMESPACE,
new GcmPacketExtensionProvider());
}
public void sendMessage(String destinationNumber, long destinationDeviceId,
String registrationId, PendingMessage message)
{
String messageId = "m-" + UUID.randomUUID().toString();
UnacknowledgedMessage unacknowledgedMessage = new UnacknowledgedMessage(destinationNumber,
destinationDeviceId,
registrationId, message);
sendMessage(messageId, unacknowledgedMessage);
}
public void sendMessage(String messageId, UnacknowledgedMessage message) {
try {
boolean isReceipt = message.getPendingMessage().isReceipt();
Map<String, String> dataObject = new HashMap<>();
dataObject.put("type", "message");
dataObject.put(isReceipt ? "receipt" : "message", message.getPendingMessage().getEncryptedOutgoingMessage());
Map<String, Object> messageObject = new HashMap<>();
messageObject.put("to", message.getRegistrationId());
messageObject.put("message_id", messageId);
messageObject.put("data", dataObject);
String json = JSONObject.toJSONString(messageObject);
pendingMessages.put(messageId, message);
connection.sendPacket(new GcmPacketExtension(json).toPacket());
} catch (SmackException.NotConnectedException e) {
logger.warn("GCMClient", "No connection", e);
}
}
@Override
public void start() throws Exception {
this.connection = connect(senderId, apiKey);
}
@Override
public void stop() throws Exception {
this.connection.disconnect();
}
@Override
public void processPacket(Packet packet) throws SmackException.NotConnectedException {
Message incomingMessage = (Message) packet;
GcmPacketExtension gcmPacket = (GcmPacketExtension) incomingMessage.getExtension(GCM_NAMESPACE);
String json = gcmPacket.getJson();
try {
Map<String, Object> jsonObject = (Map<String, Object>) JSONValue.parseWithException(json);
Object messageType = jsonObject.get("message_type");
if (messageType == null) {
handleUpstreamMessage(jsonObject);
return;
}
switch (messageType.toString()) {
case "ack" : handleAckReceipt(jsonObject); break;
case "nack" : handleNackReceipt(jsonObject); break;
case "receipt" : handleDeliveryReceipt(jsonObject); break;
case "control" : handleControlMessage(jsonObject); break;
default:
logger.warn("Received unknown GCM message: " + messageType.toString());
}
} catch (ParseException e) {
logger.warn("GCMClient", "Received unparsable message", e);
} catch (Exception e) {
logger.warn("GCMClient", "Failed to process packet", e);
}
}
private void handleControlMessage(Map<String, Object> message) {
String controlType = (String) message.get("control_type");
if ("CONNECTION_DRAINING".equals(controlType)) {
logger.warn("GCM Connection is draining! Initiating reconnect...");
reconnect();
} else {
logger.warn("Received unknown GCM control message: " + controlType);
}
}
private void handleDeliveryReceipt(Map<String, Object> message) {
logger.warn("Got delivery receipt!");
}
private void handleNackReceipt(Map<String, Object> message) {
String messageId = (String) message.get("message_id");
String errorCode = (String) message.get("error");
if (errorCode == null) {
logger.warn("Null GCM error code!");
if (messageId != null) {
pendingMessages.remove(messageId);
}
return;
}
switch (errorCode) {
case "BAD_REGISTRATION" : handleBadRegistration(message); break;
case "DEVICE_UNREGISTERED" : handleBadRegistration(message); break;
case "INTERNAL_SERVER_ERROR" : handleServerFailure(message); break;
case "INVALID_JSON" : handleClientFailure(message); break;
case "QUOTA_EXCEEDED" : handleClientFailure(message); break;
case "SERVICE_UNAVAILABLE" : handleServerFailure(message); break;
}
}
private void handleAckReceipt(Map<String, Object> message) {
success.mark();
String messageId = (String) message.get("message_id");
if (messageId != null) {
pendingMessages.remove(messageId);
}
}
private void handleUpstreamMessage(Map<String, Object> message)
throws SmackException.NotConnectedException
{
logger.warn("Got upstream message from GCM Server!");
for (String key : message.keySet()) {
logger.warn(key + " : " + message.get(key));
}
Map<String, Object> ack = new HashMap<>();
message.put("message_type", "ack");
message.put("to", message.get("from"));
message.put("message_id", message.get("message_id"));
String json = JSONValue.toJSONString(ack);
Packet request = new GcmPacketExtension(json).toPacket();
connection.sendPacket(request);
}
private void handleBadRegistration(Map<String, Object> message) {
unregistered.mark();
String messageId = (String) message.get("message_id");
if (messageId != null) {
UnacknowledgedMessage unacknowledgedMessage = pendingMessages.remove(messageId);
if (unacknowledgedMessage != null) {
Optional<Account> account = accounts.get(unacknowledgedMessage.getDestinationNumber());
if (account.isPresent()) {
Optional<Device> device = account.get().getDevice(unacknowledgedMessage.getDestinationDeviceId());
if (device.isPresent()) {
device.get().setGcmId(null);
accounts.update(account.get());
}
}
}
}
}
private void handleServerFailure(Map<String, Object> message) {
failure.mark();
String messageId = (String)message.get("message_id");
if (messageId != null) {
UnacknowledgedMessage unacknowledgedMessage = pendingMessages.remove(messageId);
if (unacknowledgedMessage != null) {
sendMessage(messageId, unacknowledgedMessage);
}
}
}
private void handleClientFailure(Map<String, Object> message) {
failure.mark();
logger.warn("Unrecoverable error: " + message.get("error"));
String messageId = (String)message.get("message_id");
if (messageId != null) {
pendingMessages.remove(messageId);
}
}
private void reconnect() {
try {
this.connection.disconnect();
} catch (SmackException.NotConnectedException e) {
logger.warn("GCMClient", "Disconnect attempt", e);
}
while (true) {
try {
this.connection = connect(senderId, apiKey);
return;
} catch (XMPPException | IOException | SmackException e) {
logger.warn("GCMClient", "Reconnecting", e);
Util.sleep(1000);
}
}
}
private XMPPTCPConnection connect(long senderId, String apiKey)
throws XMPPException, IOException, SmackException
{
ConnectionConfiguration config = new ConnectionConfiguration(GCM_SERVER, GCM_PORT);
config.setSecurityMode(ConnectionConfiguration.SecurityMode.enabled);
config.setReconnectionAllowed(true);
config.setRosterLoadedAtLogin(false);
config.setSendPresence(false);
config.setSocketFactory(SSLSocketFactory.getDefault());
XMPPTCPConnection connection = new XMPPTCPConnection(config);
connection.connect();
connection.addConnectionListener(new LoggingConnectionListener());
connection.addPacketListener(this, new PacketTypeFilter(Message.class));
connection.login(senderId + "@gcm.googleapis.com", apiKey);
return connection;
}
private static class GcmPacketExtensionProvider implements PacketExtensionProvider {
@Override
public PacketExtension parseExtension(XmlPullParser xmlPullParser) throws Exception {
String json = xmlPullParser.nextText();
return new GcmPacketExtension(json);
}
}
private static final class GcmPacketExtension extends DefaultPacketExtension {
private final String json;
public GcmPacketExtension(String json) {
super(GCM_ELEMENT_NAME, GCM_NAMESPACE);
this.json = json;
}
public String getJson() {
return json;
}
@Override
public String toXML() {
return String.format("<%s xmlns=\"%s\">%s</%s>", GCM_ELEMENT_NAME, GCM_NAMESPACE,
StringUtils.escapeForXML(json), GCM_ELEMENT_NAME);
}
public Packet toPacket() {
Message message = new Message();
message.addExtension(this);
return message;
}
}
private class LoggingConnectionListener implements ConnectionListener {
@Override
public void connected(XMPPConnection xmppConnection) {
logger.warn("GCM XMPP Connected.");
}
@Override
public void authenticated(XMPPConnection xmppConnection) {
logger.warn("GCM XMPP Authenticated.");
}
@Override
public void reconnectionSuccessful() {
logger.warn("GCM XMPP Reconnecting..");
Iterator<Map.Entry<String, UnacknowledgedMessage>> iterator =
pendingMessages.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, UnacknowledgedMessage> entry = iterator.next();
iterator.remove();
sendMessage(entry.getKey(), entry.getValue());
}
}
@Override
public void reconnectionFailed(Exception e) {
logger.warn("GCM XMPP Reconnection failed!", e);
reconnect();
}
@Override
public void reconnectingIn(int seconds) {
logger.warn(String.format("GCM XMPP Reconnecting in %d secs", seconds));
}
@Override
public void connectionClosedOnError(Exception e) {
logger.warn("GCM XMPP Connection closed on error.");
}
@Override
public void connectionClosed() {
logger.warn("GCM XMPP Connection closed.");
reconnect();
}
}
private static class UnacknowledgedMessage {
private final String destinationNumber;
private final long destinationDeviceId;
private final String registrationId;
private final PendingMessage pendingMessage;
private UnacknowledgedMessage(String destinationNumber,
long destinationDeviceId,
String registrationId,
PendingMessage pendingMessage)
{
this.destinationNumber = destinationNumber;
this.destinationDeviceId = destinationDeviceId;
this.registrationId = registrationId;
this.pendingMessage = pendingMessage;
}
private String getRegistrationId() {
return registrationId;
}
private PendingMessage getPendingMessage() {
return pendingMessage;
}
public String getDestinationNumber() {
return destinationNumber;
}
public long getDestinationDeviceId() {
return destinationDeviceId;
}
}
}

View File

@ -18,9 +18,10 @@ package org.whispersystems.textsecuregcm.push;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.ApnMessage;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.GcmMessage;
import org.whispersystems.textsecuregcm.entities.PendingMessage;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
@ -31,17 +32,12 @@ public class PushSender {
private final Logger logger = LoggerFactory.getLogger(PushSender.class);
private final GCMSender gcmSender;
private final APNSender apnSender;
private final WebsocketSender webSocketSender;
private final PushServiceClient pushServiceClient;
private final WebsocketSender webSocketSender;
public PushSender(GCMSender gcmClient,
APNSender apnSender,
WebsocketSender websocketSender)
{
this.gcmSender = gcmClient;
this.apnSender = apnSender;
this.webSocketSender = websocketSender;
public PushSender(PushServiceClient pushServiceClient, WebsocketSender websocketSender) {
this.pushServiceClient = pushServiceClient;
this.webSocketSender = websocketSender;
}
public void sendMessage(Account account, Device device, OutgoingMessageSignal message)
@ -71,22 +67,35 @@ public class PushSender {
else throw new NotPushRegisteredException("No delivery possible!");
}
private void sendGcmMessage(Account account, Device device, PendingMessage pendingMessage) {
String number = account.getNumber();
long deviceId = device.getId();
String registrationId = device.getGcmId();
private void sendGcmMessage(Account account, Device device, PendingMessage pendingMessage)
throws TransientPushFailureException
{
String number = account.getNumber();
long deviceId = device.getId();
String registrationId = device.getGcmId();
GcmMessage gcmMessage = new GcmMessage(registrationId, number, (int)deviceId,
pendingMessage.getEncryptedOutgoingMessage(),
pendingMessage.isReceipt() );
gcmSender.sendMessage(number, deviceId, registrationId, pendingMessage);
pushServiceClient.send(gcmMessage);
}
private void sendApnMessage(Account account, Device device, PendingMessage outgoingMessage)
throws TransientPushFailureException
{
apnSender.sendMessage(account, device, device.getApnId(), outgoingMessage);
boolean online = webSocketSender.sendMessage(account, device, outgoingMessage, true);
if (!online && !outgoingMessage.isReceipt()) {
ApnMessage apnMessage = new ApnMessage(device.getApnId(), account.getNumber(),
(int)device.getId(),
outgoingMessage.getEncryptedOutgoingMessage());
pushServiceClient.send(apnMessage);
}
}
private void sendWebSocketMessage(Account account, Device device, PendingMessage outgoingMessage)
{
webSocketSender.sendMessage(account, device, outgoingMessage);
webSocketSender.sendMessage(account, device, outgoingMessage, false);
}
}

View File

@ -0,0 +1,56 @@
package org.whispersystems.textsecuregcm.push;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
import org.whispersystems.textsecuregcm.entities.ApnMessage;
import org.whispersystems.textsecuregcm.entities.GcmMessage;
import org.whispersystems.textsecuregcm.util.Base64;
import javax.ws.rs.core.MediaType;
public class PushServiceClient {
private static final String PUSH_GCM_PATH = "/api/v1/push/gcm";
private static final String PUSH_APN_PATH = "/api/v1/push/apn";
private final Logger logger = LoggerFactory.getLogger(PushServiceClient.class);
private final Client client;
private final String host;
private final int port;
private final String authorization;
public PushServiceClient(Client client, PushConfiguration config) {
this.client = client;
this.host = config.getHost();
this.port = config.getPort();
this.authorization = getAuthorizationHeader(config.getUsername(), config.getPassword());
}
public void send(GcmMessage message) throws TransientPushFailureException {
sendPush(PUSH_GCM_PATH, message);
}
public void send(ApnMessage message) throws TransientPushFailureException {
sendPush(PUSH_APN_PATH, message);
}
private void sendPush(String path, Object entity) throws TransientPushFailureException {
ClientResponse response = client.resource("http://" + host + ":" + port + path)
.header("Authorization", authorization)
.entity(entity, MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
if (response.getStatus() != 204 && response.getStatus() != 200) {
logger.warn("PushServer response: " + response.getStatus() + " " + response.getStatusInfo().getReasonPhrase());
throw new TransientPushFailureException("Bad response: " + response.getStatus());
}
}
private String getAuthorizationHeader(String username, String password) {
return "Basic " + Base64.encodeBytes((username + ":" + password).getBytes());
}
}

View File

@ -40,8 +40,12 @@ public class WebsocketSender {
private static final Logger logger = LoggerFactory.getLogger(WebsocketSender.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter onlineMeter = metricRegistry.meter(name(getClass(), "online"));
private final Meter offlineMeter = metricRegistry.meter(name(getClass(), "offline"));
private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" ));
private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" ));
private final Meter apnOnlineMeter = metricRegistry.meter(name(getClass(), "apn_online" ));
private final Meter apnOfflineMeter = metricRegistry.meter(name(getClass(), "apn_offline"));
private static final ObjectMapper mapper = SystemMapper.getMapper();
@ -53,21 +57,28 @@ public class WebsocketSender {
this.pubSubManager = pubSubManager;
}
public void sendMessage(Account account, Device device, PendingMessage pendingMessage) {
public boolean sendMessage(Account account, Device device, PendingMessage pendingMessage, boolean apn) {
try {
String serialized = mapper.writeValueAsString(pendingMessage);
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
PubSubMessage pubSubMessage = new PubSubMessage(PubSubMessage.TYPE_DELIVER, serialized);
if (pubSubManager.publish(address, pubSubMessage)) {
onlineMeter.mark();
if (apn) apnOnlineMeter.mark();
else websocketOnlineMeter.mark();
return true;
} else {
offlineMeter.mark();
if (apn) apnOfflineMeter.mark();
else websocketOfflineMeter.mark();
storedMessages.insert(address, pendingMessage);
pubSubManager.publish(address, new PubSubMessage(PubSubMessage.TYPE_QUERY_DB, null));
return false;
}
} catch (JsonProcessingException e) {
logger.warn("WebsocketSender", "Unable to serialize json", e);
return false;
}
}
}