diff --git a/pom.xml b/pom.xml index 348c77de4..4fe29675f 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,12 @@ dropwizard-metrics-graphite ${dropwizard.version} + + io.dropwizard + dropwizard-client + ${dropwizard.version} + + com.sun.jersey diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 6245fe67a..3618bb500 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -24,6 +24,7 @@ import org.whispersystems.textsecuregcm.configuration.GraphiteConfiguration; import org.whispersystems.textsecuregcm.configuration.MemcacheConfiguration; import org.whispersystems.textsecuregcm.configuration.MetricsConfiguration; import org.whispersystems.textsecuregcm.configuration.NexmoConfiguration; +import org.whispersystems.textsecuregcm.configuration.PushConfiguration; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; import org.whispersystems.textsecuregcm.configuration.RedPhoneConfiguration; import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; @@ -35,6 +36,7 @@ import javax.validation.Valid; import javax.validation.constraints.NotNull; import io.dropwizard.Configuration; +import io.dropwizard.client.JerseyClientConfiguration; import io.dropwizard.db.DataSourceFactory; public class WhisperServerConfiguration extends Configuration { @@ -50,7 +52,7 @@ public class WhisperServerConfiguration extends Configuration { @NotNull @Valid @JsonProperty - private GcmConfiguration gcm; + private PushConfiguration push; @NotNull @Valid @@ -67,9 +69,6 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private RedisConfiguration redis; - @JsonProperty - private ApnConfiguration apn = new ApnConfiguration(); - @Valid @JsonProperty private FederationConfiguration federation = new FederationConfiguration(); @@ -99,6 +98,12 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private RedPhoneConfiguration redphone = new RedPhoneConfiguration(); + @Valid + @NotNull + @JsonProperty + private JerseyClientConfiguration httpClient = new JerseyClientConfiguration(); + + public WebsocketConfiguration getWebsocketConfiguration() { return websocket; } @@ -111,12 +116,12 @@ public class WhisperServerConfiguration extends Configuration { return nexmo; } - public GcmConfiguration getGcmConfiguration() { - return gcm; + public PushConfiguration getPushConfiguration() { + return push; } - public ApnConfiguration getApnConfiguration() { - return apn; + public JerseyClientConfiguration getJerseyClientConfiguration() { + return httpClient; } public S3Configuration getS3Configuration() { diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 13965d81c..e37ca9551 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -20,6 +20,7 @@ import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.graphite.GraphiteReporter; import com.fasterxml.jackson.databind.DeserializationFeature; import com.google.common.base.Optional; +import com.sun.jersey.api.client.Client; import net.spy.memcached.MemcachedClient; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.eclipse.jetty.servlets.CrossOriginFilter; @@ -53,9 +54,8 @@ import org.whispersystems.textsecuregcm.providers.MemcachedClientFactory; import org.whispersystems.textsecuregcm.providers.RedisClientFactory; import org.whispersystems.textsecuregcm.providers.RedisHealthCheck; import org.whispersystems.textsecuregcm.providers.TimeProvider; -import org.whispersystems.textsecuregcm.push.APNSender; -import org.whispersystems.textsecuregcm.push.GCMSender; import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.PushServiceClient; import org.whispersystems.textsecuregcm.push.WebsocketSender; import org.whispersystems.textsecuregcm.sms.NexmoSmsSender; import org.whispersystems.textsecuregcm.sms.SmsSender; @@ -89,6 +89,7 @@ import java.util.concurrent.TimeUnit; import static com.codahale.metrics.MetricRegistry.name; import io.dropwizard.Application; +import io.dropwizard.client.JerseyClientBuilder; import io.dropwizard.db.DataSourceFactory; import io.dropwizard.jdbi.DBIFactory; import io.dropwizard.metrics.graphite.GraphiteReporterFactory; @@ -137,6 +138,8 @@ public class WhisperServerService extends Application nexmoSmsSender = initializeNexmoSmsSender(config.getNexmoConfiguration()); SmsSender smsSender = new SmsSender(twilioSmsSender, nexmoSmsSender, config.getTwilioConfiguration().isInternational()); UrlSigner urlSigner = new UrlSigner(config.getS3Configuration()); - PushSender pushSender = new PushSender(gcmSender, apnSender, websocketSender); + PushSender pushSender = new PushSender(pushServiceClient, websocketSender); Optional authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey(); AttachmentController attachmentController = new AttachmentController(rateLimiters, federatedClientManager, urlSigner); diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/PushConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/PushConfiguration.java new file mode 100644 index 000000000..987ecd529 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/PushConfiguration.java @@ -0,0 +1,40 @@ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.hibernate.validator.constraints.NotEmpty; + +import javax.validation.constraints.Min; + +public class PushConfiguration { + @JsonProperty + @NotEmpty + private String host; + + @JsonProperty + @Min(1) + private int port; + + @JsonProperty + @NotEmpty + private String username; + + @JsonProperty + @NotEmpty + private String password; + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/ApnMessage.java b/src/main/java/org/whispersystems/textsecuregcm/entities/ApnMessage.java new file mode 100644 index 000000000..2842d5b1c --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/ApnMessage.java @@ -0,0 +1,34 @@ +package org.whispersystems.textsecuregcm.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import org.hibernate.validator.constraints.NotEmpty; + +import javax.validation.constraints.Min; + +public class ApnMessage { + @JsonProperty + @NotEmpty + private String apnId; + + @JsonProperty + @NotEmpty + private String number; + + @JsonProperty + @Min(1) + private int deviceId; + + @JsonProperty + @NotEmpty + private String message; + + public ApnMessage() {} + + public ApnMessage(String apnId, String number, int deviceId, String message) { + this.apnId = apnId; + this.number = number; + this.deviceId = deviceId; + this.message = message; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/GcmMessage.java b/src/main/java/org/whispersystems/textsecuregcm/entities/GcmMessage.java new file mode 100644 index 000000000..69b46a4e6 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/GcmMessage.java @@ -0,0 +1,39 @@ +package org.whispersystems.textsecuregcm.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.hibernate.validator.constraints.NotEmpty; + +import javax.validation.constraints.Min; + +public class GcmMessage { + + @JsonProperty + @NotEmpty + private String gcmId; + + @JsonProperty + @NotEmpty + private String number; + + @JsonProperty + @Min(1) + private int deviceId; + + @JsonProperty + @NotEmpty + private String message; + + @JsonProperty + private boolean receipt; + + public GcmMessage() {} + + public GcmMessage(String gcmId, String number, int deviceId, String message, boolean receipt) { + this.gcmId = gcmId; + this.number = number; + this.deviceId = deviceId; + this.message = message; + this.receipt = receipt; + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java deleted file mode 100644 index d01366966..000000000 --- a/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Copyright (C) 2013 Open WhisperSystems - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.whispersystems.textsecuregcm.push; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; -import com.notnoop.apns.APNS; -import com.notnoop.apns.ApnsService; -import com.notnoop.exceptions.NetworkIOException; -import net.spy.memcached.MemcachedClient; -import org.bouncycastle.openssl.PEMReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.PendingMessage; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.PubSubMessage; -import org.whispersystems.textsecuregcm.storage.StoredMessages; -import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.SystemMapper; -import org.whispersystems.textsecuregcm.util.Util; -import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.security.KeyPair; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.Certificate; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.Date; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static com.codahale.metrics.MetricRegistry.name; -import io.dropwizard.lifecycle.Managed; - -public class APNSender implements Managed { - - private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Meter websocketMeter = metricRegistry.meter(name(getClass(), "websocket")); - private final Meter pushMeter = metricRegistry.meter(name(getClass(), "push")); - private final Meter failureMeter = metricRegistry.meter(name(getClass(), "failure")); - private final Logger logger = LoggerFactory.getLogger(APNSender.class); - - private static final String PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"alert\":{\"loc-key\":\"APN_Message\"},\"content-available\":1,\"category\":\"Signal_Message\"}}"; - - private static final ObjectMapper mapper = SystemMapper.getMapper(); - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - - private final AccountsManager accounts; - private final PubSubManager pubSubManager; - private final StoredMessages storedMessages; - private final MemcachedClient memcachedClient; - - private final String apnCertificate; - private final String apnKey; - - private Optional apnService; - - public APNSender(AccountsManager accounts, - PubSubManager pubSubManager, - StoredMessages storedMessages, - MemcachedClient memcachedClient, - String apnCertificate, String apnKey) - { - this.accounts = accounts; - this.pubSubManager = pubSubManager; - this.storedMessages = storedMessages; - this.apnCertificate = apnCertificate; - this.apnKey = apnKey; - this.memcachedClient = memcachedClient; - } - - public void sendMessage(Account account, Device device, - String registrationId, PendingMessage message) - throws TransientPushFailureException - { - try { - String serializedPendingMessage = mapper.writeValueAsString(message); - WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId()); - - if (pubSubManager.publish(websocketAddress, new PubSubMessage(PubSubMessage.TYPE_DELIVER, - serializedPendingMessage))) - { - websocketMeter.mark(); - } else { - memcacheSet(registrationId, account.getNumber()); - storedMessages.insert(websocketAddress, message); - - if (!message.isReceipt()) { - sendPush(registrationId); - } - } - } catch (IOException e) { - throw new TransientPushFailureException(e); - } - } - - private void sendPush(String registrationId) - throws TransientPushFailureException - { - try { - if (!apnService.isPresent()) { - failureMeter.mark(); - throw new TransientPushFailureException("APN access not configured!"); - } - - apnService.get().push(registrationId, PAYLOAD); - pushMeter.mark(); - } catch (NetworkIOException nioe) { - logger.warn("Network Error", nioe); - failureMeter.mark(); - throw new TransientPushFailureException(nioe); - } - - } - - private static byte[] initializeKeyStore(String pemCertificate, String pemKey) - throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException - { - PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemCertificate.getBytes()))); - X509Certificate certificate = (X509Certificate) reader.readObject(); - Certificate[] certificateChain = {certificate}; - - reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemKey.getBytes()))); - KeyPair keyPair = (KeyPair) reader.readObject(); - - KeyStore keyStore = KeyStore.getInstance("pkcs12"); - keyStore.load(null); - keyStore.setEntry("apn", - new KeyStore.PrivateKeyEntry(keyPair.getPrivate(), certificateChain), - new KeyStore.PasswordProtection("insecure".toCharArray())); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - keyStore.store(baos, "insecure".toCharArray()); - - return baos.toByteArray(); - } - - @Override - public void start() throws Exception { - if (!Util.isEmpty(apnCertificate) && !Util.isEmpty(apnKey)) { - byte[] keyStore = initializeKeyStore(apnCertificate, apnKey); - - this.apnService = Optional.of(APNS.newService() - .withCert(new ByteArrayInputStream(keyStore), "insecure") - .asQueued() - .withProductionDestination().build()); - - this.executor.scheduleAtFixedRate(new FeedbackRunnable(), 0, 1, TimeUnit.HOURS); - } else { - this.apnService = Optional.absent(); - } - } - - @Override - public void stop() throws Exception { - if (apnService.isPresent()) { - apnService.get().stop(); - } - } - - private void memcacheSet(String registrationId, String number) { - if (memcachedClient != null) { - memcachedClient.set("APN-" + registrationId, 60 * 60 * 24, number); - } - } - - private Optional memcacheGet(String registrationId) { - if (memcachedClient != null) { - return Optional.fromNullable((String)memcachedClient.get("APN-" + registrationId)); - } else { - return Optional.absent(); - } - } - - private class FeedbackRunnable implements Runnable { - private void updateAccount(Account account, String registrationId) { - boolean needsUpdate = false; - - for (Device device : account.getDevices()) { - if (registrationId.equals(device.getApnId())) { - needsUpdate = true; - device.setApnId(null); - } - } - - if (needsUpdate) { - accounts.update(account); - } - } - - @Override - public void run() { - if (apnService.isPresent()) { - Map inactiveDevices = apnService.get().getInactiveDevices(); - - for (String registrationId : inactiveDevices.keySet()) { - Optional number = memcacheGet(registrationId); - - if (number.isPresent()) { - Optional account = accounts.get(number.get()); - - if (account.isPresent()) { - updateAccount(account.get(), registrationId); - } - } else { - logger.warn("APN unregister event received for uncached ID: " + registrationId); - } - } - } - } - } -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java deleted file mode 100644 index 155e37004..000000000 --- a/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java +++ /dev/null @@ -1,424 +0,0 @@ -package org.whispersystems.textsecuregcm.push; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import com.google.common.base.Optional; -import org.jivesoftware.smack.ConnectionConfiguration; -import org.jivesoftware.smack.ConnectionListener; -import org.jivesoftware.smack.PacketListener; -import org.jivesoftware.smack.SmackException; -import org.jivesoftware.smack.XMPPConnection; -import org.jivesoftware.smack.XMPPException; -import org.jivesoftware.smack.filter.PacketTypeFilter; -import org.jivesoftware.smack.packet.DefaultPacketExtension; -import org.jivesoftware.smack.packet.Message; -import org.jivesoftware.smack.packet.Packet; -import org.jivesoftware.smack.packet.PacketExtension; -import org.jivesoftware.smack.provider.PacketExtensionProvider; -import org.jivesoftware.smack.provider.ProviderManager; -import org.jivesoftware.smack.tcp.XMPPTCPConnection; -import org.jivesoftware.smack.util.StringUtils; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; -import org.json.simple.parser.ParseException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.PendingMessage; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.util.Util; -import org.xmlpull.v1.XmlPullParser; - -import javax.net.ssl.SSLSocketFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import static com.codahale.metrics.MetricRegistry.name; -import io.dropwizard.lifecycle.Managed; - -public class GCMSender implements Managed, PacketListener { - - private final Logger logger = LoggerFactory.getLogger(GCMSender.class); - - private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(org.whispersystems.textsecuregcm.util.Constants.METRICS_NAME); - private final Meter success = metricRegistry.meter(name(getClass(), "sent", "success")); - private final Meter failure = metricRegistry.meter(name(getClass(), "sent", "failure")); - private final Meter unregistered = metricRegistry.meter(name(getClass(), "sent", "unregistered")); - - private static final String GCM_SERVER = "gcm.googleapis.com"; - private static final int GCM_PORT = 5235; - - private static final String GCM_ELEMENT_NAME = "gcm"; - private static final String GCM_NAMESPACE = "google:mobile:data"; - - private final Map pendingMessages = new ConcurrentHashMap<>(); - - private final long senderId; - private final String apiKey; - private final AccountsManager accounts; - - private XMPPTCPConnection connection; - - public GCMSender(AccountsManager accounts, long senderId, String apiKey) { - this.accounts = accounts; - this.senderId = senderId; - this.apiKey = apiKey; - - ProviderManager.addExtensionProvider(GCM_ELEMENT_NAME, GCM_NAMESPACE, - new GcmPacketExtensionProvider()); - } - - public void sendMessage(String destinationNumber, long destinationDeviceId, - String registrationId, PendingMessage message) - { - String messageId = "m-" + UUID.randomUUID().toString(); - UnacknowledgedMessage unacknowledgedMessage = new UnacknowledgedMessage(destinationNumber, - destinationDeviceId, - registrationId, message); - - sendMessage(messageId, unacknowledgedMessage); - } - - public void sendMessage(String messageId, UnacknowledgedMessage message) { - try { - boolean isReceipt = message.getPendingMessage().isReceipt(); - - Map dataObject = new HashMap<>(); - dataObject.put("type", "message"); - dataObject.put(isReceipt ? "receipt" : "message", message.getPendingMessage().getEncryptedOutgoingMessage()); - - Map messageObject = new HashMap<>(); - messageObject.put("to", message.getRegistrationId()); - messageObject.put("message_id", messageId); - messageObject.put("data", dataObject); - - String json = JSONObject.toJSONString(messageObject); - - pendingMessages.put(messageId, message); - connection.sendPacket(new GcmPacketExtension(json).toPacket()); - } catch (SmackException.NotConnectedException e) { - logger.warn("GCMClient", "No connection", e); - } - } - - @Override - public void start() throws Exception { - this.connection = connect(senderId, apiKey); - } - - @Override - public void stop() throws Exception { - this.connection.disconnect(); - } - - @Override - public void processPacket(Packet packet) throws SmackException.NotConnectedException { - Message incomingMessage = (Message) packet; - GcmPacketExtension gcmPacket = (GcmPacketExtension) incomingMessage.getExtension(GCM_NAMESPACE); - String json = gcmPacket.getJson(); - - try { - Map jsonObject = (Map) JSONValue.parseWithException(json); - Object messageType = jsonObject.get("message_type"); - - if (messageType == null) { - handleUpstreamMessage(jsonObject); - return; - } - - switch (messageType.toString()) { - case "ack" : handleAckReceipt(jsonObject); break; - case "nack" : handleNackReceipt(jsonObject); break; - case "receipt" : handleDeliveryReceipt(jsonObject); break; - case "control" : handleControlMessage(jsonObject); break; - default: - logger.warn("Received unknown GCM message: " + messageType.toString()); - } - - } catch (ParseException e) { - logger.warn("GCMClient", "Received unparsable message", e); - } catch (Exception e) { - logger.warn("GCMClient", "Failed to process packet", e); - } - } - - private void handleControlMessage(Map message) { - String controlType = (String) message.get("control_type"); - - if ("CONNECTION_DRAINING".equals(controlType)) { - logger.warn("GCM Connection is draining! Initiating reconnect..."); - reconnect(); - } else { - logger.warn("Received unknown GCM control message: " + controlType); - } - } - - private void handleDeliveryReceipt(Map message) { - logger.warn("Got delivery receipt!"); - } - - private void handleNackReceipt(Map message) { - String messageId = (String) message.get("message_id"); - String errorCode = (String) message.get("error"); - - if (errorCode == null) { - logger.warn("Null GCM error code!"); - if (messageId != null) { - pendingMessages.remove(messageId); - } - - return; - } - - switch (errorCode) { - case "BAD_REGISTRATION" : handleBadRegistration(message); break; - case "DEVICE_UNREGISTERED" : handleBadRegistration(message); break; - case "INTERNAL_SERVER_ERROR" : handleServerFailure(message); break; - case "INVALID_JSON" : handleClientFailure(message); break; - case "QUOTA_EXCEEDED" : handleClientFailure(message); break; - case "SERVICE_UNAVAILABLE" : handleServerFailure(message); break; - } - } - - private void handleAckReceipt(Map message) { - success.mark(); - - String messageId = (String) message.get("message_id"); - - if (messageId != null) { - pendingMessages.remove(messageId); - } - } - - private void handleUpstreamMessage(Map message) - throws SmackException.NotConnectedException - { - logger.warn("Got upstream message from GCM Server!"); - - for (String key : message.keySet()) { - logger.warn(key + " : " + message.get(key)); - } - - Map ack = new HashMap<>(); - message.put("message_type", "ack"); - message.put("to", message.get("from")); - message.put("message_id", message.get("message_id")); - - String json = JSONValue.toJSONString(ack); - - Packet request = new GcmPacketExtension(json).toPacket(); - connection.sendPacket(request); - } - - private void handleBadRegistration(Map message) { - unregistered.mark(); - - String messageId = (String) message.get("message_id"); - - if (messageId != null) { - UnacknowledgedMessage unacknowledgedMessage = pendingMessages.remove(messageId); - - if (unacknowledgedMessage != null) { - Optional account = accounts.get(unacknowledgedMessage.getDestinationNumber()); - - if (account.isPresent()) { - Optional device = account.get().getDevice(unacknowledgedMessage.getDestinationDeviceId()); - - if (device.isPresent()) { - device.get().setGcmId(null); - accounts.update(account.get()); - } - } - - } - } - } - - private void handleServerFailure(Map message) { - failure.mark(); - - String messageId = (String)message.get("message_id"); - - if (messageId != null) { - UnacknowledgedMessage unacknowledgedMessage = pendingMessages.remove(messageId); - - if (unacknowledgedMessage != null) { - sendMessage(messageId, unacknowledgedMessage); - } - } - } - - private void handleClientFailure(Map message) { - failure.mark(); - - logger.warn("Unrecoverable error: " + message.get("error")); - String messageId = (String)message.get("message_id"); - - if (messageId != null) { - pendingMessages.remove(messageId); - } - } - - private void reconnect() { - try { - this.connection.disconnect(); - } catch (SmackException.NotConnectedException e) { - logger.warn("GCMClient", "Disconnect attempt", e); - } - - while (true) { - try { - this.connection = connect(senderId, apiKey); - return; - } catch (XMPPException | IOException | SmackException e) { - logger.warn("GCMClient", "Reconnecting", e); - Util.sleep(1000); - } - } - } - - private XMPPTCPConnection connect(long senderId, String apiKey) - throws XMPPException, IOException, SmackException - { - ConnectionConfiguration config = new ConnectionConfiguration(GCM_SERVER, GCM_PORT); - config.setSecurityMode(ConnectionConfiguration.SecurityMode.enabled); - config.setReconnectionAllowed(true); - config.setRosterLoadedAtLogin(false); - config.setSendPresence(false); - config.setSocketFactory(SSLSocketFactory.getDefault()); - - XMPPTCPConnection connection = new XMPPTCPConnection(config); - connection.connect(); - - connection.addConnectionListener(new LoggingConnectionListener()); - connection.addPacketListener(this, new PacketTypeFilter(Message.class)); - - connection.login(senderId + "@gcm.googleapis.com", apiKey); - - return connection; - } - - private static class GcmPacketExtensionProvider implements PacketExtensionProvider { - @Override - public PacketExtension parseExtension(XmlPullParser xmlPullParser) throws Exception { - String json = xmlPullParser.nextText(); - return new GcmPacketExtension(json); - } - } - - private static final class GcmPacketExtension extends DefaultPacketExtension { - - private final String json; - - public GcmPacketExtension(String json) { - super(GCM_ELEMENT_NAME, GCM_NAMESPACE); - this.json = json; - } - - public String getJson() { - return json; - } - - @Override - public String toXML() { - return String.format("<%s xmlns=\"%s\">%s", GCM_ELEMENT_NAME, GCM_NAMESPACE, - StringUtils.escapeForXML(json), GCM_ELEMENT_NAME); - } - - public Packet toPacket() { - Message message = new Message(); - message.addExtension(this); - return message; - } - } - - private class LoggingConnectionListener implements ConnectionListener { - - @Override - public void connected(XMPPConnection xmppConnection) { - logger.warn("GCM XMPP Connected."); - } - - @Override - public void authenticated(XMPPConnection xmppConnection) { - logger.warn("GCM XMPP Authenticated."); - } - - @Override - public void reconnectionSuccessful() { - logger.warn("GCM XMPP Reconnecting.."); - Iterator> iterator = - pendingMessages.entrySet().iterator(); - - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - iterator.remove(); - - sendMessage(entry.getKey(), entry.getValue()); - } - } - - @Override - public void reconnectionFailed(Exception e) { - logger.warn("GCM XMPP Reconnection failed!", e); - reconnect(); - } - - @Override - public void reconnectingIn(int seconds) { - logger.warn(String.format("GCM XMPP Reconnecting in %d secs", seconds)); - } - - @Override - public void connectionClosedOnError(Exception e) { - logger.warn("GCM XMPP Connection closed on error."); - } - - @Override - public void connectionClosed() { - logger.warn("GCM XMPP Connection closed."); - reconnect(); - } - } - - private static class UnacknowledgedMessage { - private final String destinationNumber; - private final long destinationDeviceId; - - private final String registrationId; - private final PendingMessage pendingMessage; - - private UnacknowledgedMessage(String destinationNumber, - long destinationDeviceId, - String registrationId, - PendingMessage pendingMessage) - { - this.destinationNumber = destinationNumber; - this.destinationDeviceId = destinationDeviceId; - this.registrationId = registrationId; - this.pendingMessage = pendingMessage; - } - - private String getRegistrationId() { - return registrationId; - } - - private PendingMessage getPendingMessage() { - return pendingMessage; - } - - public String getDestinationNumber() { - return destinationNumber; - } - - public long getDestinationDeviceId() { - return destinationDeviceId; - } - } -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index 74d5da008..7f8aac6a4 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -18,9 +18,10 @@ package org.whispersystems.textsecuregcm.push; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.entities.ApnMessage; import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; -import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.entities.GcmMessage; import org.whispersystems.textsecuregcm.entities.PendingMessage; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; @@ -31,17 +32,12 @@ public class PushSender { private final Logger logger = LoggerFactory.getLogger(PushSender.class); - private final GCMSender gcmSender; - private final APNSender apnSender; - private final WebsocketSender webSocketSender; + private final PushServiceClient pushServiceClient; + private final WebsocketSender webSocketSender; - public PushSender(GCMSender gcmClient, - APNSender apnSender, - WebsocketSender websocketSender) - { - this.gcmSender = gcmClient; - this.apnSender = apnSender; - this.webSocketSender = websocketSender; + public PushSender(PushServiceClient pushServiceClient, WebsocketSender websocketSender) { + this.pushServiceClient = pushServiceClient; + this.webSocketSender = websocketSender; } public void sendMessage(Account account, Device device, OutgoingMessageSignal message) @@ -71,22 +67,35 @@ public class PushSender { else throw new NotPushRegisteredException("No delivery possible!"); } - private void sendGcmMessage(Account account, Device device, PendingMessage pendingMessage) { - String number = account.getNumber(); - long deviceId = device.getId(); - String registrationId = device.getGcmId(); + private void sendGcmMessage(Account account, Device device, PendingMessage pendingMessage) + throws TransientPushFailureException + { + String number = account.getNumber(); + long deviceId = device.getId(); + String registrationId = device.getGcmId(); + GcmMessage gcmMessage = new GcmMessage(registrationId, number, (int)deviceId, + pendingMessage.getEncryptedOutgoingMessage(), + pendingMessage.isReceipt() ); - gcmSender.sendMessage(number, deviceId, registrationId, pendingMessage); + pushServiceClient.send(gcmMessage); } private void sendApnMessage(Account account, Device device, PendingMessage outgoingMessage) throws TransientPushFailureException { - apnSender.sendMessage(account, device, device.getApnId(), outgoingMessage); + boolean online = webSocketSender.sendMessage(account, device, outgoingMessage, true); + + if (!online && !outgoingMessage.isReceipt()) { + ApnMessage apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), + (int)device.getId(), + outgoingMessage.getEncryptedOutgoingMessage()); + + pushServiceClient.send(apnMessage); + } } private void sendWebSocketMessage(Account account, Device device, PendingMessage outgoingMessage) { - webSocketSender.sendMessage(account, device, outgoingMessage); + webSocketSender.sendMessage(account, device, outgoingMessage, false); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushServiceClient.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushServiceClient.java new file mode 100644 index 000000000..822cf42c2 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/push/PushServiceClient.java @@ -0,0 +1,56 @@ +package org.whispersystems.textsecuregcm.push; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.PushConfiguration; +import org.whispersystems.textsecuregcm.entities.ApnMessage; +import org.whispersystems.textsecuregcm.entities.GcmMessage; +import org.whispersystems.textsecuregcm.util.Base64; + +import javax.ws.rs.core.MediaType; + +public class PushServiceClient { + + private static final String PUSH_GCM_PATH = "/api/v1/push/gcm"; + private static final String PUSH_APN_PATH = "/api/v1/push/apn"; + + private final Logger logger = LoggerFactory.getLogger(PushServiceClient.class); + + private final Client client; + private final String host; + private final int port; + private final String authorization; + + public PushServiceClient(Client client, PushConfiguration config) { + this.client = client; + this.host = config.getHost(); + this.port = config.getPort(); + this.authorization = getAuthorizationHeader(config.getUsername(), config.getPassword()); + } + + public void send(GcmMessage message) throws TransientPushFailureException { + sendPush(PUSH_GCM_PATH, message); + } + + public void send(ApnMessage message) throws TransientPushFailureException { + sendPush(PUSH_APN_PATH, message); + } + + private void sendPush(String path, Object entity) throws TransientPushFailureException { + ClientResponse response = client.resource("http://" + host + ":" + port + path) + .header("Authorization", authorization) + .entity(entity, MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + if (response.getStatus() != 204 && response.getStatus() != 200) { + logger.warn("PushServer response: " + response.getStatus() + " " + response.getStatusInfo().getReasonPhrase()); + throw new TransientPushFailureException("Bad response: " + response.getStatus()); + } + } + + private String getAuthorizationHeader(String username, String password) { + return "Basic " + Base64.encodeBytes((username + ":" + password).getBytes()); + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java index f33944faf..bd909fcea 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java @@ -40,8 +40,12 @@ public class WebsocketSender { private static final Logger logger = LoggerFactory.getLogger(WebsocketSender.class); private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Meter onlineMeter = metricRegistry.meter(name(getClass(), "online")); - private final Meter offlineMeter = metricRegistry.meter(name(getClass(), "offline")); + + private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" )); + private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" )); + + private final Meter apnOnlineMeter = metricRegistry.meter(name(getClass(), "apn_online" )); + private final Meter apnOfflineMeter = metricRegistry.meter(name(getClass(), "apn_offline")); private static final ObjectMapper mapper = SystemMapper.getMapper(); @@ -53,21 +57,28 @@ public class WebsocketSender { this.pubSubManager = pubSubManager; } - public void sendMessage(Account account, Device device, PendingMessage pendingMessage) { + public boolean sendMessage(Account account, Device device, PendingMessage pendingMessage, boolean apn) { try { String serialized = mapper.writeValueAsString(pendingMessage); WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId()); PubSubMessage pubSubMessage = new PubSubMessage(PubSubMessage.TYPE_DELIVER, serialized); if (pubSubManager.publish(address, pubSubMessage)) { - onlineMeter.mark(); + if (apn) apnOnlineMeter.mark(); + else websocketOnlineMeter.mark(); + + return true; } else { - offlineMeter.mark(); + if (apn) apnOfflineMeter.mark(); + else websocketOfflineMeter.mark(); + storedMessages.insert(address, pendingMessage); pubSubManager.publish(address, new PubSubMessage(PubSubMessage.TYPE_QUERY_DB, null)); + return false; } } catch (JsonProcessingException e) { logger.warn("WebsocketSender", "Unable to serialize json", e); + return false; } } }