diff --git a/pom.xml b/pom.xml index 8106feb65..3ceb736ed 100644 --- a/pom.xml +++ b/pom.xml @@ -110,16 +110,25 @@ 0.5.0 + - com.relayrides + com.turo pushy - 0.9.3 + 0.13.7 - com.relayrides + com.turo pushy-dropwizard-metrics-listener - 0.9.3 + 0.13.7 + + io.netty + netty-tcnative-boringssl-static + 2.0.20.Final + runtime + + + org.whispersystems gcm-sender-async diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java index 4905c677e..d1a9189c1 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java @@ -68,7 +68,7 @@ public class APNSender implements Managed { this.sandbox = configuration.isSandboxEnabled(); this.apnsClient = new RetryingApnsClient(configuration.getPushCertificate(), configuration.getPushKey(), - 10); + sandbox); } @VisibleForTesting @@ -115,7 +115,6 @@ public class APNSender implements Managed { @Override public void start() { this.executor = Executors.newSingleThreadExecutor(); - this.apnsClient.connect(sandbox); } @Override diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/RetryingApnsClient.java b/src/main/java/org/whispersystems/textsecuregcm/push/RetryingApnsClient.java index 7e97b95ff..85dc4ee29 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/RetryingApnsClient.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/RetryingApnsClient.java @@ -6,19 +6,12 @@ import com.codahale.metrics.SharedMetricRegistries; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; -import com.nurkiewicz.asyncretry.RetryContext; -import com.nurkiewicz.asyncretry.RetryExecutor; -import com.nurkiewicz.asyncretry.function.RetryCallable; -import com.relayrides.pushy.apns.ApnsClient; -import com.relayrides.pushy.apns.ApnsClientBuilder; -import com.relayrides.pushy.apns.ApnsServerException; -import com.relayrides.pushy.apns.ClientNotConnectedException; -import com.relayrides.pushy.apns.DeliveryPriority; -import com.relayrides.pushy.apns.PushNotificationResponse; -import com.relayrides.pushy.apns.metrics.dropwizard.DropwizardApnsClientMetricsListener; -import com.relayrides.pushy.apns.util.SimpleApnsPushNotification; - +import com.turo.pushy.apns.ApnsClient; +import com.turo.pushy.apns.ApnsClientBuilder; +import com.turo.pushy.apns.DeliveryPriority; +import com.turo.pushy.apns.PushNotificationResponse; +import com.turo.pushy.apns.metrics.dropwizard.DropwizardApnsClientMetricsListener; +import com.turo.pushy.apns.util.SimpleApnsPushNotification; import org.bouncycastle.openssl.PEMReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,21 +26,17 @@ import java.security.cert.X509Certificate; import java.util.Date; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import static com.codahale.metrics.MetricRegistry.name; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; public class RetryingApnsClient { private static final Logger logger = LoggerFactory.getLogger(RetryingApnsClient.class); - private final ApnsClient apnsClient; - private final RetryExecutor retryExecutor; + private final ApnsClient apnsClient; - RetryingApnsClient(String apnCertificate, String apnKey, int retryCount) + RetryingApnsClient(String apnCertificate, String apnKey, boolean sandbox) throws IOException { MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); @@ -60,48 +49,26 @@ public class RetryingApnsClient { this.apnsClient = new ApnsClientBuilder().setClientCredentials(initializeCertificate(apnCertificate), initializePrivateKey(apnKey), null) .setMetricsListener(metricsListener) + .setApnsServer(sandbox ? ApnsClientBuilder.DEVELOPMENT_APNS_HOST : ApnsClientBuilder.PRODUCTION_APNS_HOST) .build(); - this.retryExecutor = initializeExecutor(retryCount); } @VisibleForTesting - public RetryingApnsClient(ApnsClient apnsClient, int retryCount) { - this.apnsClient = apnsClient; - this.retryExecutor = initializeExecutor(retryCount); - } - - private static RetryExecutor initializeExecutor(int retryCount) { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - - return new AsyncRetryExecutor(executorService).retryOn(ClientNotConnectedException.class) - .retryOn(InterruptedException.class) - .retryOn(ApnsServerException.class) - .withExponentialBackoff(100, 2.0) - .withUniformJitter() - .withMaxDelay(4000) - .withMaxRetries(retryCount); + public RetryingApnsClient(ApnsClient apnsClient) { + this.apnsClient = apnsClient; } ListenableFuture send(final String apnId, final String topic, final String payload, final Date expiration) { - return this.retryExecutor.getFutureWithRetry(new RetryCallable>() { - @Override - public ListenableFuture call(RetryContext context) throws Exception { - SettableFuture result = SettableFuture.create(); - SimpleApnsPushNotification notification = new SimpleApnsPushNotification(apnId, topic, payload, expiration, DeliveryPriority.IMMEDIATE); + SettableFuture result = SettableFuture.create(); + SimpleApnsPushNotification notification = new SimpleApnsPushNotification(apnId, topic, payload, expiration, DeliveryPriority.IMMEDIATE); - apnsClient.sendNotification(notification).addListener(new ResponseHandler(apnsClient, result)); + apnsClient.sendNotification(notification).addListener(new ResponseHandler(result)); - return result; - } - }); - } - - void connect(boolean sandbox) { - apnsClient.connect(sandbox ? ApnsClient.DEVELOPMENT_APNS_HOST : ApnsClient.PRODUCTION_APNS_HOST).awaitUninterruptibly(); + return result; } void disconnect() { - apnsClient.disconnect(); + apnsClient.close(); } private static X509Certificate initializeCertificate(String pemCertificate) throws IOException { @@ -116,11 +83,9 @@ public class RetryingApnsClient { private static final class ResponseHandler implements GenericFutureListener>> { - private final ApnsClient client; private final SettableFuture future; - private ResponseHandler(ApnsClient client, SettableFuture future) { - this.client = client; + private ResponseHandler(SettableFuture future) { this.future = future; } @@ -145,21 +110,9 @@ public class RetryingApnsClient { future.setException(e); } catch (ExecutionException e) { logger.warn("Execution exception", e); - if (e.getCause() instanceof ClientNotConnectedException) setDisconnected(e.getCause()); - else future.setException(e.getCause()); + future.setException(e.getCause()); } } - - private void setDisconnected(final Throwable t) { - logger.warn("Client disconnected, waiting for reconnect...", t); - client.getReconnectionFuture().addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future complete) { - logger.warn("Client reconnected..."); - future.setException(t); - } - }); - } } public static class ApnResult { diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/push/APNSenderTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/push/APNSenderTest.java index cbfc4e7f9..a913be98d 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/push/APNSenderTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/push/APNSenderTest.java @@ -1,15 +1,16 @@ package org.whispersystems.textsecuregcm.tests.push; import com.google.common.util.concurrent.ListenableFuture; -import com.relayrides.pushy.apns.ApnsClient; -import com.relayrides.pushy.apns.ApnsServerException; -import com.relayrides.pushy.apns.ClientNotConnectedException; -import com.relayrides.pushy.apns.DeliveryPriority; -import com.relayrides.pushy.apns.PushNotificationResponse; -import com.relayrides.pushy.apns.util.SimpleApnsPushNotification; +import com.turo.pushy.apns.ApnsClient; +import com.turo.pushy.apns.ApnsPushNotification; +import com.turo.pushy.apns.DeliveryPriority; +import com.turo.pushy.apns.PushNotificationResponse; +import com.turo.pushy.apns.util.SimpleApnsPushNotification; +import com.turo.pushy.apns.util.concurrent.PushNotificationFuture; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ApnMessage; @@ -22,10 +23,12 @@ import org.whispersystems.textsecuregcm.tests.util.SynchronousExecutorService; import java.util.Date; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import io.netty.util.concurrent.DefaultEventExecutor; import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.EventExecutor; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.*; @@ -56,13 +59,10 @@ public class APNSenderTest { PushNotificationResponse response = mock(PushNotificationResponse.class); when(response.isAccepted()).thenReturn(true); - DefaultPromise> result = new DefaultPromise<>(executor); - result.setSuccess(response); - when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class))) - .thenReturn(result); + .thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response)); - RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10); + RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient); ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true); APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false); @@ -93,13 +93,10 @@ public class APNSenderTest { PushNotificationResponse response = mock(PushNotificationResponse.class); when(response.isAccepted()).thenReturn(true); - DefaultPromise> result = new DefaultPromise<>(executor); - result.setSuccess(response); - when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class))) - .thenReturn(result); + .thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response)); - RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10); + RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient); ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, false); APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false); apnSender.setApnFallbackManager(fallbackManager); @@ -131,13 +128,11 @@ public class APNSenderTest { when(response.isAccepted()).thenReturn(false); when(response.getRejectionReason()).thenReturn("Unregistered"); - DefaultPromise> result = new DefaultPromise<>(executor); - result.setSuccess(response); - when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class))) - .thenReturn(result); + .thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response)); - RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10); + + RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient); ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true); APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false); apnSender.setApnFallbackManager(fallbackManager); @@ -237,13 +232,10 @@ public class APNSenderTest { when(response.isAccepted()).thenReturn(false); when(response.getRejectionReason()).thenReturn("Unregistered"); - DefaultPromise> result = new DefaultPromise<>(executor); - result.setSuccess(response); - when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class))) - .thenReturn(result); + .thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response)); - RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10); + RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient); ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true); APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false); apnSender.setApnFallbackManager(fallbackManager); @@ -335,13 +327,10 @@ public class APNSenderTest { when(response.isAccepted()).thenReturn(false); when(response.getRejectionReason()).thenReturn("BadTopic"); - DefaultPromise> result = new DefaultPromise<>(executor); - result.setSuccess(response); - when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class))) - .thenReturn(result); + .thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response)); - RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10); + RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient); ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true); APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false); apnSender.setApnFallbackManager(fallbackManager); @@ -365,74 +354,16 @@ public class APNSenderTest { } @Test - public void testTransientFailure() throws Exception { + public void testFailure() throws Exception { ApnsClient apnsClient = mock(ApnsClient.class); PushNotificationResponse response = mock(PushNotificationResponse.class); when(response.isAccepted()).thenReturn(true); - DefaultPromise> result = new DefaultPromise<>(executor); - result.setFailure(new ClientNotConnectedException("lost connection")); - - DefaultPromise connectedResult = new DefaultPromise<>(executor); - when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class))) - .thenReturn(result); + .thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), new Exception("lost connection"))); - when(apnsClient.getReconnectionFuture()) - .thenReturn(connectedResult); - - RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10); - ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true); - APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false); - apnSender.setApnFallbackManager(fallbackManager); - - ListenableFuture sendFuture = apnSender.sendMessage(message); - - Thread.sleep(1000); - - assertThat(sendFuture.isDone()).isFalse(); - - DefaultPromise> updatedResult = new DefaultPromise<>(executor); - updatedResult.setSuccess(response); - - when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class))) - .thenReturn(updatedResult); - - connectedResult.setSuccess(null); - - ApnResult apnResult = sendFuture.get(); - - ArgumentCaptor notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class); - verify(apnsClient, times(2)).sendNotification(notification.capture()); - verify(apnsClient, times(1)).getReconnectionFuture(); - - assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID); - assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(ApnMessage.MAX_EXPIRATION)); - assertThat(notification.getValue().getPayload()).isEqualTo(ApnMessage.APN_PAYLOAD); - assertThat(notification.getValue().getPriority()).isEqualTo(DeliveryPriority.IMMEDIATE); - - assertThat(apnResult.getStatus()).isEqualTo(ApnResult.Status.SUCCESS); - - verifyNoMoreInteractions(apnsClient); - verifyNoMoreInteractions(accountsManager); - verifyNoMoreInteractions(fallbackManager); - } - - @Test - public void testPersistentTransientFailure() throws Exception { - ApnsClient apnsClient = mock(ApnsClient.class); - - PushNotificationResponse response = mock(PushNotificationResponse.class); - when(response.isAccepted()).thenReturn(true); - - DefaultPromise> result = new DefaultPromise<>(executor); - result.setFailure(new ApnsServerException("apn servers suck again")); - - when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class))) - .thenReturn(result); - - RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 3); + RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient); ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true); APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false); apnSender.setApnFallbackManager(fallbackManager); @@ -441,13 +372,15 @@ public class APNSenderTest { try { sendFuture.get(); - throw new AssertionError("future did not throw exception"); - } catch (Exception e) { + throw new AssertionError(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } catch (ExecutionException e) { // good } ArgumentCaptor notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class); - verify(apnsClient, times(4)).sendNotification(notification.capture()); + verify(apnsClient, times(1)).sendNotification(notification.capture()); assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID); assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(ApnMessage.MAX_EXPIRATION)); @@ -459,4 +392,32 @@ public class APNSenderTest { verifyNoMoreInteractions(fallbackManager); } + private static class MockPushNotificationFuture

extends DefaultPromise implements PushNotificationFuture { + + private final P pushNotification; + + MockPushNotificationFuture(final EventExecutor eventExecutor, final P pushNotification) { + super(eventExecutor); + this.pushNotification = pushNotification; + } + + MockPushNotificationFuture(final EventExecutor eventExecutor, final P pushNotification, final V response) { + super(eventExecutor); + this.pushNotification = pushNotification; + setSuccess(response); + } + + MockPushNotificationFuture(final EventExecutor eventExecutor, final P pushNotification, final Exception exception) { + super(eventExecutor); + this.pushNotification = pushNotification; + setFailure(exception); + } + + + @Override + public P getPushNotification() { + return pushNotification; + } + } + }