diff --git a/gcm-sender-async/pom.xml b/gcm-sender-async/pom.xml
index e9cb035b0..00285aab4 100644
--- a/gcm-sender-async/pom.xml
+++ b/gcm-sender-async/pom.xml
@@ -15,28 +15,18 @@
- org.apache.httpcomponents
- httpasyncclient
- 4.0.2
-
-
- com.nurkiewicz.asyncretry
- asyncretry-jdk7
- 0.0.5
+ io.github.resilience4j
+ resilience4j-retry
+ ${resilience4j.version}
+
com.squareup.okhttp
mockwebserver
2.1.0
test
-
- com.github.tomakehurst
- wiremock
- 1.52
- test
-
diff --git a/gcm-sender-async/src/main/java/org/whispersystems/gcm/server/Result.java b/gcm-sender-async/src/main/java/org/whispersystems/gcm/server/Result.java
index e76a2ad4d..20014bea2 100644
--- a/gcm-sender-async/src/main/java/org/whispersystems/gcm/server/Result.java
+++ b/gcm-sender-async/src/main/java/org/whispersystems/gcm/server/Result.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (C) 2015 Open Whisper Systems
*
* This program is free software: you can redistribute it and/or modify
@@ -21,13 +21,11 @@ package org.whispersystems.gcm.server;
*/
public class Result {
- private final Object context;
private final String canonicalRegistrationId;
private final String messageId;
private final String error;
- Result(Object context, String canonicalRegistrationId, String messageId, String error) {
- this.context = context;
+ Result(String canonicalRegistrationId, String messageId, String error) {
this.canonicalRegistrationId = canonicalRegistrationId;
this.messageId = messageId;
this.error = error;
@@ -91,10 +89,4 @@ public class Result {
return "InvalidRegistration".equals(error);
}
- /**
- * @return The context passed into Sender.send(), if any.
- */
- public Object getContext() {
- return context;
- }
}
diff --git a/gcm-sender-async/src/main/java/org/whispersystems/gcm/server/Sender.java b/gcm-sender-async/src/main/java/org/whispersystems/gcm/server/Sender.java
index 0cb572960..06aad2176 100644
--- a/gcm-sender-async/src/main/java/org/whispersystems/gcm/server/Sender.java
+++ b/gcm-sender-async/src/main/java/org/whispersystems/gcm/server/Sender.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (C) 2015 Open Whisper Systems
*
* This program is free software: you can redistribute it and/or modify
@@ -16,33 +16,29 @@
*/
package org.whispersystems.gcm.server;
-import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
-import com.nurkiewicz.asyncretry.RetryContext;
-import com.nurkiewicz.asyncretry.RetryExecutor;
-import com.nurkiewicz.asyncretry.function.RetryCallable;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.concurrent.FutureCallback;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
-import org.apache.http.util.EntityUtils;
import org.whispersystems.gcm.server.internal.GcmResponseEntity;
import org.whispersystems.gcm.server.internal.GcmResponseListEntity;
-import javax.net.ssl.SSLContext;
import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.time.Duration;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
+import io.github.resilience4j.retry.IntervalFunction;
+import io.github.resilience4j.retry.Retry;
+import io.github.resilience4j.retry.RetryConfig;
+
/**
* The main interface to sending GCM messages. Thread safe.
*
@@ -52,18 +48,20 @@ public class Sender {
private static final String PRODUCTION_URL = "https://fcm.googleapis.com/fcm/send";
- private final CloseableHttpAsyncClient client;
+ private final HttpClient client;
private final String authorizationHeader;
- private final RetryExecutor executor;
- private final String url;
+ private final URI uri;
+ private final Retry retry;
+ private final ObjectMapper mapper;
+ private final ScheduledExecutorService executorService;
/**
* Construct a Sender instance.
*
* @param apiKey Your application's GCM API key.
*/
- public Sender(String apiKey) {
- this(apiKey, 10);
+ public Sender(String apiKey, ObjectMapper mapper) {
+ this(apiKey, mapper, 10);
}
/**
@@ -72,31 +70,36 @@ public class Sender {
* @param apiKey Your application's GCM API key.
* @param retryCount The number of retries to attempt on a network error or 500 response.
*/
- public Sender(String apiKey, int retryCount) {
- this(apiKey, retryCount, PRODUCTION_URL);
+ public Sender(String apiKey, ObjectMapper mapper, int retryCount) {
+ this(apiKey, mapper, retryCount, PRODUCTION_URL);
}
@VisibleForTesting
- public Sender(String apiKey, int retryCount, String url) {
- ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
-
- this.url = url;
+ public Sender(String apiKey, ObjectMapper mapper, int retryCount, String url) {
+ this.mapper = mapper;
+ this.executorService = Executors.newSingleThreadScheduledExecutor();
+ this.uri = URI.create(url);
this.authorizationHeader = String.format("key=%s", apiKey);
+ this.retry = Retry.of("fcm-sender", RetryConfig.custom()
+ .maxAttempts(retryCount)
+ .intervalFunction(IntervalFunction.ofExponentialRandomBackoff(Duration.ofMillis(100), 2.0))
+ .retryOnException(this::isRetryableException)
+ .build());
- this.client = HttpAsyncClients.custom()
- .setMaxConnTotal(100)
- .setMaxConnPerRoute(10)
- .build();
+ this.client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .connectTimeout(Duration.ofSeconds(10))
+ .build();
+ }
- this.executor = new AsyncRetryExecutor(scheduler).retryOn(ServerFailedException.class)
- .retryOn(TimeoutException.class)
- .retryOn(IOException.class)
- .withExponentialBackoff(100, 2.0)
- .withUniformJitter()
- .withMaxDelay(4000)
- .withMaxRetries(retryCount);
+ private boolean isRetryableException(Throwable throwable) {
+ while (throwable instanceof CompletionException) {
+ throwable = throwable.getCause();
+ }
- this.client.start();
+ return throwable instanceof ServerFailedException ||
+ throwable instanceof TimeoutException ||
+ throwable instanceof IOException;
}
/**
@@ -105,100 +108,47 @@ public class Sender {
* @param message The message to send.
* @return A future.
*/
- public ListenableFuture send(Message message) {
- return send(message, null);
- }
+ public CompletableFuture send(Message message) {
+ try {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(uri)
+ .header("Authorization", authorizationHeader)
+ .header("Content-Type", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString(message.serialize()))
+ .timeout(Duration.ofSeconds(10))
+ .build();
- /**
- * Asynchronously send a message with a context to be passed in the future result.
- *
- * @param message The message to send.
- * @param requestContext An opaque context to include the future result.
- * @return The future.
- */
- public ListenableFuture send(final Message message, final Object requestContext) {
- return executor.getFutureWithRetry(new RetryCallable>() {
- @Override
- public ListenableFuture call(RetryContext context) throws Exception {
- SettableFuture future = SettableFuture.create();
- HttpPost request = new HttpPost(url);
+ return retry.executeCompletionStage(executorService,
+ () -> client.sendAsync(request, BodyHandlers.ofByteArray())
+ .thenApply(response -> {
+ switch (response.statusCode()) {
+ case 400: throw new CompletionException(new InvalidRequestException());
+ case 401: throw new CompletionException(new AuthenticationFailedException());
+ case 204:
+ case 200: return response.body();
+ default: throw new CompletionException(new ServerFailedException("Bad status: " + response.statusCode()));
+ }
+ })
+ .thenApply(responseBytes -> {
+ try {
+ List responseList = mapper.readValue(responseBytes, GcmResponseListEntity.class).getResults();
- request.setHeader("Authorization", authorizationHeader);
- request.setEntity(new StringEntity(message.serialize(),
- ContentType.parse("application/json")));
+ if (responseList == null || responseList.size() == 0) {
+ throw new CompletionException(new IOException("Empty response list!"));
+ }
- client.execute(request, new ResponseHandler(future, requestContext));
+ GcmResponseEntity responseEntity = responseList.get(0);
- return future;
- }
- });
- }
-
- /**
- * Shut down all existing HTTP connections.
- * @throws IOException
- */
- public void stop() throws IOException {
- this.client.close();
- }
-
- private static final class ResponseHandler implements FutureCallback {
-
- private static final ObjectMapper objectMapper = new ObjectMapper();
-
- static {
- objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- private final SettableFuture future;
- private final Object requestContext;
-
- public ResponseHandler(SettableFuture future, Object requestContext) {
- this.future = future;
- this.requestContext = requestContext;
- }
-
- @Override
- public void completed(HttpResponse result) {
- try {
- String responseBody = EntityUtils.toString(result.getEntity());
-
- switch (result.getStatusLine().getStatusCode()) {
- case 400: future.setException(new InvalidRequestException()); break;
- case 401: future.setException(new AuthenticationFailedException()); break;
- case 204:
- case 200: future.set(parseResult(responseBody)); break;
- default: future.setException(new ServerFailedException("Bad status: " + result.getStatusLine().getStatusCode()));
- }
- } catch (IOException e) {
- future.setException(e);
- }
- }
-
- @Override
- public void failed(Exception ex) {
- future.setException(ex);
- }
-
- @Override
- public void cancelled() {
- future.setException(new ServerFailedException("Canceled!"));
- }
-
- private Result parseResult(String body) throws IOException {
- List responseList = objectMapper.readValue(body, GcmResponseListEntity.class)
- .getResults();
-
- if (responseList == null || responseList.size() == 0) {
- throw new IOException("Empty response list!");
- }
-
- GcmResponseEntity responseEntity = responseList.get(0);
-
- return new Result(this.requestContext,
- responseEntity.getCanonicalRegistrationId(),
- responseEntity.getMessageId(),
- responseEntity.getError());
+ return new Result(responseEntity.getCanonicalRegistrationId(),
+ responseEntity.getMessageId(),
+ responseEntity.getError());
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ })).toCompletableFuture();
+ } catch (JsonProcessingException e) {
+ return CompletableFuture.failedFuture(e);
}
}
+
}
\ No newline at end of file
diff --git a/gcm-sender-async/src/test/java/org/whispersystems/gcm/server/SenderTest.java b/gcm-sender-async/src/test/java/org/whispersystems/gcm/server/SenderTest.java
index c3c7b199f..fa98cd268 100644
--- a/gcm-sender-async/src/test/java/org/whispersystems/gcm/server/SenderTest.java
+++ b/gcm-sender-async/src/test/java/org/whispersystems/gcm/server/SenderTest.java
@@ -1,6 +1,9 @@
package org.whispersystems.gcm.server;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import com.squareup.okhttp.mockwebserver.rule.MockWebServerRule;
@@ -8,6 +11,7 @@ import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -22,25 +26,31 @@ public class SenderTest {
@Rule
public MockWebServerRule server = new MockWebServerRule();
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ static {
+ mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
+ mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
@Test
public void testSuccess() throws InterruptedException, ExecutionException, TimeoutException, IOException {
MockResponse successResponse = new MockResponse().setResponseCode(200)
.setBody(fixture("fixtures/response-success.json"));
server.enqueue(successResponse);
- String context = "my context";
- Sender sender = new Sender("foobarbaz", 10, server.getUrl("/gcm/send").toExternalForm());
- ListenableFuture future = sender.send(Message.newBuilder().withDestination("1").build(), context);
+ Sender sender = new Sender("foobarbaz", mapper, 10, server.getUrl("/gcm/send").toExternalForm());
+ CompletableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
Result result = future.get(10, TimeUnit.SECONDS);
- assertEquals(result.isSuccess(), true);
- assertEquals(result.isThrottled(), false);
- assertEquals(result.isUnregistered(), false);
+ assertTrue(result.isSuccess());
+ assertFalse(result.isThrottled());
+ assertFalse(result.isUnregistered());
assertEquals(result.getMessageId(), "1:08");
assertNull(result.getError());
assertNull(result.getCanonicalRegistrationId());
- assertEquals(context, result.getContext());
RecordedRequest request = server.takeRequest();
assertEquals(request.getPath(), "/gcm/send");
@@ -51,12 +61,12 @@ public class SenderTest {
}
@Test
- public void testBadApiKey() throws ExecutionException, InterruptedException, TimeoutException {
+ public void testBadApiKey() throws InterruptedException, TimeoutException {
MockResponse unauthorizedResponse = new MockResponse().setResponseCode(401);
server.enqueue(unauthorizedResponse);
- Sender sender = new Sender("foobar", 10, server.getUrl("/gcm/send").toExternalForm());
- ListenableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
+ Sender sender = new Sender("foobar", mapper, 10, server.getUrl("/gcm/send").toExternalForm());
+ CompletableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
try {
future.get(10, TimeUnit.SECONDS);
@@ -73,8 +83,8 @@ public class SenderTest {
MockResponse malformed = new MockResponse().setResponseCode(400);
server.enqueue(malformed);
- Sender sender = new Sender("foobarbaz", 10, server.getUrl("/gcm/send").toExternalForm());
- ListenableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
+ Sender sender = new Sender("foobarbaz", mapper, 10, server.getUrl("/gcm/send").toExternalForm());
+ CompletableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
try {
future.get(10, TimeUnit.SECONDS);
@@ -93,8 +103,8 @@ public class SenderTest {
server.enqueue(error);
server.enqueue(error);
- Sender sender = new Sender("foobarbaz", 2, server.getUrl("/gcm/send").toExternalForm());
- ListenableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
+ Sender sender = new Sender("foobarbaz", mapper, 3, server.getUrl("/gcm/send").toExternalForm());
+ CompletableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
try {
future.get(10, TimeUnit.SECONDS);
@@ -118,15 +128,15 @@ public class SenderTest {
server.enqueue(error);
server.enqueue(success);
- Sender sender = new Sender("foobarbaz", 3, server.getUrl("/gcm/send").toExternalForm());
- ListenableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
+ Sender sender = new Sender("foobarbaz", mapper, 4, server.getUrl("/gcm/send").toExternalForm());
+ CompletableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
Result result = future.get(10, TimeUnit.SECONDS);
assertEquals(server.getRequestCount(), 4);
- assertEquals(result.isSuccess(), true);
- assertEquals(result.isThrottled(), false);
- assertEquals(result.isUnregistered(), false);
+ assertTrue(result.isSuccess());
+ assertFalse(result.isThrottled());
+ assertFalse(result.isUnregistered());
assertEquals(result.getMessageId(), "1:08");
assertNull(result.getError());
assertNull(result.getCanonicalRegistrationId());
@@ -141,11 +151,11 @@ public class SenderTest {
server.enqueue(response);
server.enqueue(response);
- Sender sender = new Sender("foobarbaz", 2, server.getUrl("/gcm/send").toExternalForm());
+ Sender sender = new Sender("foobarbaz", mapper ,2, server.getUrl("/gcm/send").toExternalForm());
server.get().shutdown();
- ListenableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
+ CompletableFuture future = sender.send(Message.newBuilder().withDestination("1").build());
try {
future.get(10, TimeUnit.SECONDS);
@@ -161,8 +171,8 @@ public class SenderTest {
server.enqueue(response);
- Sender sender = new Sender("foobarbaz", 2, server.getUrl("/gcm/send").toExternalForm());
- ListenableFuture future = sender.send(Message.newBuilder()
+ Sender sender = new Sender("foobarbaz", mapper,2, server.getUrl("/gcm/send").toExternalForm());
+ CompletableFuture future = sender.send(Message.newBuilder()
.withDestination("2")
.withDataPart("message", "new message!")
.build());
diff --git a/gcm-sender-async/src/test/java/org/whispersystems/gcm/server/SimultaneousSenderTest.java b/gcm-sender-async/src/test/java/org/whispersystems/gcm/server/SimultaneousSenderTest.java
index f7fb184fa..2e7fe1561 100644
--- a/gcm-sender-async/src/test/java/org/whispersystems/gcm/server/SimultaneousSenderTest.java
+++ b/gcm-sender-async/src/test/java/org/whispersystems/gcm/server/SimultaneousSenderTest.java
@@ -1,21 +1,22 @@
package org.whispersystems.gcm.server;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.squareup.okhttp.mockwebserver.MockResponse;
import org.junit.Rule;
import org.junit.Test;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
-import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue;
import static org.whispersystems.gcm.server.util.FixtureHelpers.fixture;
@@ -24,6 +25,14 @@ public class SimultaneousSenderTest {
@Rule
public WireMockRule wireMock = new WireMockRule(8089);
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ static {
+ mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
+ mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
@Test
public void testSimultaneousSuccess() throws TimeoutException, InterruptedException, ExecutionException, JsonProcessingException {
stubFor(post(urlPathEqualTo("/gcm/send"))
@@ -31,15 +40,15 @@ public class SimultaneousSenderTest {
.withStatus(200)
.withBody(fixture("fixtures/response-success.json"))));
- Sender sender = new Sender("foobarbaz", 2, "http://localhost:8089/gcm/send");
- List> results = new LinkedList<>();
+ Sender sender = new Sender("foobarbaz", mapper, 2, "http://localhost:8089/gcm/send");
+ List> results = new LinkedList<>();
for (int i=0;i<1000;i++) {
results.add(sender.send(Message.newBuilder().withDestination("1").build()));
}
int i=0;
- for (ListenableFuture future : results) {
+ for (CompletableFuture future : results) {
Result result = future.get(60, TimeUnit.SECONDS);
System.out.println("Got " + (i++));
@@ -55,18 +64,18 @@ public class SimultaneousSenderTest {
.willReturn(aResponse()
.withStatus(503)));
- Sender sender = new Sender("foobarbaz", 2, "http://localhost:8089/gcm/send");
- List> futures = new LinkedList<>();
+ Sender sender = new Sender("foobarbaz", mapper, 2, "http://localhost:8089/gcm/send");
+ List> futures = new LinkedList<>();
for (int i=0;i<1000;i++) {
futures.add(sender.send(Message.newBuilder().withDestination("1").build()));
}
- for (ListenableFuture future : futures) {
+ for (CompletableFuture future : futures) {
try {
Result result = future.get(60, TimeUnit.SECONDS);
} catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof ServerFailedException);
+ assertTrue(e.getCause().toString(), e.getCause() instanceof ServerFailedException);
}
}
}
diff --git a/pom.xml b/pom.xml
index d1ee288ac..1dd182844 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,6 +18,8 @@
1.3.9
2.9.8
+ 0.14.1
+
UTF-8
2.63
diff --git a/service/pom.xml b/service/pom.xml
index 8ab88cca2..b151cc985 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -13,7 +13,6 @@
${TextSecureServer.version}
- 0.14.1
UTF-8
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index 259929c43..279950c71 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -86,7 +86,6 @@ import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
import javax.servlet.ServletRegistration;
import java.security.Security;
-import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
@@ -188,7 +187,7 @@ public class WhisperServerService extends Application future = signalSender.send(request, message);
+ CompletableFuture future = signalSender.send(request);
markOutboundMeter(key);
- Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(Result result) {
+ future.handle((result, throwable) -> {
+ if (result != null) {
if (result.isUnregistered() || result.isInvalidRegistrationId()) {
- handleBadRegistration(result);
+ executor.submit(() -> handleBadRegistration(message));
} else if (result.hasCanonicalRegistrationId()) {
- handleCanonicalRegistrationId(result);
+ executor.submit(() -> handleCanonicalRegistrationId(message, result));
} else if (!result.isSuccess()) {
- handleGenericError(result);
+ executor.submit(() -> handleGenericError(message, result));
} else {
success.mark();
}
+ } else {
+ logger.warn("FCM Failed: " + throwable + ", " + throwable.getCause());
}
- @Override
- public void onFailure(Throwable throwable) {
- logger.warn("GCM Failed: " + throwable);
- }
- }, executor);
+ return null;
+ });
}
@Override
@@ -103,16 +94,15 @@ public class GCMSender implements Managed {
}
@Override
- public void stop() throws IOException {
- this.signalSender.stop();
+ public void stop() {
this.executor.shutdown();
}
- private void handleBadRegistration(Result result) {
- GcmMessage message = (GcmMessage) result.getContext();
+ private void handleBadRegistration(GcmMessage message) {
Optional account = getAccountForEvent(message);
if (account.isPresent()) {
+ //noinspection OptionalGetWithoutIsPresent
Device device = account.get().getDevice(message.getDeviceId()).get();
if (device.getUninstalledFeedbackTimestamp() == 0) {
@@ -124,14 +114,14 @@ public class GCMSender implements Managed {
unregistered.mark();
}
- private void handleCanonicalRegistrationId(Result result) {
- GcmMessage message = (GcmMessage)result.getContext();
+ private void handleCanonicalRegistrationId(GcmMessage message, Result result) {
logger.warn(String.format("Actually received 'CanonicalRegistrationId' ::: (canonical=%s), (original=%s)",
result.getCanonicalRegistrationId(), message.getGcmId()));
Optional account = getAccountForEvent(message);
if (account.isPresent()) {
+ //noinspection OptionalGetWithoutIsPresent
Device device = account.get().getDevice(message.getDeviceId()).get();
device.setGcmId(result.getCanonicalRegistrationId());
@@ -141,8 +131,7 @@ public class GCMSender implements Managed {
canonical.mark();
}
- private void handleGenericError(Result result) {
- GcmMessage message = (GcmMessage)result.getContext();
+ private void handleGenericError(GcmMessage message, Result result) {
logger.warn(String.format("Unrecoverable Error ::: (error=%s), (gcm_id=%s), " +
"(destination=%s), (device_id=%d)",
result.getError(), message.getGcmId(), message.getNumber(),
diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/push/GCMSenderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/push/GCMSenderTest.java
index efe038144..7f590a794 100644
--- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/push/GCMSenderTest.java
+++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/push/GCMSenderTest.java
@@ -1,14 +1,11 @@
package org.whispersystems.textsecuregcm.tests.push;
-import com.google.common.util.concurrent.SettableFuture;
import org.junit.Test;
-import org.mockito.Matchers;
import org.whispersystems.gcm.server.Message;
import org.whispersystems.gcm.server.Result;
import org.whispersystems.gcm.server.Sender;
import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.GcmMessage;
-import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@@ -16,6 +13,7 @@ import org.whispersystems.textsecuregcm.tests.util.SynchronousExecutorService;
import org.whispersystems.textsecuregcm.util.Util;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
@@ -27,7 +25,6 @@ public class GCMSenderTest {
AccountsManager accountsManager = mock(AccountsManager.class);
Sender sender = mock(Sender.class );
Result successResult = mock(Result.class );
- DirectoryQueue directoryQueue = mock(DirectoryQueue.class );
SynchronousExecutorService executorService = new SynchronousExecutorService();
when(successResult.isInvalidRegistrationId()).thenReturn(false);
@@ -36,17 +33,15 @@ public class GCMSenderTest {
when(successResult.isSuccess()).thenReturn(true);
GcmMessage message = new GcmMessage("foo", "+12223334444", 1, false);
- GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService);
+ GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
- SettableFuture successFuture = SettableFuture.create();
- successFuture.set(successResult);
+ CompletableFuture successFuture = CompletableFuture.completedFuture(successResult);
- when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(successFuture);
- when(successResult.getContext()).thenReturn(message);
+ when(sender.send(any(Message.class))).thenReturn(successFuture);
gcmSender.sendMessage(message);
- verify(sender, times(1)).send(any(Message.class), eq(message));
+ verify(sender, times(1)).send(any(Message.class));
}
@Test
@@ -57,7 +52,6 @@ public class GCMSenderTest {
AccountsManager accountsManager = mock(AccountsManager.class);
Sender sender = mock(Sender.class );
Result invalidResult = mock(Result.class );
- DirectoryQueue directoryQueue = mock(DirectoryQueue.class );
SynchronousExecutorService executorService = new SynchronousExecutorService();
Account destinationAccount = mock(Account.class);
@@ -73,17 +67,15 @@ public class GCMSenderTest {
when(invalidResult.isSuccess()).thenReturn(true);
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false);
- GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService);
+ GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
- SettableFuture invalidFuture = SettableFuture.create();
- invalidFuture.set(invalidResult);
+ CompletableFuture invalidFuture = CompletableFuture.completedFuture(invalidResult);
- when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(invalidFuture);
- when(invalidResult.getContext()).thenReturn(message);
+ when(sender.send(any(Message.class))).thenReturn(invalidFuture);
gcmSender.sendMessage(message);
- verify(sender, times(1)).send(any(Message.class), eq(message));
+ verify(sender, times(1)).send(any(Message.class));
verify(accountsManager, times(1)).get(eq(destinationNumber));
verify(accountsManager, times(1)).update(eq(destinationAccount));
verify(destinationDevice, times(1)).setUninstalledFeedbackTimestamp(eq(Util.todayInMillis()));
@@ -102,7 +94,6 @@ public class GCMSenderTest {
Account destinationAccount = mock(Account.class );
Device destinationDevice = mock(Device.class );
- DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
when(destinationAccount.getDevice(1)).thenReturn(Optional.of(destinationDevice));
when(accountsManager.get(destinationNumber)).thenReturn(Optional.of(destinationAccount));
@@ -115,17 +106,15 @@ public class GCMSenderTest {
when(canonicalResult.getCanonicalRegistrationId()).thenReturn(canonicalId);
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false);
- GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService);
+ GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
- SettableFuture invalidFuture = SettableFuture.create();
- invalidFuture.set(canonicalResult);
+ CompletableFuture invalidFuture = CompletableFuture.completedFuture(canonicalResult);
- when(sender.send(any(Message.class), Matchers.anyObject())).thenReturn(invalidFuture);
- when(canonicalResult.getContext()).thenReturn(message);
+ when(sender.send(any(Message.class))).thenReturn(invalidFuture);
gcmSender.sendMessage(message);
- verify(sender, times(1)).send(any(Message.class), eq(message));
+ verify(sender, times(1)).send(any(Message.class));
verify(accountsManager, times(1)).get(eq(destinationNumber));
verify(accountsManager, times(1)).update(eq(destinationAccount));
verify(destinationDevice, times(1)).setGcmId(eq(canonicalId));