From 189d95f4fab0b3b750928bbb73324de4dae95c6a Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Tue, 25 Apr 2017 18:03:07 -0700 Subject: [PATCH] Pull GCM/APN senders into service // FREEBIE --- pom.xml | 20 ++ .../WhisperServerConfiguration.java | 21 ++ .../textsecuregcm/WhisperServerService.java | 15 +- .../configuration/ApnConfiguration.java | 70 +++++ .../configuration/GcmConfiguration.java | 42 +++ .../textsecuregcm/entities/GcmMessage.java | 42 --- .../textsecuregcm/push/APNSender.java | 254 ++++++++++++++++++ .../push/ApnFallbackManager.java | 13 +- .../{entities => push}/ApnMessage.java | 51 ++-- .../textsecuregcm/push/FeedbackHandler.java | 120 --------- .../textsecuregcm/push/GCMSender.java | 178 ++++++++++++ .../textsecuregcm/push/GcmMessage.java | 32 +++ .../textsecuregcm/push/PushSender.java | 31 +-- .../textsecuregcm/push/PushServiceClient.java | 94 ------- .../textsecuregcm/workers/PushCommand.java | 173 ------------ .../tests/push/APNSenderTest.java | 87 ++++++ .../tests/push/ApnFallbackManagerTest.java | 26 +- .../tests/push/GCMSenderTest.java | 128 +++++++++ .../util/SynchronousExecutorService.java | 111 ++++++++ 19 files changed, 998 insertions(+), 510 deletions(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/ApnConfiguration.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/GcmConfiguration.java delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/entities/GcmMessage.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java rename src/main/java/org/whispersystems/textsecuregcm/{entities => push}/ApnMessage.java (57%) delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/push/FeedbackHandler.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/push/GcmMessage.java delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/push/PushServiceClient.java delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/workers/PushCommand.java create mode 100644 src/test/java/org/whispersystems/textsecuregcm/tests/push/APNSenderTest.java create mode 100644 src/test/java/org/whispersystems/textsecuregcm/tests/push/GCMSenderTest.java create mode 100644 src/test/java/org/whispersystems/textsecuregcm/tests/util/SynchronousExecutorService.java diff --git a/pom.xml b/pom.xml index 784b41052..6af9fc943 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,7 @@ 0.9.2 + 2.6.0 UTF-8 @@ -103,6 +104,17 @@ 0.1.1 + + com.notnoop.apns + apns + 0.2.3 + + + org.whispersystems + gcm-sender-async + 0.1.4 + + org.glassfish.jersey.test-framework.providers jersey-test-framework-provider-grizzly2 @@ -120,6 +132,8 @@ + + @@ -134,6 +148,12 @@ httpcore 4.4.1 + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.api.version} + diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 8761cb9e4..d1eb16c26 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -17,7 +17,9 @@ package org.whispersystems.textsecuregcm; import com.fasterxml.jackson.annotation.JsonProperty; +import org.whispersystems.textsecuregcm.configuration.ApnConfiguration; import org.whispersystems.textsecuregcm.configuration.FederationConfiguration; +import org.whispersystems.textsecuregcm.configuration.GcmConfiguration; import org.whispersystems.textsecuregcm.configuration.GraphiteConfiguration; import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration; import org.whispersystems.textsecuregcm.configuration.PushConfiguration; @@ -117,6 +119,16 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private TurnConfiguration turn; + @Valid + @NotNull + @JsonProperty + private GcmConfiguration gcm; + + @Valid + @NotNull + @JsonProperty + private ApnConfiguration apn; + public WebsocketConfiguration getWebsocketConfiguration() { return websocket; @@ -174,6 +186,14 @@ public class WhisperServerConfiguration extends Configuration { return turn; } + public GcmConfiguration getGcmConfiguration() { + return gcm; + } + + public ApnConfiguration getApnConfiguration() { + return apn; + } + public Map getTestDevices() { Map results = new HashMap<>(); @@ -195,4 +215,5 @@ public class WhisperServerConfiguration extends Configuration { return results; } + } diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 218fa2ee7..d488c5406 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -60,10 +60,10 @@ import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge; import org.whispersystems.textsecuregcm.providers.RedisClientFactory; import org.whispersystems.textsecuregcm.providers.RedisHealthCheck; import org.whispersystems.textsecuregcm.providers.TimeProvider; +import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; -import org.whispersystems.textsecuregcm.push.FeedbackHandler; +import org.whispersystems.textsecuregcm.push.GCMSender; import org.whispersystems.textsecuregcm.push.PushSender; -import org.whispersystems.textsecuregcm.push.PushServiceClient; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.WebsocketSender; import org.whispersystems.textsecuregcm.sms.SmsSender; @@ -89,7 +89,6 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.DirectoryCommand; import org.whispersystems.textsecuregcm.workers.PeriodicStatsCommand; -import org.whispersystems.textsecuregcm.workers.PushCommand; import org.whispersystems.textsecuregcm.workers.TrimMessagesCommand; import org.whispersystems.textsecuregcm.workers.VacuumCommand; import org.whispersystems.websocket.WebSocketResourceProviderFactory; @@ -124,7 +123,6 @@ public class WhisperServerService extends Application("accountdb", "accountsdb.xml") { @Override public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) { @@ -178,25 +176,24 @@ public class WhisperServerService extends Applicationof(deadLetterHandler)); PubSubManager pubSubManager = new PubSubManager(cacheClient, dispatchManager); - PushServiceClient pushServiceClient = new PushServiceClient(httpClient, config.getPushConfiguration()); + APNSender apnSender = new APNSender(accountsManager, cacheClient, config.getApnConfiguration()); + GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey()); WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager); AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager ); FederatedPeerAuthenticator federatedPeerAuthenticator = new FederatedPeerAuthenticator(config.getFederationConfiguration()); RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheClient); - ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushServiceClient, pubSubManager); + ApnFallbackManager apnFallbackManager = new ApnFallbackManager(apnSender, pubSubManager); TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration()); SmsSender smsSender = new SmsSender(twilioSmsSender); UrlSigner urlSigner = new UrlSigner(config.getS3Configuration()); - PushSender pushSender = new PushSender(apnFallbackManager, pushServiceClient, websocketSender, config.getPushConfiguration().getQueueSize()); + PushSender pushSender = new PushSender(apnFallbackManager, gcmSender, apnSender, websocketSender, config.getPushConfiguration().getQueueSize()); ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender, federatedClientManager); - FeedbackHandler feedbackHandler = new FeedbackHandler(pushServiceClient, accountsManager); TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration()); Optional authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey(); environment.lifecycle().manage(apnFallbackManager); environment.lifecycle().manage(pubSubManager); - environment.lifecycle().manage(feedbackHandler); environment.lifecycle().manage(pushSender); AttachmentController attachmentController = new AttachmentController(rateLimiters, federatedClientManager, urlSigner); diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/ApnConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/ApnConfiguration.java new file mode 100644 index 000000000..31f08a636 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/ApnConfiguration.java @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2013 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.hibernate.validator.constraints.NotEmpty; + + +public class ApnConfiguration { + + @NotEmpty + @JsonProperty + private String pushCertificate; + + @NotEmpty + @JsonProperty + private String pushKey; + + @NotEmpty + @JsonProperty + private String voipCertificate; + + @NotEmpty + @JsonProperty + private String voipKey; + + @JsonProperty + private boolean feedback = true; + + @JsonProperty + private boolean sandbox = false; + + public String getPushCertificate() { + return pushCertificate; + } + + public String getPushKey() { + return pushKey; + } + + public String getVoipCertificate() { + return voipCertificate; + } + + public String getVoipKey() { + return voipKey; + } + + public boolean isFeedbackEnabled() { + return feedback; + } + + public boolean isSandboxEnabled() { + return sandbox; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/GcmConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/GcmConfiguration.java new file mode 100644 index 000000000..37a395b21 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/GcmConfiguration.java @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2013 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.hibernate.validator.constraints.NotEmpty; + +import javax.validation.constraints.NotNull; + +public class GcmConfiguration { + + @NotNull + @JsonProperty + private long senderId; + + @NotEmpty + @JsonProperty + private String apiKey; + + public String getApiKey() { + return apiKey; + } + + public long getSenderId() { + return senderId; + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/GcmMessage.java b/src/main/java/org/whispersystems/textsecuregcm/entities/GcmMessage.java deleted file mode 100644 index e0e8b29bb..000000000 --- a/src/main/java/org/whispersystems/textsecuregcm/entities/GcmMessage.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.whispersystems.textsecuregcm.entities; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.hibernate.validator.constraints.NotEmpty; - -import javax.validation.constraints.Min; - -public class GcmMessage { - - @JsonProperty - @NotEmpty - private String gcmId; - - @JsonProperty - @NotEmpty - private String number; - - @JsonProperty - @Min(1) - private int deviceId; - - @JsonProperty - private String message; - - @JsonProperty - private boolean receipt; - - @JsonProperty - private boolean notification; - - public GcmMessage() {} - - public GcmMessage(String gcmId, String number, int deviceId, String message, boolean receipt, boolean notification) { - this.gcmId = gcmId; - this.number = number; - this.deviceId = deviceId; - this.message = message; - this.receipt = receipt; - this.notification = notification; - } - -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java new file mode 100644 index 000000000..c53028f22 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java @@ -0,0 +1,254 @@ +/** + * Copyright (C) 2013 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.push; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.notnoop.apns.APNS; +import com.notnoop.apns.ApnsService; +import com.notnoop.apns.ApnsServiceBuilder; +import com.notnoop.exceptions.NetworkIOException; +import org.bouncycastle.openssl.PEMReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.ApnConfiguration; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.security.KeyPair; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.dropwizard.lifecycle.Managed; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; + +public class APNSender implements Managed { + + private final Logger logger = LoggerFactory.getLogger(APNSender.class); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + private final AccountsManager accountsManager; + private final JedisPool jedisPool; + + private final String pushCertificate; + private final String pushKey; + + private final String voipCertificate; + private final String voipKey; + + private final boolean feedbackEnabled; + private final boolean sandbox; + + private ApnsService pushApnService; + private ApnsService voipApnService; + + public APNSender(AccountsManager accountsManager, + JedisPool jedisPool, + ApnConfiguration configuration) + { + this.accountsManager = accountsManager; + this.jedisPool = jedisPool; + this.pushCertificate = configuration.getPushCertificate(); + this.pushKey = configuration.getPushKey(); + this.voipCertificate = configuration.getVoipCertificate(); + this.voipKey = configuration.getVoipKey(); + this.feedbackEnabled = configuration.isFeedbackEnabled(); + this.sandbox = configuration.isSandboxEnabled(); + } + + @VisibleForTesting + public APNSender(AccountsManager accountsManager, JedisPool jedisPool, + ApnsService pushApnService, ApnsService voipApnService, + boolean feedbackEnabled, boolean sandbox) + { + this.accountsManager = accountsManager; + this.jedisPool = jedisPool; + this.pushApnService = pushApnService; + this.voipApnService = voipApnService; + this.feedbackEnabled = feedbackEnabled; + this.sandbox = sandbox; + this.pushCertificate = null; + this.pushKey = null; + this.voipCertificate = null; + this.voipKey = null; + } + + public void sendMessage(ApnMessage message) + throws TransientPushFailureException + { + try { + redisSet(message.getApnId(), message.getNumber(), message.getDeviceId()); + + if (message.isVoip()) { + voipApnService.push(message.getApnId(), message.getMessage(), new Date(message.getExpirationTime())); + } else { + pushApnService.push(message.getApnId(), message.getMessage(), new Date(message.getExpirationTime())); + } + } catch (NetworkIOException nioe) { + logger.warn("Network Error", nioe); + throw new TransientPushFailureException(nioe); + } + } + + private static byte[] initializeKeyStore(String pemCertificate, String pemKey) + throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException + { + PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemCertificate.getBytes()))); + X509Certificate certificate = (X509Certificate) reader.readObject(); + Certificate[] certificateChain = {certificate}; + + reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemKey.getBytes()))); + KeyPair keyPair = (KeyPair) reader.readObject(); + + KeyStore keyStore = KeyStore.getInstance("pkcs12"); + keyStore.load(null); + keyStore.setEntry("apn", + new KeyStore.PrivateKeyEntry(keyPair.getPrivate(), certificateChain), + new KeyStore.PasswordProtection("insecure".toCharArray())); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + keyStore.store(baos, "insecure".toCharArray()); + + return baos.toByteArray(); + } + + @Override + public void start() throws Exception { + byte[] pushKeyStore = initializeKeyStore(pushCertificate, pushKey); + byte[] voipKeyStore = initializeKeyStore(voipCertificate, voipKey); + + ApnsServiceBuilder pushApnServiceBuilder = APNS.newService() + .withCert(new ByteArrayInputStream(pushKeyStore), "insecure") + .asQueued(); + + + ApnsServiceBuilder voipApnServiceBuilder = APNS.newService() + .withCert(new ByteArrayInputStream(voipKeyStore), "insecure") + .asQueued(); + + + if (sandbox) { + this.pushApnService = pushApnServiceBuilder.withSandboxDestination().build(); + this.voipApnService = voipApnServiceBuilder.withSandboxDestination().build(); + } else { + this.pushApnService = pushApnServiceBuilder.withProductionDestination().build(); + this.voipApnService = voipApnServiceBuilder.withProductionDestination().build(); + } + + if (feedbackEnabled) { + this.executor.scheduleAtFixedRate(new FeedbackRunnable(), 0, 1, TimeUnit.HOURS); + } + } + + @Override + public void stop() throws Exception { + pushApnService.stop(); + voipApnService.stop(); + } + + private void redisSet(String registrationId, String number, int deviceId) { + try (Jedis jedis = jedisPool.getResource()) { + jedis.set("APN-" + registrationId.toLowerCase(), number + "." + deviceId); + jedis.expire("APN-" + registrationId.toLowerCase(), (int) TimeUnit.HOURS.toSeconds(1)); + } + } + + private Optional redisGet(String registrationId) { + try (Jedis jedis = jedisPool.getResource()) { + String number = jedis.get("APN-" + registrationId.toLowerCase()); + return Optional.fromNullable(number); + } + } + + @VisibleForTesting + public void checkFeedback() { + new FeedbackRunnable().run(); + } + + private class FeedbackRunnable implements Runnable { + + @Override + public void run() { + try { + Map inactiveDevices = pushApnService.getInactiveDevices(); + inactiveDevices.putAll(voipApnService.getInactiveDevices()); + + for (String registrationId : inactiveDevices.keySet()) { + Optional device = redisGet(registrationId); + + if (device.isPresent()) { + logger.warn("Got APN unregistered notice!"); + String[] parts = device.get().split("\\.", 2); + + if (parts.length == 2) { + String number = parts[0]; + int deviceId = Integer.parseInt(parts[1]); + long timestamp = inactiveDevices.get(registrationId).getTime(); + + handleApnUnregistered(registrationId, number, deviceId, timestamp); + } else { + logger.warn("APN unregister event for device with no parts: " + device.get()); + } + } else { + logger.warn("APN unregister event received for uncached ID: " + registrationId); + } + } + } catch (Throwable t) { + logger.warn("Exception during feedback", t); + } + } + + private void handleApnUnregistered(String registrationId, String number, int deviceId, long timestamp) { + logger.info("Got APN Unregistered: " + number + "," + deviceId); + + Optional account = accountsManager.get(number); + + if (account.isPresent()) { + Optional device = account.get().getDevice(deviceId); + + if (device.isPresent()) { + if (registrationId.equals(device.get().getApnId())) { + logger.info("APN Unregister APN ID matches!"); + if (device.get().getPushTimestamp() == 0 || + timestamp > device.get().getPushTimestamp()) + { + logger.info("APN Unregister timestamp matches!"); + device.get().setApnId(null); + accountsManager.update(account.get()); + } + } + } + } + } + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java b/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java index a317261d8..2029f1d92 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java @@ -10,7 +10,6 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.dispatch.DispatchChannel; -import org.whispersystems.textsecuregcm.entities.ApnMessage; import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; import org.whispersystems.textsecuregcm.util.Constants; @@ -43,12 +42,12 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel { private final ApnFallbackTaskQueue taskQueue = new ApnFallbackTaskQueue(); - private final PushServiceClient pushServiceClient; - private final PubSubManager pubSubManager; + private final APNSender apnSender; + private final PubSubManager pubSubManager; - public ApnFallbackManager(PushServiceClient pushServiceClient, PubSubManager pubSubManager) { - this.pushServiceClient = pushServiceClient; - this.pubSubManager = pubSubManager; + public ApnFallbackManager(APNSender apnSender, PubSubManager pubSubManager) { + this.apnSender = apnSender; + this.pubSubManager = pubSubManager; } public void schedule(final WebsocketAddress address, ApnFallbackTask task) { @@ -102,7 +101,7 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel { pubSubManager.unsubscribe(new WebSocketConnectionInfo(taskEntry.getKey()), this); } - pushServiceClient.send(message); + apnSender.sendMessage(message); } catch (Throwable e) { logger.warn("ApnFallbackThread", e); } diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/ApnMessage.java b/src/main/java/org/whispersystems/textsecuregcm/push/ApnMessage.java similarity index 57% rename from src/main/java/org/whispersystems/textsecuregcm/entities/ApnMessage.java rename to src/main/java/org/whispersystems/textsecuregcm/push/ApnMessage.java index 60763d271..6110ea979 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/entities/ApnMessage.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/ApnMessage.java @@ -1,40 +1,15 @@ -package org.whispersystems.textsecuregcm.entities; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; -import org.hibernate.validator.constraints.NotEmpty; - -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; +package org.whispersystems.textsecuregcm.push; public class ApnMessage { public static long MAX_EXPIRATION = Integer.MAX_VALUE * 1000L; - @JsonProperty - @NotEmpty - private String apnId; - - @JsonProperty - @NotEmpty - private String number; - - @JsonProperty - @Min(1) - private int deviceId; - - @JsonProperty - @NotEmpty - private String message; - - @JsonProperty - @NotNull - private boolean voip; - - @JsonProperty - private long expirationTime; - - public ApnMessage() {} + private final String apnId; + private final String number; + private final int deviceId; + private final String message; + private final boolean voip; + private final long expirationTime; public ApnMessage(String apnId, String number, int deviceId, String message, boolean voip, long expirationTime) { this.apnId = apnId; @@ -54,23 +29,27 @@ public class ApnMessage { this.expirationTime = expirationTime; } - @VisibleForTesting public String getApnId() { return apnId; } - @VisibleForTesting public boolean isVoip() { return voip; } - @VisibleForTesting public String getMessage() { return message; } - @VisibleForTesting public long getExpirationTime() { return expirationTime; } + + public String getNumber() { + return number; + } + + public int getDeviceId() { + return deviceId; + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/FeedbackHandler.java b/src/main/java/org/whispersystems/textsecuregcm/push/FeedbackHandler.java deleted file mode 100644 index c77a7a551..000000000 --- a/src/main/java/org/whispersystems/textsecuregcm/push/FeedbackHandler.java +++ /dev/null @@ -1,120 +0,0 @@ -package org.whispersystems.textsecuregcm.push; - -import com.google.common.base.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.UnregisteredEvent; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.Device; - -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import io.dropwizard.lifecycle.Managed; - -public class FeedbackHandler implements Managed, Runnable { - - private final Logger logger = LoggerFactory.getLogger(PushServiceClient.class); - - private final PushServiceClient client; - private final AccountsManager accountsManager; - - private ScheduledExecutorService executor; - - public FeedbackHandler(PushServiceClient client, AccountsManager accountsManager) { - this.client = client; - this.accountsManager = accountsManager; - } - - @Override - public void start() throws Exception { - this.executor = Executors.newSingleThreadScheduledExecutor(); - this.executor.scheduleAtFixedRate(this, 0, 1, TimeUnit.MINUTES); - } - - @Override - public void stop() throws Exception { - if (this.executor != null) { - this.executor.shutdown(); - } - } - - @Override - public void run() { - logger.info("Checking Push Server feedback..."); - - try { - List gcmFeedback = client.getGcmFeedback(); - List apnFeedback = client.getApnFeedback(); - - logger.info("Got GCM feedback: " + gcmFeedback.size()); - logger.info("Got APN feedback: " + apnFeedback.size()); - - for (UnregisteredEvent gcmEvent : gcmFeedback) { - handleGcmUnregistered(gcmEvent); - } - - for (UnregisteredEvent apnEvent : apnFeedback) { - handleApnUnregistered(apnEvent); - } - } catch (Throwable t) { - logger.warn("Error retrieving feedback: ", t); - } - } - - private void handleGcmUnregistered(UnregisteredEvent event) { - logger.info("Got GCM Unregistered: " + event.getNumber() + "," + event.getDeviceId()); - - Optional account = accountsManager.get(event.getNumber()); - - if (account.isPresent()) { - Optional device = account.get().getDevice(event.getDeviceId()); - - if (device.isPresent()) { - if (event.getRegistrationId().equals(device.get().getGcmId())) { - logger.info("GCM Unregister GCM ID matches!"); - if (device.get().getPushTimestamp() == 0 || - event.getTimestamp() > (device.get().getPushTimestamp() + TimeUnit.SECONDS.toMillis(10))) - { - logger.info("GCM Unregister Timestamp matches!"); - - if (event.getCanonicalId() != null && !event.getCanonicalId().isEmpty()) { - logger.info("It's a canonical ID update..."); - device.get().setGcmId(event.getCanonicalId()); - } else { - device.get().setGcmId(null); - device.get().setFetchesMessages(false); - } - accountsManager.update(account.get()); - } - } - } - } - } - - private void handleApnUnregistered(UnregisteredEvent event) { - logger.info("Got APN Unregistered: " + event.getNumber() + "," + event.getDeviceId()); - - Optional account = accountsManager.get(event.getNumber()); - - if (account.isPresent()) { - Optional device = account.get().getDevice(event.getDeviceId()); - - if (device.isPresent()) { - if (event.getRegistrationId().equals(device.get().getApnId())) { - logger.info("APN Unregister APN ID matches!"); - if (device.get().getPushTimestamp() == 0 || - event.getTimestamp() > device.get().getPushTimestamp()) - { - logger.info("APN Unregister timestamp matches!"); - device.get().setApnId(null); - accountsManager.update(account.get()); - } - } - } - } - } -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java new file mode 100644 index 000000000..e930bc511 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java @@ -0,0 +1,178 @@ +package org.whispersystems.textsecuregcm.push; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.gcm.server.Message; +import org.whispersystems.gcm.server.Result; +import org.whispersystems.gcm.server.Sender; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.util.Constants; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static com.codahale.metrics.MetricRegistry.name; +import io.dropwizard.lifecycle.Managed; + +public class GCMSender implements Managed { + + private final Logger logger = LoggerFactory.getLogger(GCMSender.class); + + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private final Meter success = metricRegistry.meter(name(getClass(), "sent", "success")); + private final Meter failure = metricRegistry.meter(name(getClass(), "sent", "failure")); + private final Meter unregistered = metricRegistry.meter(name(getClass(), "sent", "unregistered")); + private final Meter canonical = metricRegistry.meter(name(getClass(), "sent", "canonical")); + + private final Map outboundMeters = new HashMap() {{ + put("receipt", metricRegistry.meter(name(getClass(), "outbound", "receipt"))); + put("notification", metricRegistry.meter(name(getClass(), "outbound", "notification"))); + }}; + + + private final AccountsManager accountsManager; + private final Sender signalSender; + private ExecutorService executor; + + public GCMSender(AccountsManager accountsManager, String signalKey) { + this.accountsManager = accountsManager; + this.signalSender = new Sender(signalKey, 50); + } + + @VisibleForTesting + public GCMSender(AccountsManager accountsManager, Sender sender, ExecutorService executor) { + this.accountsManager = accountsManager; + this.signalSender = sender; + this.executor = executor; + } + + public void sendMessage(GcmMessage message) { + Message.Builder builder = Message.newBuilder() + .withDestination(message.getGcmId()) + .withPriority("high"); + + String key = message.isReceipt() ? "receipt" : "notification"; + Message request = builder.withDataPart(key, "").build(); + + ListenableFuture future = signalSender.send(request, message); + markOutboundMeter(key); + + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Result result) { + if (result.isUnregistered() || result.isInvalidRegistrationId()) { + handleBadRegistration(result); + } else if (result.hasCanonicalRegistrationId()) { + handleCanonicalRegistrationId(result); + } else if (!result.isSuccess()) { + handleGenericError(result); + } else { + success.mark(); + } + } + + @Override + public void onFailure(Throwable throwable) { + logger.warn("GCM Failed: " + throwable); + } + }, executor); + } + + @Override + public void start() { + executor = Executors.newSingleThreadExecutor(); + } + + @Override + public void stop() throws IOException { + this.signalSender.stop(); + this.executor.shutdown(); + } + + private void handleBadRegistration(Result result) { + GcmMessage message = (GcmMessage)result.getContext(); + logger.warn("Got GCM unregistered notice! " + message.getGcmId()); + + Optional account = getAccountForEvent(message); + + if (account.isPresent()) { + Device device = account.get().getDevice(message.getDeviceId()).get(); + device.setGcmId(null); + device.setFetchesMessages(false); + + accountsManager.update(account.get()); + } + + unregistered.mark(); + } + + private void handleCanonicalRegistrationId(Result result) { + GcmMessage message = (GcmMessage)result.getContext(); + logger.warn(String.format("Actually received 'CanonicalRegistrationId' ::: (canonical=%s), (original=%s)", + result.getCanonicalRegistrationId(), message.getGcmId())); + + Optional account = getAccountForEvent(message); + + if (account.isPresent()) { + Device device = account.get().getDevice(message.getDeviceId()).get(); + device.setGcmId(result.getCanonicalRegistrationId()); + + accountsManager.update(account.get()); + } + + canonical.mark(); + } + + private void handleGenericError(Result result) { + GcmMessage message = (GcmMessage)result.getContext(); + logger.warn(String.format("Unrecoverable Error ::: (error=%s), (gcm_id=%s), " + + "(destination=%s), (device_id=%d)", + result.getError(), message.getGcmId(), message.getNumber(), + message.getDeviceId())); + failure.mark(); + } + + private Optional getAccountForEvent(GcmMessage message) { + Optional account = accountsManager.get(message.getNumber()); + + if (account.isPresent()) { + Optional device = account.get().getDevice(message.getDeviceId()); + + if (device.isPresent()) { + if (message.getGcmId().equals(device.get().getGcmId())) { + logger.info("GCM Unregister GCM ID matches!"); + + if (device.get().getPushTimestamp() == 0 || System.currentTimeMillis() > (device.get().getPushTimestamp() + TimeUnit.SECONDS.toMillis(10))) + { + logger.info("GCM Unregister Timestamp matches!"); + + return account; + } + } + } + } + + return Optional.absent(); + } + + private void markOutboundMeter(String key) { + Meter meter = outboundMeters.get(key); + + if (meter != null) meter.mark(); + else logger.warn("Unknown outbound key: " + key); + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/GcmMessage.java b/src/main/java/org/whispersystems/textsecuregcm/push/GcmMessage.java new file mode 100644 index 000000000..76cd97fce --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/push/GcmMessage.java @@ -0,0 +1,32 @@ +package org.whispersystems.textsecuregcm.push; + +public class GcmMessage { + + private final String gcmId; + private final String number; + private final int deviceId; + private final boolean receipt; + + public GcmMessage(String gcmId, String number, int deviceId, boolean receipt) { + this.gcmId = gcmId; + this.number = number; + this.deviceId = deviceId; + this.receipt = receipt; + } + + public String getGcmId() { + return gcmId; + } + + public String getNumber() { + return number; + } + + public boolean isReceipt() { + return receipt; + } + + public int getDeviceId() { + return deviceId; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index 991de7cbc..55ee86612 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (C) 2013 Open WhisperSystems * * This program is free software: you can redistribute it and/or modify @@ -20,8 +20,6 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.SharedMetricRegistries; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.ApnMessage; -import org.whispersystems.textsecuregcm.entities.GcmMessage; import org.whispersystems.textsecuregcm.push.ApnFallbackManager.ApnFallbackTask; import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus; import org.whispersystems.textsecuregcm.storage.Account; @@ -44,16 +42,19 @@ public class PushSender implements Managed { public static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"badge\":%d,\"alert\":{\"loc-key\":\"APN_Message\"}}}"; private final ApnFallbackManager apnFallbackManager; - private final PushServiceClient pushServiceClient; + private final GCMSender gcmSender; + private final APNSender apnSender; private final WebsocketSender webSocketSender; private final BlockingThreadPoolExecutor executor; private final int queueSize; - public PushSender(ApnFallbackManager apnFallbackManager, PushServiceClient pushServiceClient, + public PushSender(ApnFallbackManager apnFallbackManager, + GCMSender gcmSender, APNSender apnSender, WebsocketSender websocketSender, int queueSize) { this.apnFallbackManager = apnFallbackManager; - this.pushServiceClient = pushServiceClient; + this.gcmSender = gcmSender; + this.apnSender = apnSender; this.webSocketSender = websocketSender; this.queueSize = queueSize; this.executor = new BlockingThreadPoolExecutor(50, queueSize); @@ -115,14 +116,10 @@ public class PushSender implements Managed { } private void sendGcmNotification(Account account, Device device) { - try { - GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getNumber(), - (int)device.getId(), "", false, true); + GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getNumber(), + (int)device.getId(), false); - pushServiceClient.send(gcmMessage); - } catch (TransientPushFailureException e) { - logger.warn("SILENT PUSH LOSS", e); - } + gcmSender.sendMessage(gcmMessage); } private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean silent) { @@ -153,7 +150,7 @@ public class PushSender implements Managed { } try { - pushServiceClient.send(apnMessage); + apnSender.sendMessage(apnMessage); } catch (TransientPushFailureException e) { logger.warn("SILENT PUSH LOSS", e); } @@ -166,12 +163,16 @@ public class PushSender implements Managed { @Override public void start() throws Exception { - + apnSender.start(); + gcmSender.start(); } @Override public void stop() throws Exception { executor.shutdown(); executor.awaitTermination(5, TimeUnit.MINUTES); + + apnSender.stop(); + gcmSender.stop(); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushServiceClient.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushServiceClient.java deleted file mode 100644 index 7d7abe315..000000000 --- a/src/main/java/org/whispersystems/textsecuregcm/push/PushServiceClient.java +++ /dev/null @@ -1,94 +0,0 @@ -package org.whispersystems.textsecuregcm.push; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.configuration.PushConfiguration; -import org.whispersystems.textsecuregcm.entities.ApnMessage; -import org.whispersystems.textsecuregcm.entities.GcmMessage; -import org.whispersystems.textsecuregcm.entities.UnregisteredEvent; -import org.whispersystems.textsecuregcm.entities.UnregisteredEventList; -import org.whispersystems.textsecuregcm.util.Base64; - -import javax.ws.rs.ProcessingException; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.util.List; - -public class PushServiceClient { - - private static final String PUSH_GCM_PATH = "/api/v1/push/gcm"; - private static final String PUSH_APN_PATH = "/api/v1/push/apn"; - - private static final String APN_FEEDBACK_PATH = "/api/v1/feedback/apn"; - private static final String GCM_FEEDBACK_PATH = "/api/v1/feedback/gcm"; - - private final Logger logger = LoggerFactory.getLogger(PushServiceClient.class); - - private final Client client; - private final String host; - private final int port; - private final String authorization; - - public PushServiceClient(Client client, PushConfiguration config) { - this.client = client; - this.host = config.getHost(); - this.port = config.getPort(); - this.authorization = getAuthorizationHeader(config.getUsername(), config.getPassword()); - } - - public void send(GcmMessage message) throws TransientPushFailureException { - sendPush(PUSH_GCM_PATH, message); - } - - public void send(ApnMessage message) throws TransientPushFailureException { - sendPush(PUSH_APN_PATH, message); - } - - public List getGcmFeedback() throws IOException { - return getFeedback(GCM_FEEDBACK_PATH); - } - - public List getApnFeedback() throws IOException { - return getFeedback(APN_FEEDBACK_PATH); - } - - private void sendPush(String path, Object entity) throws TransientPushFailureException { - try { - Response response = client.target("http://" + host + ":" + port) - .path(path) - .request() - .header("Authorization", authorization) - .put(Entity.entity(entity, MediaType.APPLICATION_JSON_TYPE)); - - if (response.getStatus() != 204 && response.getStatus() != 200) { - logger.warn("PushServer response: " + response.getStatus() + " " + response.getStatusInfo().getReasonPhrase()); - throw new TransientPushFailureException("Bad response: " + response.getStatus()); - } - } catch (ProcessingException e) { - logger.warn("Push error: ", e); - throw new TransientPushFailureException(e); - } - } - - private List getFeedback(String path) throws IOException { - try { - UnregisteredEventList unregisteredEvents = client.target("http://" + host + ":" + port) - .path(path) - .request() - .header("Authorization", authorization) - .get(UnregisteredEventList.class); - - return unregisteredEvents.getDevices(); - } catch (ProcessingException e) { - logger.warn("Request error:", e); - throw new IOException(e); - } - } - - private String getAuthorizationHeader(String username, String password) { - return "Basic " + Base64.encodeBytes((username + ":" + password).getBytes()); - } -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/PushCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/PushCommand.java deleted file mode 100644 index 2f0cb1f89..000000000 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/PushCommand.java +++ /dev/null @@ -1,173 +0,0 @@ -package org.whispersystems.textsecuregcm.workers; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.google.common.base.Optional; -import net.sourceforge.argparse4j.inf.Namespace; -import net.sourceforge.argparse4j.inf.Subparser; -import org.glassfish.jersey.client.ClientProperties; -import org.skife.jdbi.v2.DBI; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.WhisperServerConfiguration; -import org.whispersystems.textsecuregcm.entities.ApnMessage; -import org.whispersystems.textsecuregcm.entities.GcmMessage; -import org.whispersystems.textsecuregcm.providers.RedisClientFactory; -import org.whispersystems.textsecuregcm.push.ApnFallbackManager; -import org.whispersystems.textsecuregcm.push.PushSender; -import org.whispersystems.textsecuregcm.push.PushServiceClient; -import org.whispersystems.textsecuregcm.push.TransientPushFailureException; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.Accounts; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.DirectoryManager; -import org.whispersystems.textsecuregcm.storage.Messages; -import org.whispersystems.textsecuregcm.util.Pair; -import org.whispersystems.textsecuregcm.util.Util; - -import javax.ws.rs.client.Client; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import io.dropwizard.Application; -import io.dropwizard.cli.EnvironmentCommand; -import io.dropwizard.client.JerseyClientBuilder; -import io.dropwizard.jdbi.DBIFactory; -import io.dropwizard.setup.Environment; -import redis.clients.jedis.JedisPool; - -public class PushCommand extends EnvironmentCommand { - - private final Logger logger = LoggerFactory.getLogger(DirectoryCommand.class); - - private static final int LIMIT = 1000; - - public PushCommand() { - super(new Application() { - @Override - public void run(WhisperServerConfiguration configuration, Environment environment) - throws Exception - { - - } - }, "push", "send pushes"); - } - - @Override - public void configure(Subparser subparser) { - super.configure(subparser); - subparser.addArgument("-t", "--time") - .dest("timestamp") - .type(Long.class) - .required(true) - .help("The starting timestamp to notify users from"); - - subparser.addArgument("-o", "--offset") - .dest("offset") - .type(Integer.class) - .required(true) - .help("The starting offset in the user query"); - } - - @Override - protected void run(Environment environment, Namespace namespace, - WhisperServerConfiguration configuration) - throws Exception - { - try { - long timestampStart = namespace.getLong("timestamp"); - int offset = namespace.getInt("offset"); - - environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - DBIFactory dbiFactory = new DBIFactory(); - DBI database = dbiFactory.build(environment, configuration.getDataSourceFactory(), "accountdb" ); - DBI messagedb = dbiFactory.build(environment, configuration.getMessageStoreConfiguration(), "messagedb"); - - Accounts accounts = database.onDemand(Accounts.class); - Messages messages = messagedb.onDemand(Messages.class); - - JedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl()).getRedisClientPool(); - JedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getUrl()).getRedisClientPool(); - DirectoryManager directory = new DirectoryManager(redisClient); - AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient); - - Client httpClient = initializeHttpClient(environment, configuration); - PushServiceClient pushServiceClient = new PushServiceClient(httpClient, configuration.getPushConfiguration()); - - while (true) { - List> pendingDestinations = messages.getPendingDestinations(timestampStart, offset, LIMIT); - - if (pendingDestinations == null || pendingDestinations.size() == 0) { - break; - } - - for (Pair pendingDestination : pendingDestinations) { - Optional account = accountsManager.get(pendingDestination.first()); - - if (account.isPresent()) { - Optional device = account.get().getDevice(pendingDestination.second()); - - if (device.isPresent()) { - if (device.get().getGcmId() != null) { - sendGcm(pushServiceClient, account.get(), device.get()); - } else if (device.get().getApnId() != null) { - sendApn(pushServiceClient, account.get(), device.get()); - } - } else { - logger.warn("No device found: " + pendingDestination.first() + ", " + pendingDestination.second()); - } - } else { - logger.warn("No account found: " + pendingDestination.first()); - } - } - - logger.warn("Processed " + LIMIT + "..."); - offset += LIMIT; - } - - logger.warn("Finished!"); - - } catch (Exception ex) { - logger.warn("Exception", ex); - } - } - - private void sendGcm(PushServiceClient pushServiceClient, Account account, Device device) { - try { - GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getNumber(), - (int)device.getId(), "", false, true); - - logger.warn("Sending GCM: " + account.getNumber()); - pushServiceClient.send(gcmMessage); - } catch (TransientPushFailureException e) { - logger.warn("Push failure", e); - } - } - - private void sendApn(PushServiceClient pushServiceClient, Account account, Device device) { - if (!Util.isEmpty(device.getVoipApnId())) { - try { - ApnMessage apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), (int)device.getId(), - String.format(PushSender.APN_PAYLOAD, 1), - true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ApnFallbackManager.FALLBACK_DURATION)); - - logger.warn("Sending APN: " + account.getNumber()); - pushServiceClient.send(apnMessage); - } catch (TransientPushFailureException e) { - logger.warn("SILENT PUSH LOSS", e); - } - } - } - - private Client initializeHttpClient(Environment environment, WhisperServerConfiguration config) { - Client httpClient = new JerseyClientBuilder(environment).using(config.getJerseyClientConfiguration()) - .build(getName()); - - httpClient.property(ClientProperties.CONNECT_TIMEOUT, 1000); - httpClient.property(ClientProperties.READ_TIMEOUT, 1000); - - return httpClient; - } - -} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/push/APNSenderTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/push/APNSenderTest.java new file mode 100644 index 000000000..47ec9731a --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/push/APNSenderTest.java @@ -0,0 +1,87 @@ +package org.whispersystems.textsecuregcm.tests.push; + +import com.google.common.base.Optional; +import com.notnoop.apns.ApnsService; +import org.junit.Before; +import org.junit.Test; +import org.whispersystems.textsecuregcm.push.APNSender; +import org.whispersystems.textsecuregcm.push.ApnMessage; +import org.whispersystems.textsecuregcm.push.TransientPushFailureException; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; + +import java.util.Date; +import java.util.HashMap; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; + +public class APNSenderTest { + + private static final String DESTINATION_NUMBER = "+14151231234"; + private static final String DESTINATION_APN_ID = "foo"; + + private final AccountsManager accountsManager = mock(AccountsManager.class); + private final JedisPool jedisPool = mock(JedisPool.class); + private final Jedis jedis = mock(Jedis.class); + private final ApnsService voipService = mock(ApnsService.class); + private final ApnsService apnsService = mock(ApnsService.class); + + private final Account destinationAccount = mock(Account.class); + private final Device destinationDevice = mock(Device.class ); + + @Before + public void setup() { + when(destinationAccount.getDevice(1)).thenReturn(Optional.of(destinationDevice)); + when(destinationDevice.getApnId()).thenReturn(DESTINATION_APN_ID); + when(accountsManager.get(DESTINATION_NUMBER)).thenReturn(Optional.of(destinationAccount)); + + when(jedisPool.getResource()).thenReturn(jedis); + when(jedis.get("APN-" + DESTINATION_APN_ID)).thenReturn(DESTINATION_NUMBER + "." + 1); + + when(voipService.getInactiveDevices()).thenReturn(new HashMap() {{ + put(DESTINATION_APN_ID, new Date(System.currentTimeMillis())); + }}); + when(apnsService.getInactiveDevices()).thenReturn(new HashMap()); + } + + @Test + public void testSendVoip() throws TransientPushFailureException { + APNSender apnSender = new APNSender(accountsManager, jedisPool, apnsService, voipService, false, false); + + ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", true, 30); + apnSender.sendMessage(message); + + + verify(jedis, times(1)).set(eq("APN-" + DESTINATION_APN_ID.toLowerCase()), eq(DESTINATION_NUMBER + "." + 1)); + verify(voipService, times(1)).push(eq(DESTINATION_APN_ID), eq(message.getMessage()), eq(new Date(30))); + verifyNoMoreInteractions(apnsService); + } + + @Test + public void testSendApns() throws TransientPushFailureException { + APNSender apnSender = new APNSender(accountsManager, jedisPool, apnsService, voipService, false, false); + + ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", false, 30); + apnSender.sendMessage(message); + + verify(jedis, times(1)).set(eq("APN-" + DESTINATION_APN_ID.toLowerCase()), eq(DESTINATION_NUMBER + "." + 1)); + verify(apnsService, times(1)).push(eq(DESTINATION_APN_ID), eq(message.getMessage()), eq(new Date(30))); + verifyNoMoreInteractions(voipService); + } + + @Test + public void testFeedbackUnregistered() { + APNSender apnSender = new APNSender(accountsManager, jedisPool, apnsService, voipService, false, false); + apnSender.checkFeedback(); + + verify(jedis, times(1)).get(eq("APN-" +DESTINATION_APN_ID)); + verify(accountsManager, times(1)).get(eq(DESTINATION_NUMBER)); + verify(destinationDevice, times(1)).setApnId(eq((String)null)); + verify(accountsManager, times(1)).update(eq(destinationAccount)); + } + +} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/push/ApnFallbackManagerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/push/ApnFallbackManagerTest.java index 3790c33f7..01b2ce89b 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/push/ApnFallbackManagerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/push/ApnFallbackManagerTest.java @@ -2,10 +2,10 @@ package org.whispersystems.textsecuregcm.tests.push; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.whispersystems.textsecuregcm.entities.ApnMessage; +import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ApnFallbackManager.ApnFallbackTask; -import org.whispersystems.textsecuregcm.push.PushServiceClient; +import org.whispersystems.textsecuregcm.push.ApnMessage; import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.storage.PubSubProtos; import org.whispersystems.textsecuregcm.util.Util; @@ -14,23 +14,21 @@ import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; public class ApnFallbackManagerTest { @Test public void testFullFallback() throws Exception { - PushServiceClient pushServiceClient = mock(PushServiceClient.class); - PubSubManager pubSubManager = mock(PubSubManager.class); - WebsocketAddress address = new WebsocketAddress("+14152222223", 1L); - WebSocketConnectionInfo info = new WebSocketConnectionInfo(address); - ApnMessage message = new ApnMessage("bar", "123", 1, "hmm", true, 1111); - ApnFallbackTask task = new ApnFallbackTask("foo", "voipfoo", message, 500, 0); + APNSender apnSender = mock(APNSender.class ); + PubSubManager pubSubManager = mock(PubSubManager.class); + WebsocketAddress address = new WebsocketAddress("+14152222223", 1L); + WebSocketConnectionInfo info = new WebSocketConnectionInfo(address); + ApnMessage message = new ApnMessage("bar", "123", 1, "hmm", true, 1111); + ApnFallbackTask task = new ApnFallbackTask("foo", "voipfoo", message, 500, 0); - ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushServiceClient, pubSubManager); + ApnFallbackManager apnFallbackManager = new ApnFallbackManager(apnSender, pubSubManager); apnFallbackManager.start(); apnFallbackManager.schedule(address, task); @@ -38,7 +36,7 @@ public class ApnFallbackManagerTest { Util.sleep(1100); ArgumentCaptor captor = ArgumentCaptor.forClass(ApnMessage.class); - verify(pushServiceClient, times(2)).send(captor.capture()); + verify(apnSender, times(2)).sendMessage(captor.capture()); verify(pubSubManager).unsubscribe(eq(info), eq(apnFallbackManager)); List arguments = captor.getAllValues(); @@ -56,7 +54,7 @@ public class ApnFallbackManagerTest { @Test public void testNoFallback() throws Exception { - PushServiceClient pushServiceClient = mock(PushServiceClient.class); + APNSender pushServiceClient = mock(APNSender.class); PubSubManager pubSubManager = mock(PubSubManager.class); WebsocketAddress address = new WebsocketAddress("+14152222222", 1); WebSocketConnectionInfo info = new WebSocketConnectionInfo(address); diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/push/GCMSenderTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/push/GCMSenderTest.java new file mode 100644 index 000000000..1eef8cbda --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/push/GCMSenderTest.java @@ -0,0 +1,128 @@ +package org.whispersystems.textsecuregcm.tests.push; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.SettableFuture; +import org.junit.Test; +import org.mockito.Matchers; +import org.whispersystems.gcm.server.Message; +import org.whispersystems.gcm.server.Result; +import org.whispersystems.gcm.server.Sender; +import org.whispersystems.textsecuregcm.push.GCMSender; +import org.whispersystems.textsecuregcm.push.GcmMessage; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.tests.util.SynchronousExecutorService; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class GCMSenderTest { + + @Test + public void testSendMessage() { + AccountsManager accountsManager = mock(AccountsManager.class); + Sender sender = mock(Sender.class ); + Result successResult = mock(Result.class ); + SynchronousExecutorService executorService = new SynchronousExecutorService(); + + when(successResult.isInvalidRegistrationId()).thenReturn(false); + when(successResult.isUnregistered()).thenReturn(false); + when(successResult.hasCanonicalRegistrationId()).thenReturn(false); + when(successResult.isSuccess()).thenReturn(true); + + GcmMessage message = new GcmMessage("foo", "+12223334444", 1, false); + GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService); + + SettableFuture successFuture = SettableFuture.create(); + successFuture.set(successResult); + + when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(successFuture); + when(successResult.getContext()).thenReturn(message); + + gcmSender.sendMessage(message); + + verify(sender, times(1)).send(any(Message.class), eq(message)); + } + + @Test + public void testSendError() { + String destinationNumber = "+12223334444"; + String gcmId = "foo"; + + AccountsManager accountsManager = mock(AccountsManager.class); + Sender sender = mock(Sender.class ); + Result invalidResult = mock(Result.class ); + SynchronousExecutorService executorService = new SynchronousExecutorService(); + + Account destinationAccount = mock(Account.class); + Device destinationDevice = mock(Device.class ); + + when(destinationAccount.getDevice(1)).thenReturn(Optional.of(destinationDevice)); + when(accountsManager.get(destinationNumber)).thenReturn(Optional.of(destinationAccount)); + when(destinationDevice.getGcmId()).thenReturn(gcmId); + + when(invalidResult.isInvalidRegistrationId()).thenReturn(true); + when(invalidResult.isUnregistered()).thenReturn(false); + when(invalidResult.hasCanonicalRegistrationId()).thenReturn(false); + when(invalidResult.isSuccess()).thenReturn(true); + + GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false); + GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService); + + SettableFuture invalidFuture = SettableFuture.create(); + invalidFuture.set(invalidResult); + + when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(invalidFuture); + when(invalidResult.getContext()).thenReturn(message); + + gcmSender.sendMessage(message); + + verify(sender, times(1)).send(any(Message.class), eq(message)); + verify(accountsManager, times(1)).get(eq(destinationNumber)); + verify(accountsManager, times(1)).update(eq(destinationAccount)); + verify(destinationDevice, times(1)).setGcmId(eq((String)null)); + } + + @Test + public void testCanonicalId() { + String destinationNumber = "+12223334444"; + String gcmId = "foo"; + String canonicalId = "bar"; + + AccountsManager accountsManager = mock(AccountsManager.class); + Sender sender = mock(Sender.class ); + Result canonicalResult = mock(Result.class ); + SynchronousExecutorService executorService = new SynchronousExecutorService(); + + Account destinationAccount = mock(Account.class); + Device destinationDevice = mock(Device.class ); + + when(destinationAccount.getDevice(1)).thenReturn(Optional.of(destinationDevice)); + when(accountsManager.get(destinationNumber)).thenReturn(Optional.of(destinationAccount)); + when(destinationDevice.getGcmId()).thenReturn(gcmId); + + when(canonicalResult.isInvalidRegistrationId()).thenReturn(false); + when(canonicalResult.isUnregistered()).thenReturn(false); + when(canonicalResult.hasCanonicalRegistrationId()).thenReturn(true); + when(canonicalResult.isSuccess()).thenReturn(false); + when(canonicalResult.getCanonicalRegistrationId()).thenReturn(canonicalId); + + GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false); + GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService); + + SettableFuture invalidFuture = SettableFuture.create(); + invalidFuture.set(canonicalResult); + + when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(invalidFuture); + when(canonicalResult.getContext()).thenReturn(message); + + gcmSender.sendMessage(message); + + verify(sender, times(1)).send(any(Message.class), eq(message)); + verify(accountsManager, times(1)).get(eq(destinationNumber)); + verify(accountsManager, times(1)).update(eq(destinationAccount)); + verify(destinationDevice, times(1)).setGcmId(eq(canonicalId)); + } + +} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/util/SynchronousExecutorService.java b/src/test/java/org/whispersystems/textsecuregcm/tests/util/SynchronousExecutorService.java new file mode 100644 index 000000000..57388d745 --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/util/SynchronousExecutorService.java @@ -0,0 +1,111 @@ +package org.whispersystems.textsecuregcm.tests.util; + +import com.google.common.util.concurrent.SettableFuture; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SynchronousExecutorService implements ExecutorService { + + private boolean shutdown = false; + + @Override + public void shutdown() { + shutdown = true; + } + + @Override + public List shutdownNow() { + shutdown = true; + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return shutdown; + } + + @Override + public boolean isTerminated() { + return shutdown; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return true; + } + + @Override + public Future submit(Callable task) { + SettableFuture future = null; + try { + future = SettableFuture.create(); + future.set(task.call()); + } catch (Throwable e) { + future.setException(e); + } + + return future; + } + + @Override + public Future submit(Runnable task, T result) { + SettableFuture future = SettableFuture.create(); + task.run(); + + future.set(result); + + return future; + } + + @Override + public Future submit(Runnable task) { + SettableFuture future = SettableFuture.create(); + task.run(); + future.set(null); + return future; + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + List> results = new LinkedList<>(); + for (Callable callable : tasks) { + SettableFuture future = SettableFuture.create(); + try { + future.set(callable.call()); + } catch (Throwable e) { + future.setException(e); + } + results.add(future); + } + return results; + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return invokeAll(tasks); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return null; + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return null; + } + + @Override + public void execute(Runnable command) { + command.run(); + } +}