parent
e10baa915d
commit
716150cfd2
6
pom.xml
6
pom.xml
|
@ -105,9 +105,9 @@
|
|||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.notnoop.apns</groupId>
|
||||
<artifactId>apns</artifactId>
|
||||
<version>0.2.3</version>
|
||||
<groupId>com.relayrides</groupId>
|
||||
<artifactId>pushy</artifactId>
|
||||
<version>0.9.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.whispersystems</groupId>
|
||||
|
|
|
@ -172,7 +172,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(messagesManager);
|
||||
DispatchManager dispatchManager = new DispatchManager(cacheClientFactory, Optional.<DispatchChannel>of(deadLetterHandler));
|
||||
PubSubManager pubSubManager = new PubSubManager(cacheClient, dispatchManager);
|
||||
APNSender apnSender = new APNSender(accountsManager, cacheClient, config.getApnConfiguration());
|
||||
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
|
||||
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
|
||||
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager);
|
||||
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager );
|
||||
|
|
|
@ -32,14 +32,7 @@ public class ApnConfiguration {
|
|||
|
||||
@NotEmpty
|
||||
@JsonProperty
|
||||
private String voipCertificate;
|
||||
|
||||
@NotEmpty
|
||||
@JsonProperty
|
||||
private String voipKey;
|
||||
|
||||
@JsonProperty
|
||||
private boolean feedback = true;
|
||||
private String bundleId;
|
||||
|
||||
@JsonProperty
|
||||
private boolean sandbox = false;
|
||||
|
@ -52,16 +45,8 @@ public class ApnConfiguration {
|
|||
return pushKey;
|
||||
}
|
||||
|
||||
public String getVoipCertificate() {
|
||||
return voipCertificate;
|
||||
}
|
||||
|
||||
public String getVoipKey() {
|
||||
return voipKey;
|
||||
}
|
||||
|
||||
public boolean isFeedbackEnabled() {
|
||||
return feedback;
|
||||
public String getBundleId() {
|
||||
return bundleId;
|
||||
}
|
||||
|
||||
public boolean isSandboxEnabled() {
|
||||
|
|
|
@ -18,234 +18,131 @@ package org.whispersystems.textsecuregcm.push;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Optional;
|
||||
import com.notnoop.apns.APNS;
|
||||
import com.notnoop.apns.ApnsService;
|
||||
import com.notnoop.apns.ApnsServiceBuilder;
|
||||
import com.notnoop.exceptions.NetworkIOException;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.relayrides.pushy.apns.ApnsClient;
|
||||
import com.relayrides.pushy.apns.ApnsClientBuilder;
|
||||
import org.bouncycastle.openssl.PEMReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.ApnConfiguration;
|
||||
import org.whispersystems.textsecuregcm.push.RetryingApnsClient.ApnResult;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.security.KeyPair;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.cert.Certificate;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.PrivateKey;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
|
||||
public class APNSender implements Managed {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(APNSender.class);
|
||||
|
||||
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ExecutorService executor;
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final JedisPool jedisPool;
|
||||
private final AccountsManager accountsManager;
|
||||
private final String bundleId;
|
||||
private final boolean sandbox;
|
||||
private final RetryingApnsClient apnsClient;
|
||||
|
||||
private final String pushCertificate;
|
||||
private final String pushKey;
|
||||
|
||||
private final String voipCertificate;
|
||||
private final String voipKey;
|
||||
|
||||
private final boolean feedbackEnabled;
|
||||
private final boolean sandbox;
|
||||
|
||||
private ApnsService pushApnService;
|
||||
private ApnsService voipApnService;
|
||||
|
||||
public APNSender(AccountsManager accountsManager,
|
||||
JedisPool jedisPool,
|
||||
ApnConfiguration configuration)
|
||||
public APNSender(AccountsManager accountsManager, ApnConfiguration configuration)
|
||||
throws IOException
|
||||
{
|
||||
this.accountsManager = accountsManager;
|
||||
this.jedisPool = jedisPool;
|
||||
this.pushCertificate = configuration.getPushCertificate();
|
||||
this.pushKey = configuration.getPushKey();
|
||||
this.voipCertificate = configuration.getVoipCertificate();
|
||||
this.voipKey = configuration.getVoipKey();
|
||||
this.feedbackEnabled = configuration.isFeedbackEnabled();
|
||||
this.bundleId = configuration.getBundleId();
|
||||
this.sandbox = configuration.isSandboxEnabled();
|
||||
this.apnsClient = new RetryingApnsClient(configuration.getPushCertificate(),
|
||||
configuration.getPushKey(),
|
||||
10);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public APNSender(AccountsManager accountsManager, JedisPool jedisPool,
|
||||
ApnsService pushApnService, ApnsService voipApnService,
|
||||
boolean feedbackEnabled, boolean sandbox)
|
||||
{
|
||||
public APNSender(ExecutorService executor, AccountsManager accountsManager, RetryingApnsClient apnsClient, String bundleId, boolean sandbox) {
|
||||
this.executor = executor;
|
||||
this.accountsManager = accountsManager;
|
||||
this.jedisPool = jedisPool;
|
||||
this.pushApnService = pushApnService;
|
||||
this.voipApnService = voipApnService;
|
||||
this.feedbackEnabled = feedbackEnabled;
|
||||
this.apnsClient = apnsClient;
|
||||
this.sandbox = sandbox;
|
||||
this.pushCertificate = null;
|
||||
this.pushKey = null;
|
||||
this.voipCertificate = null;
|
||||
this.voipKey = null;
|
||||
this.bundleId = bundleId;
|
||||
}
|
||||
|
||||
public void sendMessage(ApnMessage message)
|
||||
public ListenableFuture<ApnResult> sendMessage(final ApnMessage message)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
try {
|
||||
redisSet(message.getApnId(), message.getNumber(), message.getDeviceId());
|
||||
String topic = bundleId;
|
||||
|
||||
if (message.isVoip()) {
|
||||
voipApnService.push(message.getApnId(), message.getMessage(), new Date(message.getExpirationTime()));
|
||||
} else {
|
||||
pushApnService.push(message.getApnId(), message.getMessage(), new Date(message.getExpirationTime()));
|
||||
}
|
||||
} catch (NetworkIOException nioe) {
|
||||
logger.warn("Network Error", nioe);
|
||||
throw new TransientPushFailureException(nioe);
|
||||
if (message.isVoip()) {
|
||||
topic = topic + ".voip";
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] initializeKeyStore(String pemCertificate, String pemKey)
|
||||
throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException
|
||||
{
|
||||
PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemCertificate.getBytes())));
|
||||
X509Certificate certificate = (X509Certificate) reader.readObject();
|
||||
Certificate[] certificateChain = {certificate};
|
||||
|
||||
reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemKey.getBytes())));
|
||||
KeyPair keyPair = (KeyPair) reader.readObject();
|
||||
ListenableFuture<ApnResult> future = apnsClient.send(message.getApnId(), topic,
|
||||
message.getMessage(),
|
||||
new Date(message.getExpirationTime()));
|
||||
|
||||
KeyStore keyStore = KeyStore.getInstance("pkcs12");
|
||||
keyStore.load(null);
|
||||
keyStore.setEntry("apn",
|
||||
new KeyStore.PrivateKeyEntry(keyPair.getPrivate(), certificateChain),
|
||||
new KeyStore.PasswordProtection("insecure".toCharArray()));
|
||||
Futures.addCallback(future, new FutureCallback<ApnResult>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable ApnResult result) {
|
||||
if (result == null) {
|
||||
logger.warn("*** RECEIVED NULL APN RESULT ***");
|
||||
} else if (result.getStatus() == ApnResult.Status.NO_SUCH_USER) {
|
||||
handleUnregisteredUser(message.getApnId(), message.getNumber(), message.getDeviceId());
|
||||
} else if (result.getStatus() == ApnResult.Status.GENERIC_FAILURE) {
|
||||
logger.warn("*** Got APN generic failure: " + result.getReason());
|
||||
}
|
||||
}
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
keyStore.store(baos, "insecure".toCharArray());
|
||||
@Override
|
||||
public void onFailure(@Nullable Throwable t) {
|
||||
logger.warn("Got fatal APNS exception", t);
|
||||
}
|
||||
}, executor);
|
||||
|
||||
return baos.toByteArray();
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
byte[] pushKeyStore = initializeKeyStore(pushCertificate, pushKey);
|
||||
byte[] voipKeyStore = initializeKeyStore(voipCertificate, voipKey);
|
||||
|
||||
ApnsServiceBuilder pushApnServiceBuilder = APNS.newService()
|
||||
.withCert(new ByteArrayInputStream(pushKeyStore), "insecure")
|
||||
.asQueued();
|
||||
|
||||
|
||||
ApnsServiceBuilder voipApnServiceBuilder = APNS.newService()
|
||||
.withCert(new ByteArrayInputStream(voipKeyStore), "insecure")
|
||||
.asQueued();
|
||||
|
||||
|
||||
if (sandbox) {
|
||||
this.pushApnService = pushApnServiceBuilder.withSandboxDestination().build();
|
||||
this.voipApnService = voipApnServiceBuilder.withSandboxDestination().build();
|
||||
} else {
|
||||
this.pushApnService = pushApnServiceBuilder.withProductionDestination().build();
|
||||
this.voipApnService = voipApnServiceBuilder.withProductionDestination().build();
|
||||
}
|
||||
|
||||
if (feedbackEnabled) {
|
||||
this.executor.scheduleAtFixedRate(new FeedbackRunnable(), 0, 1, TimeUnit.HOURS);
|
||||
}
|
||||
this.executor = Executors.newSingleThreadExecutor();
|
||||
this.apnsClient.connect(sandbox);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
pushApnService.stop();
|
||||
voipApnService.stop();
|
||||
this.executor.shutdown();
|
||||
this.apnsClient.disconnect();
|
||||
}
|
||||
|
||||
private void redisSet(String registrationId, String number, int deviceId) {
|
||||
try (Jedis jedis = jedisPool.getResource()) {
|
||||
jedis.set("APN-" + registrationId.toLowerCase(), number + "." + deviceId);
|
||||
jedis.expire("APN-" + registrationId.toLowerCase(), (int) TimeUnit.HOURS.toSeconds(1));
|
||||
}
|
||||
}
|
||||
private void handleUnregisteredUser(String registrationId, String number, int deviceId) {
|
||||
logger.info("Got APN Unregistered: " + number + "," + deviceId);
|
||||
|
||||
private Optional<String> redisGet(String registrationId) {
|
||||
try (Jedis jedis = jedisPool.getResource()) {
|
||||
String number = jedis.get("APN-" + registrationId.toLowerCase());
|
||||
return Optional.fromNullable(number);
|
||||
}
|
||||
}
|
||||
Optional<Account> account = accountsManager.get(number);
|
||||
|
||||
@VisibleForTesting
|
||||
public void checkFeedback() {
|
||||
new FeedbackRunnable().run();
|
||||
}
|
||||
if (account.isPresent()) {
|
||||
Optional<Device> device = account.get().getDevice(deviceId);
|
||||
|
||||
private class FeedbackRunnable implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Map<String, Date> inactiveDevices = pushApnService.getInactiveDevices();
|
||||
inactiveDevices.putAll(voipApnService.getInactiveDevices());
|
||||
|
||||
for (String registrationId : inactiveDevices.keySet()) {
|
||||
Optional<String> device = redisGet(registrationId);
|
||||
|
||||
if (device.isPresent()) {
|
||||
logger.warn("Got APN unregistered notice!");
|
||||
String[] parts = device.get().split("\\.", 2);
|
||||
|
||||
if (parts.length == 2) {
|
||||
String number = parts[0];
|
||||
int deviceId = Integer.parseInt(parts[1]);
|
||||
long timestamp = inactiveDevices.get(registrationId).getTime();
|
||||
|
||||
handleApnUnregistered(registrationId, number, deviceId, timestamp);
|
||||
} else {
|
||||
logger.warn("APN unregister event for device with no parts: " + device.get());
|
||||
}
|
||||
} else {
|
||||
logger.warn("APN unregister event received for uncached ID: " + registrationId);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Exception during feedback", t);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleApnUnregistered(String registrationId, String number, int deviceId, long timestamp) {
|
||||
logger.info("Got APN Unregistered: " + number + "," + deviceId);
|
||||
|
||||
Optional<Account> account = accountsManager.get(number);
|
||||
|
||||
if (account.isPresent()) {
|
||||
Optional<Device> device = account.get().getDevice(deviceId);
|
||||
|
||||
if (device.isPresent()) {
|
||||
if (registrationId.equals(device.get().getApnId())) {
|
||||
logger.info("APN Unregister APN ID matches!");
|
||||
if (device.get().getPushTimestamp() == 0 ||
|
||||
timestamp > device.get().getPushTimestamp())
|
||||
{
|
||||
logger.info("APN Unregister timestamp matches!");
|
||||
device.get().setApnId(null);
|
||||
accountsManager.update(account.get());
|
||||
}
|
||||
if (device.isPresent()) {
|
||||
if (registrationId.equals(device.get().getApnId())) {
|
||||
logger.info("APN Unregister APN ID matches!");
|
||||
if (device.get().getPushTimestamp() == 0 ||
|
||||
System.currentTimeMillis() > device.get().getPushTimestamp() + TimeUnit.SECONDS.toMillis(10))
|
||||
{
|
||||
logger.info("APN Unregister timestamp matches!");
|
||||
device.get().setApnId(null);
|
||||
device.get().setFetchesMessages(false);
|
||||
accountsManager.update(account.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,35 +8,35 @@ public class ApnMessage {
|
|||
private final String number;
|
||||
private final int deviceId;
|
||||
private final String message;
|
||||
private final boolean voip;
|
||||
private final boolean isVoip;
|
||||
private final long expirationTime;
|
||||
|
||||
public ApnMessage(String apnId, String number, int deviceId, String message, boolean voip, long expirationTime) {
|
||||
public ApnMessage(String apnId, String number, int deviceId, String message, boolean isVoip, long expirationTime) {
|
||||
this.apnId = apnId;
|
||||
this.number = number;
|
||||
this.deviceId = deviceId;
|
||||
this.message = message;
|
||||
this.voip = voip;
|
||||
this.isVoip = isVoip;
|
||||
this.expirationTime = expirationTime;
|
||||
}
|
||||
|
||||
public ApnMessage(ApnMessage copy, String apnId, boolean voip, long expirationTime) {
|
||||
public ApnMessage(ApnMessage copy, String apnId, boolean isVoip, long expirationTime) {
|
||||
this.apnId = apnId;
|
||||
this.number = copy.number;
|
||||
this.deviceId = copy.deviceId;
|
||||
this.message = copy.message;
|
||||
this.voip = voip;
|
||||
this.isVoip = isVoip;
|
||||
this.expirationTime = expirationTime;
|
||||
}
|
||||
|
||||
public boolean isVoip() {
|
||||
return isVoip;
|
||||
}
|
||||
|
||||
public String getApnId() {
|
||||
return apnId;
|
||||
}
|
||||
|
||||
public boolean isVoip() {
|
||||
return voip;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
|
|
@ -136,8 +136,8 @@ public class PushSender implements Managed {
|
|||
|
||||
if (!Util.isEmpty(device.getVoipApnId())) {
|
||||
apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), (int)device.getId(),
|
||||
String.format(APN_PAYLOAD, messageQueueDepth),
|
||||
true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ApnFallbackManager.FALLBACK_DURATION));
|
||||
String.format(APN_PAYLOAD, messageQueueDepth), true,
|
||||
System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ApnFallbackManager.FALLBACK_DURATION));
|
||||
|
||||
if (fallback) {
|
||||
apnFallbackManager.schedule(new WebsocketAddress(account.getNumber(), device.getId()),
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
|
||||
import com.nurkiewicz.asyncretry.RetryContext;
|
||||
import com.nurkiewicz.asyncretry.RetryExecutor;
|
||||
import com.nurkiewicz.asyncretry.function.RetryCallable;
|
||||
import com.relayrides.pushy.apns.ApnsClient;
|
||||
import com.relayrides.pushy.apns.ApnsClientBuilder;
|
||||
import com.relayrides.pushy.apns.ApnsServerException;
|
||||
import com.relayrides.pushy.apns.ClientNotConnectedException;
|
||||
import com.relayrides.pushy.apns.DeliveryPriority;
|
||||
import com.relayrides.pushy.apns.PushNotificationResponse;
|
||||
import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
|
||||
|
||||
import org.bouncycastle.openssl.PEMReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.security.KeyPair;
|
||||
import java.security.PrivateKey;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
|
||||
public class RetryingApnsClient {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RetryingApnsClient.class);
|
||||
|
||||
private final ApnsClient apnsClient;
|
||||
private final RetryExecutor retryExecutor;
|
||||
|
||||
RetryingApnsClient(String apnCertificate, String apnKey, int retryCount)
|
||||
throws IOException
|
||||
{
|
||||
this(new ApnsClientBuilder().setClientCredentials(initializeCertificate(apnCertificate),
|
||||
initializePrivateKey(apnKey), null)
|
||||
.build(),
|
||||
retryCount);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public RetryingApnsClient(ApnsClient apnsClient, int retryCount) {
|
||||
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
this.apnsClient = apnsClient;
|
||||
this.retryExecutor = new AsyncRetryExecutor(executorService).retryOn(ClientNotConnectedException.class)
|
||||
.retryOn(InterruptedException.class)
|
||||
.retryOn(ApnsServerException.class)
|
||||
.withExponentialBackoff(100, 2.0)
|
||||
.withUniformJitter()
|
||||
.withMaxDelay(4000)
|
||||
.withMaxRetries(retryCount);
|
||||
}
|
||||
|
||||
ListenableFuture<ApnResult> send(final String apnId, final String topic, final String payload, final Date expiration) {
|
||||
return this.retryExecutor.getFutureWithRetry(new RetryCallable<ListenableFuture<ApnResult>>() {
|
||||
@Override
|
||||
public ListenableFuture<ApnResult> call(RetryContext context) throws Exception {
|
||||
SettableFuture<ApnResult> result = SettableFuture.create();
|
||||
SimpleApnsPushNotification notification = new SimpleApnsPushNotification(apnId, topic, payload, expiration, DeliveryPriority.IMMEDIATE);
|
||||
|
||||
apnsClient.sendNotification(notification).addListener(new ResponseHandler(apnsClient, result));
|
||||
|
||||
return result;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void connect(boolean sandbox) {
|
||||
apnsClient.connect(sandbox ? ApnsClient.DEVELOPMENT_APNS_HOST : ApnsClient.PRODUCTION_APNS_HOST).awaitUninterruptibly();
|
||||
}
|
||||
|
||||
void disconnect() {
|
||||
apnsClient.disconnect();
|
||||
}
|
||||
|
||||
private static X509Certificate initializeCertificate(String pemCertificate) throws IOException {
|
||||
PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemCertificate.getBytes())));
|
||||
return (X509Certificate) reader.readObject();
|
||||
}
|
||||
|
||||
private static PrivateKey initializePrivateKey(String pemKey) throws IOException {
|
||||
PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemKey.getBytes())));
|
||||
return ((KeyPair) reader.readObject()).getPrivate();
|
||||
}
|
||||
|
||||
private static final class ResponseHandler implements GenericFutureListener<io.netty.util.concurrent.Future<PushNotificationResponse<SimpleApnsPushNotification>>> {
|
||||
|
||||
private final ApnsClient client;
|
||||
private final SettableFuture<ApnResult> future;
|
||||
|
||||
private ResponseHandler(ApnsClient client, SettableFuture<ApnResult> future) {
|
||||
this.client = client;
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(io.netty.util.concurrent.Future<PushNotificationResponse<SimpleApnsPushNotification>> result) {
|
||||
try {
|
||||
PushNotificationResponse<SimpleApnsPushNotification> response = result.get();
|
||||
|
||||
if (response.isAccepted()) {
|
||||
future.set(new ApnResult(ApnResult.Status.SUCCESS, null));
|
||||
} else if ("Unregistered".equals(response.getRejectionReason())) {
|
||||
future.set(new ApnResult(ApnResult.Status.NO_SUCH_USER, response.getRejectionReason()));
|
||||
} else {
|
||||
logger.warn("Got APN failure: " + response.getRejectionReason());
|
||||
future.set(new ApnResult(ApnResult.Status.GENERIC_FAILURE, response.getRejectionReason()));
|
||||
}
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
future.setException(e);
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof ClientNotConnectedException) setDisconnected(e.getCause());
|
||||
else future.setException(e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
private void setDisconnected(final Throwable t) {
|
||||
logger.warn("Client disconnected, waiting for reconnect...", t);
|
||||
client.getReconnectionFuture().addListener(new GenericFutureListener<Future<Void>>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Void> complete) {
|
||||
logger.warn("Client reconnected...");
|
||||
future.setException(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public static class ApnResult {
|
||||
public enum Status {
|
||||
SUCCESS, NO_SUCH_USER, GENERIC_FAILURE
|
||||
}
|
||||
|
||||
private final Status status;
|
||||
private final String reason;
|
||||
|
||||
ApnResult(Status status, String reason) {
|
||||
this.status = status;
|
||||
this.reason = reason;
|
||||
}
|
||||
|
||||
public Status getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public String getReason() {
|
||||
return reason;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,23 +1,31 @@
|
|||
package org.whispersystems.textsecuregcm.tests.push;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.notnoop.apns.ApnsService;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.relayrides.pushy.apns.ApnsClient;
|
||||
import com.relayrides.pushy.apns.ApnsServerException;
|
||||
import com.relayrides.pushy.apns.ClientNotConnectedException;
|
||||
import com.relayrides.pushy.apns.DeliveryPriority;
|
||||
import com.relayrides.pushy.apns.PushNotificationResponse;
|
||||
import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.whispersystems.textsecuregcm.push.APNSender;
|
||||
import org.whispersystems.textsecuregcm.push.ApnMessage;
|
||||
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
|
||||
import org.whispersystems.textsecuregcm.push.RetryingApnsClient;
|
||||
import org.whispersystems.textsecuregcm.push.RetryingApnsClient.ApnResult;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.tests.util.SynchronousExecutorService;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import io.netty.util.concurrent.DefaultEventExecutor;
|
||||
import io.netty.util.concurrent.DefaultPromise;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.*;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
|
||||
public class APNSenderTest {
|
||||
|
||||
|
@ -25,63 +33,255 @@ public class APNSenderTest {
|
|||
private static final String DESTINATION_APN_ID = "foo";
|
||||
|
||||
private final AccountsManager accountsManager = mock(AccountsManager.class);
|
||||
private final JedisPool jedisPool = mock(JedisPool.class);
|
||||
private final Jedis jedis = mock(Jedis.class);
|
||||
private final ApnsService voipService = mock(ApnsService.class);
|
||||
private final ApnsService apnsService = mock(ApnsService.class);
|
||||
|
||||
private final Account destinationAccount = mock(Account.class);
|
||||
private final Device destinationDevice = mock(Device.class );
|
||||
|
||||
private final DefaultEventExecutor executor = new DefaultEventExecutor();
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
when(destinationAccount.getDevice(1)).thenReturn(Optional.of(destinationDevice));
|
||||
when(destinationDevice.getApnId()).thenReturn(DESTINATION_APN_ID);
|
||||
when(accountsManager.get(DESTINATION_NUMBER)).thenReturn(Optional.of(destinationAccount));
|
||||
|
||||
when(jedisPool.getResource()).thenReturn(jedis);
|
||||
when(jedis.get("APN-" + DESTINATION_APN_ID)).thenReturn(DESTINATION_NUMBER + "." + 1);
|
||||
|
||||
when(voipService.getInactiveDevices()).thenReturn(new HashMap<String, Date>() {{
|
||||
put(DESTINATION_APN_ID, new Date(System.currentTimeMillis()));
|
||||
}});
|
||||
when(apnsService.getInactiveDevices()).thenReturn(new HashMap<String, Date>());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendVoip() throws TransientPushFailureException {
|
||||
APNSender apnSender = new APNSender(accountsManager, jedisPool, apnsService, voipService, false, false);
|
||||
public void testSendVoip() throws Exception {
|
||||
ApnsClient apnsClient = mock(ApnsClient.class);
|
||||
|
||||
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", true, 30);
|
||||
apnSender.sendMessage(message);
|
||||
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
|
||||
when(response.isAccepted()).thenReturn(true);
|
||||
|
||||
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
|
||||
result.setSuccess(response);
|
||||
|
||||
verify(jedis, times(1)).set(eq("APN-" + DESTINATION_APN_ID.toLowerCase()), eq(DESTINATION_NUMBER + "." + 1));
|
||||
verify(voipService, times(1)).push(eq(DESTINATION_APN_ID), eq(message.getMessage()), eq(new Date(30)));
|
||||
verifyNoMoreInteractions(apnsService);
|
||||
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
|
||||
.thenReturn(result);
|
||||
|
||||
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
|
||||
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", true, 30);
|
||||
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
|
||||
|
||||
ListenableFuture<ApnResult> sendFuture = apnSender.sendMessage(message);
|
||||
ApnResult apnResult = sendFuture.get();
|
||||
|
||||
ArgumentCaptor<SimpleApnsPushNotification> notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class);
|
||||
verify(apnsClient, times(1)).sendNotification(notification.capture());
|
||||
|
||||
assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID);
|
||||
assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(30));
|
||||
assertThat(notification.getValue().getPayload()).isEqualTo("message");
|
||||
assertThat(notification.getValue().getPriority()).isEqualTo(DeliveryPriority.IMMEDIATE);
|
||||
assertThat(notification.getValue().getTopic()).isEqualTo("foo.voip");
|
||||
|
||||
assertThat(apnResult.getStatus()).isEqualTo(ApnResult.Status.SUCCESS);
|
||||
|
||||
verifyNoMoreInteractions(apnsClient);
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendApns() throws TransientPushFailureException {
|
||||
APNSender apnSender = new APNSender(accountsManager, jedisPool, apnsService, voipService, false, false);
|
||||
public void testSendApns() throws Exception {
|
||||
ApnsClient apnsClient = mock(ApnsClient.class);
|
||||
|
||||
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
|
||||
when(response.isAccepted()).thenReturn(true);
|
||||
|
||||
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
|
||||
result.setSuccess(response);
|
||||
|
||||
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
|
||||
.thenReturn(result);
|
||||
|
||||
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
|
||||
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", false, 30);
|
||||
apnSender.sendMessage(message);
|
||||
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
|
||||
|
||||
verify(jedis, times(1)).set(eq("APN-" + DESTINATION_APN_ID.toLowerCase()), eq(DESTINATION_NUMBER + "." + 1));
|
||||
verify(apnsService, times(1)).push(eq(DESTINATION_APN_ID), eq(message.getMessage()), eq(new Date(30)));
|
||||
verifyNoMoreInteractions(voipService);
|
||||
ListenableFuture<ApnResult> sendFuture = apnSender.sendMessage(message);
|
||||
ApnResult apnResult = sendFuture.get();
|
||||
|
||||
ArgumentCaptor<SimpleApnsPushNotification> notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class);
|
||||
verify(apnsClient, times(1)).sendNotification(notification.capture());
|
||||
|
||||
assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID);
|
||||
assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(30));
|
||||
assertThat(notification.getValue().getPayload()).isEqualTo("message");
|
||||
assertThat(notification.getValue().getPriority()).isEqualTo(DeliveryPriority.IMMEDIATE);
|
||||
assertThat(notification.getValue().getTopic()).isEqualTo("foo");
|
||||
|
||||
assertThat(apnResult.getStatus()).isEqualTo(ApnResult.Status.SUCCESS);
|
||||
|
||||
verifyNoMoreInteractions(apnsClient);
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFeedbackUnregistered() {
|
||||
APNSender apnSender = new APNSender(accountsManager, jedisPool, apnsService, voipService, false, false);
|
||||
apnSender.checkFeedback();
|
||||
public void testUnregisteredUser() throws Exception {
|
||||
ApnsClient apnsClient = mock(ApnsClient.class);
|
||||
|
||||
verify(jedis, times(1)).get(eq("APN-" +DESTINATION_APN_ID));
|
||||
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
|
||||
when(response.isAccepted()).thenReturn(false);
|
||||
when(response.getRejectionReason()).thenReturn("Unregistered");
|
||||
|
||||
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
|
||||
result.setSuccess(response);
|
||||
|
||||
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
|
||||
.thenReturn(result);
|
||||
|
||||
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
|
||||
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", true, 30);
|
||||
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
|
||||
|
||||
ListenableFuture<ApnResult> sendFuture = apnSender.sendMessage(message);
|
||||
ApnResult apnResult = sendFuture.get();
|
||||
|
||||
Thread.sleep(1000); // =(
|
||||
|
||||
ArgumentCaptor<SimpleApnsPushNotification> notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class);
|
||||
verify(apnsClient, times(1)).sendNotification(notification.capture());
|
||||
|
||||
assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID);
|
||||
assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(30));
|
||||
assertThat(notification.getValue().getPayload()).isEqualTo("message");
|
||||
assertThat(notification.getValue().getPriority()).isEqualTo(DeliveryPriority.IMMEDIATE);
|
||||
|
||||
assertThat(apnResult.getStatus()).isEqualTo(ApnResult.Status.NO_SUCH_USER);
|
||||
|
||||
verifyNoMoreInteractions(apnsClient);
|
||||
verify(accountsManager, times(1)).get(eq(DESTINATION_NUMBER));
|
||||
verify(destinationDevice, times(1)).getApnId();
|
||||
verify(destinationDevice, times(1)).setApnId(eq((String)null));
|
||||
verify(accountsManager, times(1)).update(eq(destinationAccount));
|
||||
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGenericFailure() throws Exception {
|
||||
ApnsClient apnsClient = mock(ApnsClient.class);
|
||||
|
||||
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
|
||||
when(response.isAccepted()).thenReturn(false);
|
||||
when(response.getRejectionReason()).thenReturn("BadTopic");
|
||||
|
||||
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
|
||||
result.setSuccess(response);
|
||||
|
||||
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
|
||||
.thenReturn(result);
|
||||
|
||||
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
|
||||
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", true, 30);
|
||||
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
|
||||
|
||||
ListenableFuture<ApnResult> sendFuture = apnSender.sendMessage(message);
|
||||
ApnResult apnResult = sendFuture.get();
|
||||
|
||||
ArgumentCaptor<SimpleApnsPushNotification> notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class);
|
||||
verify(apnsClient, times(1)).sendNotification(notification.capture());
|
||||
|
||||
assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID);
|
||||
assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(30));
|
||||
assertThat(notification.getValue().getPayload()).isEqualTo("message");
|
||||
assertThat(notification.getValue().getPriority()).isEqualTo(DeliveryPriority.IMMEDIATE);
|
||||
|
||||
assertThat(apnResult.getStatus()).isEqualTo(ApnResult.Status.GENERIC_FAILURE);
|
||||
|
||||
verifyNoMoreInteractions(apnsClient);
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransientFailure() throws Exception {
|
||||
ApnsClient apnsClient = mock(ApnsClient.class);
|
||||
|
||||
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
|
||||
when(response.isAccepted()).thenReturn(true);
|
||||
|
||||
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
|
||||
result.setFailure(new ClientNotConnectedException("lost connection"));
|
||||
|
||||
DefaultPromise<Void> connectedResult = new DefaultPromise<>(executor);
|
||||
|
||||
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
|
||||
.thenReturn(result);
|
||||
|
||||
when(apnsClient.getReconnectionFuture())
|
||||
.thenReturn(connectedResult);
|
||||
|
||||
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
|
||||
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", true, 30);
|
||||
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
|
||||
|
||||
ListenableFuture<ApnResult> sendFuture = apnSender.sendMessage(message);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertThat(sendFuture.isDone()).isFalse();
|
||||
|
||||
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> updatedResult = new DefaultPromise<>(executor);
|
||||
updatedResult.setSuccess(response);
|
||||
|
||||
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
|
||||
.thenReturn(updatedResult);
|
||||
|
||||
connectedResult.setSuccess(null);
|
||||
|
||||
ApnResult apnResult = sendFuture.get();
|
||||
|
||||
ArgumentCaptor<SimpleApnsPushNotification> notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class);
|
||||
verify(apnsClient, times(2)).sendNotification(notification.capture());
|
||||
verify(apnsClient, times(1)).getReconnectionFuture();
|
||||
|
||||
assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID);
|
||||
assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(30));
|
||||
assertThat(notification.getValue().getPayload()).isEqualTo("message");
|
||||
assertThat(notification.getValue().getPriority()).isEqualTo(DeliveryPriority.IMMEDIATE);
|
||||
|
||||
assertThat(apnResult.getStatus()).isEqualTo(ApnResult.Status.SUCCESS);
|
||||
|
||||
verifyNoMoreInteractions(apnsClient);
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersistentTransientFailure() throws Exception {
|
||||
ApnsClient apnsClient = mock(ApnsClient.class);
|
||||
|
||||
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
|
||||
when(response.isAccepted()).thenReturn(true);
|
||||
|
||||
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
|
||||
result.setFailure(new ApnsServerException("apn servers suck again"));
|
||||
|
||||
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
|
||||
.thenReturn(result);
|
||||
|
||||
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 3);
|
||||
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, "message", true, 30);
|
||||
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
|
||||
|
||||
ListenableFuture<ApnResult> sendFuture = apnSender.sendMessage(message);
|
||||
|
||||
try {
|
||||
sendFuture.get();
|
||||
throw new AssertionError("future did not throw exception");
|
||||
} catch (Exception e) {
|
||||
// good
|
||||
}
|
||||
|
||||
ArgumentCaptor<SimpleApnsPushNotification> notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class);
|
||||
verify(apnsClient, times(4)).sendNotification(notification.capture());
|
||||
|
||||
assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID);
|
||||
assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(30));
|
||||
assertThat(notification.getValue().getPayload()).isEqualTo("message");
|
||||
assertThat(notification.getValue().getPriority()).isEqualTo(DeliveryPriority.IMMEDIATE);
|
||||
|
||||
verifyNoMoreInteractions(apnsClient);
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -43,12 +43,10 @@ public class ApnFallbackManagerTest {
|
|||
|
||||
assertEquals(arguments.get(0).getMessage(), message.getMessage());
|
||||
assertEquals(arguments.get(0).getApnId(), task.getVoipApnId());
|
||||
assertTrue(arguments.get(0).isVoip());
|
||||
// assertEquals(arguments.get(0).getExpirationTime(), Integer.MAX_VALUE * 1000L);
|
||||
|
||||
assertEquals(arguments.get(1).getMessage(), message.getMessage());
|
||||
assertEquals(arguments.get(1).getApnId(), task.getApnId());
|
||||
assertFalse(arguments.get(1).isVoip());
|
||||
assertEquals(arguments.get(1).getExpirationTime(), Integer.MAX_VALUE * 1000L);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue