Update gcm-sender-async to use jdk11 httpclient

This commit is contained in:
Moxie Marlinspike 2019-05-29 19:58:09 -07:00
parent e6f25b9c5e
commit 105a38a7db
10 changed files with 177 additions and 248 deletions

View File

@ -15,28 +15,18 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.httpcomponents</groupId> <groupId>io.github.resilience4j</groupId>
<artifactId>httpasyncclient</artifactId> <artifactId>resilience4j-retry</artifactId>
<version>4.0.2</version> <version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>com.nurkiewicz.asyncretry</groupId>
<artifactId>asyncretry-jdk7</artifactId>
<version>0.0.5</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.squareup.okhttp</groupId> <groupId>com.squareup.okhttp</groupId>
<artifactId>mockwebserver</artifactId> <artifactId>mockwebserver</artifactId>
<version>2.1.0</version> <version>2.1.0</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>1.52</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>

View File

@ -1,4 +1,4 @@
/** /*
* Copyright (C) 2015 Open Whisper Systems * Copyright (C) 2015 Open Whisper Systems
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
@ -21,13 +21,11 @@ package org.whispersystems.gcm.server;
*/ */
public class Result { public class Result {
private final Object context;
private final String canonicalRegistrationId; private final String canonicalRegistrationId;
private final String messageId; private final String messageId;
private final String error; private final String error;
Result(Object context, String canonicalRegistrationId, String messageId, String error) { Result(String canonicalRegistrationId, String messageId, String error) {
this.context = context;
this.canonicalRegistrationId = canonicalRegistrationId; this.canonicalRegistrationId = canonicalRegistrationId;
this.messageId = messageId; this.messageId = messageId;
this.error = error; this.error = error;
@ -91,10 +89,4 @@ public class Result {
return "InvalidRegistration".equals(error); return "InvalidRegistration".equals(error);
} }
/**
* @return The context passed into Sender.send(), if any.
*/
public Object getContext() {
return context;
}
} }

View File

@ -1,4 +1,4 @@
/** /*
* Copyright (C) 2015 Open Whisper Systems * Copyright (C) 2015 Open Whisper Systems
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
@ -16,33 +16,29 @@
*/ */
package org.whispersystems.gcm.server; package org.whispersystems.gcm.server;
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
import com.nurkiewicz.asyncretry.RetryContext;
import com.nurkiewicz.asyncretry.RetryExecutor;
import com.nurkiewicz.asyncretry.function.RetryCallable;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;
import org.whispersystems.gcm.server.internal.GcmResponseEntity; import org.whispersystems.gcm.server.internal.GcmResponseEntity;
import org.whispersystems.gcm.server.internal.GcmResponseListEntity; import org.whispersystems.gcm.server.internal.GcmResponseListEntity;
import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import io.github.resilience4j.retry.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
/** /**
* The main interface to sending GCM messages. Thread safe. * The main interface to sending GCM messages. Thread safe.
* *
@ -52,18 +48,20 @@ public class Sender {
private static final String PRODUCTION_URL = "https://fcm.googleapis.com/fcm/send"; private static final String PRODUCTION_URL = "https://fcm.googleapis.com/fcm/send";
private final CloseableHttpAsyncClient client; private final HttpClient client;
private final String authorizationHeader; private final String authorizationHeader;
private final RetryExecutor executor; private final URI uri;
private final String url; private final Retry retry;
private final ObjectMapper mapper;
private final ScheduledExecutorService executorService;
/** /**
* Construct a Sender instance. * Construct a Sender instance.
* *
* @param apiKey Your application's GCM API key. * @param apiKey Your application's GCM API key.
*/ */
public Sender(String apiKey) { public Sender(String apiKey, ObjectMapper mapper) {
this(apiKey, 10); this(apiKey, mapper, 10);
} }
/** /**
@ -72,31 +70,36 @@ public class Sender {
* @param apiKey Your application's GCM API key. * @param apiKey Your application's GCM API key.
* @param retryCount The number of retries to attempt on a network error or 500 response. * @param retryCount The number of retries to attempt on a network error or 500 response.
*/ */
public Sender(String apiKey, int retryCount) { public Sender(String apiKey, ObjectMapper mapper, int retryCount) {
this(apiKey, retryCount, PRODUCTION_URL); this(apiKey, mapper, retryCount, PRODUCTION_URL);
} }
@VisibleForTesting @VisibleForTesting
public Sender(String apiKey, int retryCount, String url) { public Sender(String apiKey, ObjectMapper mapper, int retryCount, String url) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); this.mapper = mapper;
this.executorService = Executors.newSingleThreadScheduledExecutor();
this.url = url; this.uri = URI.create(url);
this.authorizationHeader = String.format("key=%s", apiKey); this.authorizationHeader = String.format("key=%s", apiKey);
this.retry = Retry.of("fcm-sender", RetryConfig.custom()
.maxAttempts(retryCount)
.intervalFunction(IntervalFunction.ofExponentialRandomBackoff(Duration.ofMillis(100), 2.0))
.retryOnException(this::isRetryableException)
.build());
this.client = HttpAsyncClients.custom() this.client = HttpClient.newBuilder()
.setMaxConnTotal(100) .version(HttpClient.Version.HTTP_2)
.setMaxConnPerRoute(10) .connectTimeout(Duration.ofSeconds(10))
.build(); .build();
}
this.executor = new AsyncRetryExecutor(scheduler).retryOn(ServerFailedException.class) private boolean isRetryableException(Throwable throwable) {
.retryOn(TimeoutException.class) while (throwable instanceof CompletionException) {
.retryOn(IOException.class) throwable = throwable.getCause();
.withExponentialBackoff(100, 2.0) }
.withUniformJitter()
.withMaxDelay(4000)
.withMaxRetries(retryCount);
this.client.start(); return throwable instanceof ServerFailedException ||
throwable instanceof TimeoutException ||
throwable instanceof IOException;
} }
/** /**
@ -105,100 +108,47 @@ public class Sender {
* @param message The message to send. * @param message The message to send.
* @return A future. * @return A future.
*/ */
public ListenableFuture<Result> send(Message message) { public CompletableFuture<Result> send(Message message) {
return send(message, null);
}
/**
* Asynchronously send a message with a context to be passed in the future result.
*
* @param message The message to send.
* @param requestContext An opaque context to include the future result.
* @return The future.
*/
public ListenableFuture<Result> send(final Message message, final Object requestContext) {
return executor.getFutureWithRetry(new RetryCallable<ListenableFuture<Result>>() {
@Override
public ListenableFuture<Result> call(RetryContext context) throws Exception {
SettableFuture<Result> future = SettableFuture.create();
HttpPost request = new HttpPost(url);
request.setHeader("Authorization", authorizationHeader);
request.setEntity(new StringEntity(message.serialize(),
ContentType.parse("application/json")));
client.execute(request, new ResponseHandler(future, requestContext));
return future;
}
});
}
/**
* Shut down all existing HTTP connections.
* @throws IOException
*/
public void stop() throws IOException {
this.client.close();
}
private static final class ResponseHandler implements FutureCallback<HttpResponse> {
private static final ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
private final SettableFuture<Result> future;
private final Object requestContext;
public ResponseHandler(SettableFuture<Result> future, Object requestContext) {
this.future = future;
this.requestContext = requestContext;
}
@Override
public void completed(HttpResponse result) {
try { try {
String responseBody = EntityUtils.toString(result.getEntity()); HttpRequest request = HttpRequest.newBuilder()
.uri(uri)
.header("Authorization", authorizationHeader)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(message.serialize()))
.timeout(Duration.ofSeconds(10))
.build();
switch (result.getStatusLine().getStatusCode()) { return retry.executeCompletionStage(executorService,
case 400: future.setException(new InvalidRequestException()); break; () -> client.sendAsync(request, BodyHandlers.ofByteArray())
case 401: future.setException(new AuthenticationFailedException()); break; .thenApply(response -> {
switch (response.statusCode()) {
case 400: throw new CompletionException(new InvalidRequestException());
case 401: throw new CompletionException(new AuthenticationFailedException());
case 204: case 204:
case 200: future.set(parseResult(responseBody)); break; case 200: return response.body();
default: future.setException(new ServerFailedException("Bad status: " + result.getStatusLine().getStatusCode())); default: throw new CompletionException(new ServerFailedException("Bad status: " + response.statusCode()));
} }
} catch (IOException e) { })
future.setException(e); .thenApply(responseBytes -> {
} try {
} List<GcmResponseEntity> responseList = mapper.readValue(responseBytes, GcmResponseListEntity.class).getResults();
@Override
public void failed(Exception ex) {
future.setException(ex);
}
@Override
public void cancelled() {
future.setException(new ServerFailedException("Canceled!"));
}
private Result parseResult(String body) throws IOException {
List<GcmResponseEntity> responseList = objectMapper.readValue(body, GcmResponseListEntity.class)
.getResults();
if (responseList == null || responseList.size() == 0) { if (responseList == null || responseList.size() == 0) {
throw new IOException("Empty response list!"); throw new CompletionException(new IOException("Empty response list!"));
} }
GcmResponseEntity responseEntity = responseList.get(0); GcmResponseEntity responseEntity = responseList.get(0);
return new Result(this.requestContext, return new Result(responseEntity.getCanonicalRegistrationId(),
responseEntity.getCanonicalRegistrationId(),
responseEntity.getMessageId(), responseEntity.getMessageId(),
responseEntity.getError()); responseEntity.getError());
} catch (IOException e) {
throw new CompletionException(e);
}
})).toCompletableFuture();
} catch (JsonProcessingException e) {
return CompletableFuture.failedFuture(e);
} }
} }
} }

View File

@ -1,6 +1,9 @@
package org.whispersystems.gcm.server; package org.whispersystems.gcm.server;
import com.google.common.util.concurrent.ListenableFuture; import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.squareup.okhttp.mockwebserver.MockResponse; import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.RecordedRequest; import com.squareup.okhttp.mockwebserver.RecordedRequest;
import com.squareup.okhttp.mockwebserver.rule.MockWebServerRule; import com.squareup.okhttp.mockwebserver.rule.MockWebServerRule;
@ -8,6 +11,7 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -22,25 +26,31 @@ public class SenderTest {
@Rule @Rule
public MockWebServerRule server = new MockWebServerRule(); public MockWebServerRule server = new MockWebServerRule();
private static final ObjectMapper mapper = new ObjectMapper();
static {
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@Test @Test
public void testSuccess() throws InterruptedException, ExecutionException, TimeoutException, IOException { public void testSuccess() throws InterruptedException, ExecutionException, TimeoutException, IOException {
MockResponse successResponse = new MockResponse().setResponseCode(200) MockResponse successResponse = new MockResponse().setResponseCode(200)
.setBody(fixture("fixtures/response-success.json")); .setBody(fixture("fixtures/response-success.json"));
server.enqueue(successResponse); server.enqueue(successResponse);
String context = "my context"; Sender sender = new Sender("foobarbaz", mapper, 10, server.getUrl("/gcm/send").toExternalForm());
Sender sender = new Sender("foobarbaz", 10, server.getUrl("/gcm/send").toExternalForm()); CompletableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build());
ListenableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build(), context);
Result result = future.get(10, TimeUnit.SECONDS); Result result = future.get(10, TimeUnit.SECONDS);
assertEquals(result.isSuccess(), true); assertTrue(result.isSuccess());
assertEquals(result.isThrottled(), false); assertFalse(result.isThrottled());
assertEquals(result.isUnregistered(), false); assertFalse(result.isUnregistered());
assertEquals(result.getMessageId(), "1:08"); assertEquals(result.getMessageId(), "1:08");
assertNull(result.getError()); assertNull(result.getError());
assertNull(result.getCanonicalRegistrationId()); assertNull(result.getCanonicalRegistrationId());
assertEquals(context, result.getContext());
RecordedRequest request = server.takeRequest(); RecordedRequest request = server.takeRequest();
assertEquals(request.getPath(), "/gcm/send"); assertEquals(request.getPath(), "/gcm/send");
@ -51,12 +61,12 @@ public class SenderTest {
} }
@Test @Test
public void testBadApiKey() throws ExecutionException, InterruptedException, TimeoutException { public void testBadApiKey() throws InterruptedException, TimeoutException {
MockResponse unauthorizedResponse = new MockResponse().setResponseCode(401); MockResponse unauthorizedResponse = new MockResponse().setResponseCode(401);
server.enqueue(unauthorizedResponse); server.enqueue(unauthorizedResponse);
Sender sender = new Sender("foobar", 10, server.getUrl("/gcm/send").toExternalForm()); Sender sender = new Sender("foobar", mapper, 10, server.getUrl("/gcm/send").toExternalForm());
ListenableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build()); CompletableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build());
try { try {
future.get(10, TimeUnit.SECONDS); future.get(10, TimeUnit.SECONDS);
@ -73,8 +83,8 @@ public class SenderTest {
MockResponse malformed = new MockResponse().setResponseCode(400); MockResponse malformed = new MockResponse().setResponseCode(400);
server.enqueue(malformed); server.enqueue(malformed);
Sender sender = new Sender("foobarbaz", 10, server.getUrl("/gcm/send").toExternalForm()); Sender sender = new Sender("foobarbaz", mapper, 10, server.getUrl("/gcm/send").toExternalForm());
ListenableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build()); CompletableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build());
try { try {
future.get(10, TimeUnit.SECONDS); future.get(10, TimeUnit.SECONDS);
@ -93,8 +103,8 @@ public class SenderTest {
server.enqueue(error); server.enqueue(error);
server.enqueue(error); server.enqueue(error);
Sender sender = new Sender("foobarbaz", 2, server.getUrl("/gcm/send").toExternalForm()); Sender sender = new Sender("foobarbaz", mapper, 3, server.getUrl("/gcm/send").toExternalForm());
ListenableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build()); CompletableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build());
try { try {
future.get(10, TimeUnit.SECONDS); future.get(10, TimeUnit.SECONDS);
@ -118,15 +128,15 @@ public class SenderTest {
server.enqueue(error); server.enqueue(error);
server.enqueue(success); server.enqueue(success);
Sender sender = new Sender("foobarbaz", 3, server.getUrl("/gcm/send").toExternalForm()); Sender sender = new Sender("foobarbaz", mapper, 4, server.getUrl("/gcm/send").toExternalForm());
ListenableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build()); CompletableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build());
Result result = future.get(10, TimeUnit.SECONDS); Result result = future.get(10, TimeUnit.SECONDS);
assertEquals(server.getRequestCount(), 4); assertEquals(server.getRequestCount(), 4);
assertEquals(result.isSuccess(), true); assertTrue(result.isSuccess());
assertEquals(result.isThrottled(), false); assertFalse(result.isThrottled());
assertEquals(result.isUnregistered(), false); assertFalse(result.isUnregistered());
assertEquals(result.getMessageId(), "1:08"); assertEquals(result.getMessageId(), "1:08");
assertNull(result.getError()); assertNull(result.getError());
assertNull(result.getCanonicalRegistrationId()); assertNull(result.getCanonicalRegistrationId());
@ -141,11 +151,11 @@ public class SenderTest {
server.enqueue(response); server.enqueue(response);
server.enqueue(response); server.enqueue(response);
Sender sender = new Sender("foobarbaz", 2, server.getUrl("/gcm/send").toExternalForm()); Sender sender = new Sender("foobarbaz", mapper ,2, server.getUrl("/gcm/send").toExternalForm());
server.get().shutdown(); server.get().shutdown();
ListenableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build()); CompletableFuture<Result> future = sender.send(Message.newBuilder().withDestination("1").build());
try { try {
future.get(10, TimeUnit.SECONDS); future.get(10, TimeUnit.SECONDS);
@ -161,8 +171,8 @@ public class SenderTest {
server.enqueue(response); server.enqueue(response);
Sender sender = new Sender("foobarbaz", 2, server.getUrl("/gcm/send").toExternalForm()); Sender sender = new Sender("foobarbaz", mapper,2, server.getUrl("/gcm/send").toExternalForm());
ListenableFuture<Result> future = sender.send(Message.newBuilder() CompletableFuture<Result> future = sender.send(Message.newBuilder()
.withDestination("2") .withDestination("2")
.withDataPart("message", "new message!") .withDataPart("message", "new message!")
.build()); .build());

View File

@ -1,21 +1,22 @@
package org.whispersystems.gcm.server; package org.whispersystems.gcm.server;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.tomakehurst.wiremock.junit.WireMockRule; import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.google.common.util.concurrent.ListenableFuture;
import com.squareup.okhttp.mockwebserver.MockResponse;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static com.github.tomakehurst.wiremock.client.WireMock.*; import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.assertTrue;
import static org.whispersystems.gcm.server.util.FixtureHelpers.fixture; import static org.whispersystems.gcm.server.util.FixtureHelpers.fixture;
@ -24,6 +25,14 @@ public class SimultaneousSenderTest {
@Rule @Rule
public WireMockRule wireMock = new WireMockRule(8089); public WireMockRule wireMock = new WireMockRule(8089);
private static final ObjectMapper mapper = new ObjectMapper();
static {
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@Test @Test
public void testSimultaneousSuccess() throws TimeoutException, InterruptedException, ExecutionException, JsonProcessingException { public void testSimultaneousSuccess() throws TimeoutException, InterruptedException, ExecutionException, JsonProcessingException {
stubFor(post(urlPathEqualTo("/gcm/send")) stubFor(post(urlPathEqualTo("/gcm/send"))
@ -31,15 +40,15 @@ public class SimultaneousSenderTest {
.withStatus(200) .withStatus(200)
.withBody(fixture("fixtures/response-success.json")))); .withBody(fixture("fixtures/response-success.json"))));
Sender sender = new Sender("foobarbaz", 2, "http://localhost:8089/gcm/send"); Sender sender = new Sender("foobarbaz", mapper, 2, "http://localhost:8089/gcm/send");
List<ListenableFuture<Result>> results = new LinkedList<>(); List<CompletableFuture<Result>> results = new LinkedList<>();
for (int i=0;i<1000;i++) { for (int i=0;i<1000;i++) {
results.add(sender.send(Message.newBuilder().withDestination("1").build())); results.add(sender.send(Message.newBuilder().withDestination("1").build()));
} }
int i=0; int i=0;
for (ListenableFuture<Result> future : results) { for (CompletableFuture<Result> future : results) {
Result result = future.get(60, TimeUnit.SECONDS); Result result = future.get(60, TimeUnit.SECONDS);
System.out.println("Got " + (i++)); System.out.println("Got " + (i++));
@ -55,18 +64,18 @@ public class SimultaneousSenderTest {
.willReturn(aResponse() .willReturn(aResponse()
.withStatus(503))); .withStatus(503)));
Sender sender = new Sender("foobarbaz", 2, "http://localhost:8089/gcm/send"); Sender sender = new Sender("foobarbaz", mapper, 2, "http://localhost:8089/gcm/send");
List<ListenableFuture<Result>> futures = new LinkedList<>(); List<CompletableFuture<Result>> futures = new LinkedList<>();
for (int i=0;i<1000;i++) { for (int i=0;i<1000;i++) {
futures.add(sender.send(Message.newBuilder().withDestination("1").build())); futures.add(sender.send(Message.newBuilder().withDestination("1").build()));
} }
for (ListenableFuture<Result> future : futures) { for (CompletableFuture<Result> future : futures) {
try { try {
Result result = future.get(60, TimeUnit.SECONDS); Result result = future.get(60, TimeUnit.SECONDS);
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertTrue(e.getCause() instanceof ServerFailedException); assertTrue(e.getCause().toString(), e.getCause() instanceof ServerFailedException);
} }
} }
} }

View File

@ -18,6 +18,8 @@
<properties> <properties>
<dropwizard.version>1.3.9</dropwizard.version> <dropwizard.version>1.3.9</dropwizard.version>
<jackson.api.version>2.9.8</jackson.api.version> <jackson.api.version>2.9.8</jackson.api.version>
<resilience4j.version>0.14.1</resilience4j.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<TextSecureServer.version>2.63</TextSecureServer.version> <TextSecureServer.version>2.63</TextSecureServer.version>
</properties> </properties>

View File

@ -13,7 +13,6 @@
<version>${TextSecureServer.version}</version> <version>${TextSecureServer.version}</version>
<properties> <properties>
<resilience4j.version>0.14.1</resilience4j.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>

View File

@ -86,7 +86,6 @@ import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration; import javax.servlet.FilterRegistration;
import javax.servlet.ServletRegistration; import javax.servlet.ServletRegistration;
import java.security.Security; import java.security.Security;
import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -188,7 +187,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DispatchManager dispatchManager = new DispatchManager(cacheClientFactory, Optional.of(deadLetterHandler)); DispatchManager dispatchManager = new DispatchManager(cacheClientFactory, Optional.of(deadLetterHandler));
PubSubManager pubSubManager = new PubSubManager(cacheClient, dispatchManager); PubSubManager pubSubManager = new PubSubManager(cacheClient, dispatchManager);
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration()); APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey(), directoryQueue); GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager); WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheClient); RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheClient);

View File

@ -4,25 +4,22 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.gcm.server.Message; import org.whispersystems.gcm.server.Message;
import org.whispersystems.gcm.server.Result; import org.whispersystems.gcm.server.Result;
import org.whispersystems.gcm.server.Sender; import org.whispersystems.gcm.server.Sender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -45,23 +42,19 @@ public class GCMSender implements Managed {
put("notification", metricRegistry.meter(name(getClass(), "outbound", "notification"))); put("notification", metricRegistry.meter(name(getClass(), "outbound", "notification")));
}}; }};
private final AccountsManager accountsManager; private final AccountsManager accountsManager;
private final Sender signalSender; private final Sender signalSender;
private final DirectoryQueue directoryQueue;
private ExecutorService executor; private ExecutorService executor;
public GCMSender(AccountsManager accountsManager, String signalKey, DirectoryQueue directoryQueue) { public GCMSender(AccountsManager accountsManager, String signalKey) {
this.accountsManager = accountsManager; this.accountsManager = accountsManager;
this.signalSender = new Sender(signalKey, 50); this.signalSender = new Sender(signalKey, SystemMapper.getMapper(), 6);
this.directoryQueue = directoryQueue;
} }
@VisibleForTesting @VisibleForTesting
public GCMSender(AccountsManager accountsManager, Sender sender, DirectoryQueue directoryQueue, ExecutorService executor) { public GCMSender(AccountsManager accountsManager, Sender sender, ExecutorService executor) {
this.accountsManager = accountsManager; this.accountsManager = accountsManager;
this.signalSender = sender; this.signalSender = sender;
this.directoryQueue = directoryQueue;
this.executor = executor; this.executor = executor;
} }
@ -73,28 +66,26 @@ public class GCMSender implements Managed {
String key = message.isReceipt() ? "receipt" : "notification"; String key = message.isReceipt() ? "receipt" : "notification";
Message request = builder.withDataPart(key, "").build(); Message request = builder.withDataPart(key, "").build();
ListenableFuture<Result> future = signalSender.send(request, message); CompletableFuture<Result> future = signalSender.send(request);
markOutboundMeter(key); markOutboundMeter(key);
Futures.addCallback(future, new FutureCallback<Result>() { future.handle((result, throwable) -> {
@Override if (result != null) {
public void onSuccess(Result result) {
if (result.isUnregistered() || result.isInvalidRegistrationId()) { if (result.isUnregistered() || result.isInvalidRegistrationId()) {
handleBadRegistration(result); executor.submit(() -> handleBadRegistration(message));
} else if (result.hasCanonicalRegistrationId()) { } else if (result.hasCanonicalRegistrationId()) {
handleCanonicalRegistrationId(result); executor.submit(() -> handleCanonicalRegistrationId(message, result));
} else if (!result.isSuccess()) { } else if (!result.isSuccess()) {
handleGenericError(result); executor.submit(() -> handleGenericError(message, result));
} else { } else {
success.mark(); success.mark();
} }
} else {
logger.warn("FCM Failed: " + throwable + ", " + throwable.getCause());
} }
@Override return null;
public void onFailure(Throwable throwable) { });
logger.warn("GCM Failed: " + throwable);
}
}, executor);
} }
@Override @Override
@ -103,16 +94,15 @@ public class GCMSender implements Managed {
} }
@Override @Override
public void stop() throws IOException { public void stop() {
this.signalSender.stop();
this.executor.shutdown(); this.executor.shutdown();
} }
private void handleBadRegistration(Result result) { private void handleBadRegistration(GcmMessage message) {
GcmMessage message = (GcmMessage) result.getContext();
Optional<Account> account = getAccountForEvent(message); Optional<Account> account = getAccountForEvent(message);
if (account.isPresent()) { if (account.isPresent()) {
//noinspection OptionalGetWithoutIsPresent
Device device = account.get().getDevice(message.getDeviceId()).get(); Device device = account.get().getDevice(message.getDeviceId()).get();
if (device.getUninstalledFeedbackTimestamp() == 0) { if (device.getUninstalledFeedbackTimestamp() == 0) {
@ -124,14 +114,14 @@ public class GCMSender implements Managed {
unregistered.mark(); unregistered.mark();
} }
private void handleCanonicalRegistrationId(Result result) { private void handleCanonicalRegistrationId(GcmMessage message, Result result) {
GcmMessage message = (GcmMessage)result.getContext();
logger.warn(String.format("Actually received 'CanonicalRegistrationId' ::: (canonical=%s), (original=%s)", logger.warn(String.format("Actually received 'CanonicalRegistrationId' ::: (canonical=%s), (original=%s)",
result.getCanonicalRegistrationId(), message.getGcmId())); result.getCanonicalRegistrationId(), message.getGcmId()));
Optional<Account> account = getAccountForEvent(message); Optional<Account> account = getAccountForEvent(message);
if (account.isPresent()) { if (account.isPresent()) {
//noinspection OptionalGetWithoutIsPresent
Device device = account.get().getDevice(message.getDeviceId()).get(); Device device = account.get().getDevice(message.getDeviceId()).get();
device.setGcmId(result.getCanonicalRegistrationId()); device.setGcmId(result.getCanonicalRegistrationId());
@ -141,8 +131,7 @@ public class GCMSender implements Managed {
canonical.mark(); canonical.mark();
} }
private void handleGenericError(Result result) { private void handleGenericError(GcmMessage message, Result result) {
GcmMessage message = (GcmMessage)result.getContext();
logger.warn(String.format("Unrecoverable Error ::: (error=%s), (gcm_id=%s), " + logger.warn(String.format("Unrecoverable Error ::: (error=%s), (gcm_id=%s), " +
"(destination=%s), (device_id=%d)", "(destination=%s), (device_id=%d)",
result.getError(), message.getGcmId(), message.getNumber(), result.getError(), message.getGcmId(), message.getNumber(),

View File

@ -1,14 +1,11 @@
package org.whispersystems.textsecuregcm.tests.push; package org.whispersystems.textsecuregcm.tests.push;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Test; import org.junit.Test;
import org.mockito.Matchers;
import org.whispersystems.gcm.server.Message; import org.whispersystems.gcm.server.Message;
import org.whispersystems.gcm.server.Result; import org.whispersystems.gcm.server.Result;
import org.whispersystems.gcm.server.Sender; import org.whispersystems.gcm.server.Sender;
import org.whispersystems.textsecuregcm.push.GCMSender; import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.GcmMessage; import org.whispersystems.textsecuregcm.push.GcmMessage;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
@ -16,6 +13,7 @@ import org.whispersystems.textsecuregcm.tests.util.SynchronousExecutorService;
import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.util.Util;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -27,7 +25,6 @@ public class GCMSenderTest {
AccountsManager accountsManager = mock(AccountsManager.class); AccountsManager accountsManager = mock(AccountsManager.class);
Sender sender = mock(Sender.class ); Sender sender = mock(Sender.class );
Result successResult = mock(Result.class ); Result successResult = mock(Result.class );
DirectoryQueue directoryQueue = mock(DirectoryQueue.class );
SynchronousExecutorService executorService = new SynchronousExecutorService(); SynchronousExecutorService executorService = new SynchronousExecutorService();
when(successResult.isInvalidRegistrationId()).thenReturn(false); when(successResult.isInvalidRegistrationId()).thenReturn(false);
@ -36,17 +33,15 @@ public class GCMSenderTest {
when(successResult.isSuccess()).thenReturn(true); when(successResult.isSuccess()).thenReturn(true);
GcmMessage message = new GcmMessage("foo", "+12223334444", 1, false); GcmMessage message = new GcmMessage("foo", "+12223334444", 1, false);
GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService); GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
SettableFuture<Result> successFuture = SettableFuture.create(); CompletableFuture<Result> successFuture = CompletableFuture.completedFuture(successResult);
successFuture.set(successResult);
when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(successFuture); when(sender.send(any(Message.class))).thenReturn(successFuture);
when(successResult.getContext()).thenReturn(message);
gcmSender.sendMessage(message); gcmSender.sendMessage(message);
verify(sender, times(1)).send(any(Message.class), eq(message)); verify(sender, times(1)).send(any(Message.class));
} }
@Test @Test
@ -57,7 +52,6 @@ public class GCMSenderTest {
AccountsManager accountsManager = mock(AccountsManager.class); AccountsManager accountsManager = mock(AccountsManager.class);
Sender sender = mock(Sender.class ); Sender sender = mock(Sender.class );
Result invalidResult = mock(Result.class ); Result invalidResult = mock(Result.class );
DirectoryQueue directoryQueue = mock(DirectoryQueue.class );
SynchronousExecutorService executorService = new SynchronousExecutorService(); SynchronousExecutorService executorService = new SynchronousExecutorService();
Account destinationAccount = mock(Account.class); Account destinationAccount = mock(Account.class);
@ -73,17 +67,15 @@ public class GCMSenderTest {
when(invalidResult.isSuccess()).thenReturn(true); when(invalidResult.isSuccess()).thenReturn(true);
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false); GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false);
GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService); GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
SettableFuture<Result> invalidFuture = SettableFuture.create(); CompletableFuture<Result> invalidFuture = CompletableFuture.completedFuture(invalidResult);
invalidFuture.set(invalidResult);
when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(invalidFuture); when(sender.send(any(Message.class))).thenReturn(invalidFuture);
when(invalidResult.getContext()).thenReturn(message);
gcmSender.sendMessage(message); gcmSender.sendMessage(message);
verify(sender, times(1)).send(any(Message.class), eq(message)); verify(sender, times(1)).send(any(Message.class));
verify(accountsManager, times(1)).get(eq(destinationNumber)); verify(accountsManager, times(1)).get(eq(destinationNumber));
verify(accountsManager, times(1)).update(eq(destinationAccount)); verify(accountsManager, times(1)).update(eq(destinationAccount));
verify(destinationDevice, times(1)).setUninstalledFeedbackTimestamp(eq(Util.todayInMillis())); verify(destinationDevice, times(1)).setUninstalledFeedbackTimestamp(eq(Util.todayInMillis()));
@ -102,7 +94,6 @@ public class GCMSenderTest {
Account destinationAccount = mock(Account.class ); Account destinationAccount = mock(Account.class );
Device destinationDevice = mock(Device.class ); Device destinationDevice = mock(Device.class );
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
when(destinationAccount.getDevice(1)).thenReturn(Optional.of(destinationDevice)); when(destinationAccount.getDevice(1)).thenReturn(Optional.of(destinationDevice));
when(accountsManager.get(destinationNumber)).thenReturn(Optional.of(destinationAccount)); when(accountsManager.get(destinationNumber)).thenReturn(Optional.of(destinationAccount));
@ -115,17 +106,15 @@ public class GCMSenderTest {
when(canonicalResult.getCanonicalRegistrationId()).thenReturn(canonicalId); when(canonicalResult.getCanonicalRegistrationId()).thenReturn(canonicalId);
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false); GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false);
GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService); GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
SettableFuture<Result> invalidFuture = SettableFuture.create(); CompletableFuture<Result> invalidFuture = CompletableFuture.completedFuture(canonicalResult);
invalidFuture.set(canonicalResult);
when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(invalidFuture); when(sender.send(any(Message.class))).thenReturn(invalidFuture);
when(canonicalResult.getContext()).thenReturn(message);
gcmSender.sendMessage(message); gcmSender.sendMessage(message);
verify(sender, times(1)).send(any(Message.class), eq(message)); verify(sender, times(1)).send(any(Message.class));
verify(accountsManager, times(1)).get(eq(destinationNumber)); verify(accountsManager, times(1)).get(eq(destinationNumber));
verify(accountsManager, times(1)).update(eq(destinationAccount)); verify(accountsManager, times(1)).update(eq(destinationAccount));
verify(destinationDevice, times(1)).setGcmId(eq(canonicalId)); verify(destinationDevice, times(1)).setGcmId(eq(canonicalId));