Update apns library, remove retrying executor

This commit is contained in:
Moxie Marlinspike 2019-03-22 19:58:52 -07:00
parent 6610a29422
commit b3c615576e
4 changed files with 88 additions and 166 deletions

17
pom.xml
View File

@ -110,16 +110,25 @@
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>com.relayrides</groupId>
<groupId>com.turo</groupId>
<artifactId>pushy</artifactId>
<version>0.9.3</version>
<version>0.13.7</version>
</dependency>
<dependency>
<groupId>com.relayrides</groupId>
<groupId>com.turo</groupId>
<artifactId>pushy-dropwizard-metrics-listener</artifactId>
<version>0.9.3</version>
<version>0.13.7</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>2.0.20.Final</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.whispersystems</groupId>
<artifactId>gcm-sender-async</artifactId>

View File

@ -68,7 +68,7 @@ public class APNSender implements Managed {
this.sandbox = configuration.isSandboxEnabled();
this.apnsClient = new RetryingApnsClient(configuration.getPushCertificate(),
configuration.getPushKey(),
10);
sandbox);
}
@VisibleForTesting
@ -115,7 +115,6 @@ public class APNSender implements Managed {
@Override
public void start() {
this.executor = Executors.newSingleThreadExecutor();
this.apnsClient.connect(sandbox);
}
@Override

View File

@ -6,19 +6,12 @@ import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
import com.nurkiewicz.asyncretry.RetryContext;
import com.nurkiewicz.asyncretry.RetryExecutor;
import com.nurkiewicz.asyncretry.function.RetryCallable;
import com.relayrides.pushy.apns.ApnsClient;
import com.relayrides.pushy.apns.ApnsClientBuilder;
import com.relayrides.pushy.apns.ApnsServerException;
import com.relayrides.pushy.apns.ClientNotConnectedException;
import com.relayrides.pushy.apns.DeliveryPriority;
import com.relayrides.pushy.apns.PushNotificationResponse;
import com.relayrides.pushy.apns.metrics.dropwizard.DropwizardApnsClientMetricsListener;
import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
import com.turo.pushy.apns.ApnsClient;
import com.turo.pushy.apns.ApnsClientBuilder;
import com.turo.pushy.apns.DeliveryPriority;
import com.turo.pushy.apns.PushNotificationResponse;
import com.turo.pushy.apns.metrics.dropwizard.DropwizardApnsClientMetricsListener;
import com.turo.pushy.apns.util.SimpleApnsPushNotification;
import org.bouncycastle.openssl.PEMReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,21 +26,17 @@ import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static com.codahale.metrics.MetricRegistry.name;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
public class RetryingApnsClient {
private static final Logger logger = LoggerFactory.getLogger(RetryingApnsClient.class);
private final ApnsClient apnsClient;
private final RetryExecutor retryExecutor;
private final ApnsClient apnsClient;
RetryingApnsClient(String apnCertificate, String apnKey, int retryCount)
RetryingApnsClient(String apnCertificate, String apnKey, boolean sandbox)
throws IOException
{
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
@ -60,48 +49,26 @@ public class RetryingApnsClient {
this.apnsClient = new ApnsClientBuilder().setClientCredentials(initializeCertificate(apnCertificate),
initializePrivateKey(apnKey), null)
.setMetricsListener(metricsListener)
.setApnsServer(sandbox ? ApnsClientBuilder.DEVELOPMENT_APNS_HOST : ApnsClientBuilder.PRODUCTION_APNS_HOST)
.build();
this.retryExecutor = initializeExecutor(retryCount);
}
@VisibleForTesting
public RetryingApnsClient(ApnsClient apnsClient, int retryCount) {
this.apnsClient = apnsClient;
this.retryExecutor = initializeExecutor(retryCount);
}
private static RetryExecutor initializeExecutor(int retryCount) {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
return new AsyncRetryExecutor(executorService).retryOn(ClientNotConnectedException.class)
.retryOn(InterruptedException.class)
.retryOn(ApnsServerException.class)
.withExponentialBackoff(100, 2.0)
.withUniformJitter()
.withMaxDelay(4000)
.withMaxRetries(retryCount);
public RetryingApnsClient(ApnsClient apnsClient) {
this.apnsClient = apnsClient;
}
ListenableFuture<ApnResult> send(final String apnId, final String topic, final String payload, final Date expiration) {
return this.retryExecutor.getFutureWithRetry(new RetryCallable<ListenableFuture<ApnResult>>() {
@Override
public ListenableFuture<ApnResult> call(RetryContext context) throws Exception {
SettableFuture<ApnResult> result = SettableFuture.create();
SimpleApnsPushNotification notification = new SimpleApnsPushNotification(apnId, topic, payload, expiration, DeliveryPriority.IMMEDIATE);
SettableFuture<ApnResult> result = SettableFuture.create();
SimpleApnsPushNotification notification = new SimpleApnsPushNotification(apnId, topic, payload, expiration, DeliveryPriority.IMMEDIATE);
apnsClient.sendNotification(notification).addListener(new ResponseHandler(apnsClient, result));
apnsClient.sendNotification(notification).addListener(new ResponseHandler(result));
return result;
}
});
}
void connect(boolean sandbox) {
apnsClient.connect(sandbox ? ApnsClient.DEVELOPMENT_APNS_HOST : ApnsClient.PRODUCTION_APNS_HOST).awaitUninterruptibly();
return result;
}
void disconnect() {
apnsClient.disconnect();
apnsClient.close();
}
private static X509Certificate initializeCertificate(String pemCertificate) throws IOException {
@ -116,11 +83,9 @@ public class RetryingApnsClient {
private static final class ResponseHandler implements GenericFutureListener<io.netty.util.concurrent.Future<PushNotificationResponse<SimpleApnsPushNotification>>> {
private final ApnsClient client;
private final SettableFuture<ApnResult> future;
private ResponseHandler(ApnsClient client, SettableFuture<ApnResult> future) {
this.client = client;
private ResponseHandler(SettableFuture<ApnResult> future) {
this.future = future;
}
@ -145,21 +110,9 @@ public class RetryingApnsClient {
future.setException(e);
} catch (ExecutionException e) {
logger.warn("Execution exception", e);
if (e.getCause() instanceof ClientNotConnectedException) setDisconnected(e.getCause());
else future.setException(e.getCause());
future.setException(e.getCause());
}
}
private void setDisconnected(final Throwable t) {
logger.warn("Client disconnected, waiting for reconnect...", t);
client.getReconnectionFuture().addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> complete) {
logger.warn("Client reconnected...");
future.setException(t);
}
});
}
}
public static class ApnResult {

View File

@ -1,15 +1,16 @@
package org.whispersystems.textsecuregcm.tests.push;
import com.google.common.util.concurrent.ListenableFuture;
import com.relayrides.pushy.apns.ApnsClient;
import com.relayrides.pushy.apns.ApnsServerException;
import com.relayrides.pushy.apns.ClientNotConnectedException;
import com.relayrides.pushy.apns.DeliveryPriority;
import com.relayrides.pushy.apns.PushNotificationResponse;
import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
import com.turo.pushy.apns.ApnsClient;
import com.turo.pushy.apns.ApnsPushNotification;
import com.turo.pushy.apns.DeliveryPriority;
import com.turo.pushy.apns.PushNotificationResponse;
import com.turo.pushy.apns.util.SimpleApnsPushNotification;
import com.turo.pushy.apns.util.concurrent.PushNotificationFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
import org.whispersystems.textsecuregcm.push.ApnMessage;
@ -22,10 +23,12 @@ import org.whispersystems.textsecuregcm.tests.util.SynchronousExecutorService;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
@ -56,13 +59,10 @@ public class APNSenderTest {
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
when(response.isAccepted()).thenReturn(true);
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
result.setSuccess(response);
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
.thenReturn(result);
.thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response));
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient);
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true);
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
@ -93,13 +93,10 @@ public class APNSenderTest {
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
when(response.isAccepted()).thenReturn(true);
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
result.setSuccess(response);
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
.thenReturn(result);
.thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response));
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient);
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, false);
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
apnSender.setApnFallbackManager(fallbackManager);
@ -131,13 +128,11 @@ public class APNSenderTest {
when(response.isAccepted()).thenReturn(false);
when(response.getRejectionReason()).thenReturn("Unregistered");
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
result.setSuccess(response);
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
.thenReturn(result);
.thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response));
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient);
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true);
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
apnSender.setApnFallbackManager(fallbackManager);
@ -237,13 +232,10 @@ public class APNSenderTest {
when(response.isAccepted()).thenReturn(false);
when(response.getRejectionReason()).thenReturn("Unregistered");
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
result.setSuccess(response);
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
.thenReturn(result);
.thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response));
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient);
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true);
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
apnSender.setApnFallbackManager(fallbackManager);
@ -335,13 +327,10 @@ public class APNSenderTest {
when(response.isAccepted()).thenReturn(false);
when(response.getRejectionReason()).thenReturn("BadTopic");
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
result.setSuccess(response);
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
.thenReturn(result);
.thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), response));
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient);
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true);
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
apnSender.setApnFallbackManager(fallbackManager);
@ -365,74 +354,16 @@ public class APNSenderTest {
}
@Test
public void testTransientFailure() throws Exception {
public void testFailure() throws Exception {
ApnsClient apnsClient = mock(ApnsClient.class);
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
when(response.isAccepted()).thenReturn(true);
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
result.setFailure(new ClientNotConnectedException("lost connection"));
DefaultPromise<Void> connectedResult = new DefaultPromise<>(executor);
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
.thenReturn(result);
.thenAnswer((Answer) invocationOnMock -> new MockPushNotificationFuture<>(executor, invocationOnMock.getArgument(0), new Exception("lost connection")));
when(apnsClient.getReconnectionFuture())
.thenReturn(connectedResult);
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 10);
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true);
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
apnSender.setApnFallbackManager(fallbackManager);
ListenableFuture<ApnResult> sendFuture = apnSender.sendMessage(message);
Thread.sleep(1000);
assertThat(sendFuture.isDone()).isFalse();
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> updatedResult = new DefaultPromise<>(executor);
updatedResult.setSuccess(response);
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
.thenReturn(updatedResult);
connectedResult.setSuccess(null);
ApnResult apnResult = sendFuture.get();
ArgumentCaptor<SimpleApnsPushNotification> notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class);
verify(apnsClient, times(2)).sendNotification(notification.capture());
verify(apnsClient, times(1)).getReconnectionFuture();
assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID);
assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(ApnMessage.MAX_EXPIRATION));
assertThat(notification.getValue().getPayload()).isEqualTo(ApnMessage.APN_PAYLOAD);
assertThat(notification.getValue().getPriority()).isEqualTo(DeliveryPriority.IMMEDIATE);
assertThat(apnResult.getStatus()).isEqualTo(ApnResult.Status.SUCCESS);
verifyNoMoreInteractions(apnsClient);
verifyNoMoreInteractions(accountsManager);
verifyNoMoreInteractions(fallbackManager);
}
@Test
public void testPersistentTransientFailure() throws Exception {
ApnsClient apnsClient = mock(ApnsClient.class);
PushNotificationResponse<SimpleApnsPushNotification> response = mock(PushNotificationResponse.class);
when(response.isAccepted()).thenReturn(true);
DefaultPromise<PushNotificationResponse<SimpleApnsPushNotification>> result = new DefaultPromise<>(executor);
result.setFailure(new ApnsServerException("apn servers suck again"));
when(apnsClient.sendNotification(any(SimpleApnsPushNotification.class)))
.thenReturn(result);
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient, 3);
RetryingApnsClient retryingApnsClient = new RetryingApnsClient(apnsClient);
ApnMessage message = new ApnMessage(DESTINATION_APN_ID, DESTINATION_NUMBER, 1, true);
APNSender apnSender = new APNSender(new SynchronousExecutorService(), accountsManager, retryingApnsClient, "foo", false);
apnSender.setApnFallbackManager(fallbackManager);
@ -441,13 +372,15 @@ public class APNSenderTest {
try {
sendFuture.get();
throw new AssertionError("future did not throw exception");
} catch (Exception e) {
throw new AssertionError();
} catch (InterruptedException e) {
throw new AssertionError(e);
} catch (ExecutionException e) {
// good
}
ArgumentCaptor<SimpleApnsPushNotification> notification = ArgumentCaptor.forClass(SimpleApnsPushNotification.class);
verify(apnsClient, times(4)).sendNotification(notification.capture());
verify(apnsClient, times(1)).sendNotification(notification.capture());
assertThat(notification.getValue().getToken()).isEqualTo(DESTINATION_APN_ID);
assertThat(notification.getValue().getExpiration()).isEqualTo(new Date(ApnMessage.MAX_EXPIRATION));
@ -459,4 +392,32 @@ public class APNSenderTest {
verifyNoMoreInteractions(fallbackManager);
}
private static class MockPushNotificationFuture <P extends ApnsPushNotification, V> extends DefaultPromise<V> implements PushNotificationFuture<P, V> {
private final P pushNotification;
MockPushNotificationFuture(final EventExecutor eventExecutor, final P pushNotification) {
super(eventExecutor);
this.pushNotification = pushNotification;
}
MockPushNotificationFuture(final EventExecutor eventExecutor, final P pushNotification, final V response) {
super(eventExecutor);
this.pushNotification = pushNotification;
setSuccess(response);
}
MockPushNotificationFuture(final EventExecutor eventExecutor, final P pushNotification, final Exception exception) {
super(eventExecutor);
this.pushNotification = pushNotification;
setFailure(exception);
}
@Override
public P getPushNotification() {
return pushNotification;
}
}
}