diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/FcmSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/FcmSender.java index 7c21411f8..fb7444bb9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/FcmSender.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/FcmSender.java @@ -7,12 +7,8 @@ package org.whispersystems.textsecuregcm.push; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.firebase.FirebaseApp; import com.google.firebase.FirebaseOptions; @@ -27,13 +23,13 @@ import io.micrometer.core.instrument.Timer; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.ExceptionUtils; +import org.whispersystems.textsecuregcm.util.GoogleApiUtil; public class FcmSender implements PushNotificationSender { @@ -96,44 +92,29 @@ public class FcmSender implements PushNotificationSender { builder.putData(key, pushNotification.data() != null ? pushNotification.data() : ""); - final Instant start = Instant.now(); - final CompletableFuture completableSendFuture = new CompletableFuture<>(); + final Timer.Sample sample = Timer.start(); - final ApiFuture sendFuture = firebaseMessagingClient.sendAsync(builder.build()); + return GoogleApiUtil.toCompletableFuture(firebaseMessagingClient.sendAsync(builder.build()), executor) + .whenComplete((ignored, throwable) -> sample.stop(SEND_NOTIFICATION_TIMER)) + .thenApply(ignored -> new SendPushNotificationResult(true, null, false)) + .exceptionally(throwable -> { + if (ExceptionUtils.unwrap(throwable) instanceof final FirebaseMessagingException firebaseMessagingException) { + final String errorCode; - // We want to record the time taken to send the push notification as directly as possible; executing this very small - // bit of non-blocking measurement on the sender thread lets us do that without picking up any confounding factors - // like having a callback waiting in an executor's queue. - sendFuture.addListener(() -> SEND_NOTIFICATION_TIMER.record(Duration.between(start, Instant.now())), - MoreExecutors.directExecutor()); + if (firebaseMessagingException.getMessagingErrorCode() != null) { + errorCode = firebaseMessagingException.getMessagingErrorCode().name(); + } else { + logger.warn("Received an FCM exception with no error code", firebaseMessagingException); + errorCode = "unknown"; + } - ApiFutures.addCallback(sendFuture, new ApiFutureCallback<>() { - @Override - public void onSuccess(final String result) { - completableSendFuture.complete(new SendPushNotificationResult(true, null, false)); - } + final boolean unregistered = + firebaseMessagingException.getMessagingErrorCode() == MessagingErrorCode.UNREGISTERED; - @Override - public void onFailure(final Throwable cause) { - if (cause instanceof final FirebaseMessagingException firebaseMessagingException) { - final String errorCode; - - if (firebaseMessagingException.getMessagingErrorCode() != null) { - errorCode = firebaseMessagingException.getMessagingErrorCode().name(); + return new SendPushNotificationResult(false, errorCode, unregistered); } else { - logger.warn("Received an FCM exception with no error code", firebaseMessagingException); - errorCode = "unknown"; + throw ExceptionUtils.wrap(throwable); } - - completableSendFuture.complete(new SendPushNotificationResult(false, - errorCode, - firebaseMessagingException.getMessagingErrorCode() == MessagingErrorCode.UNREGISTERED)); - } else { - completableSendFuture.completeExceptionally(cause); - } - } - }, executor); - - return completableSendFuture; + }); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManager.java index 503d92e0b..727ed3e2e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManager.java @@ -19,8 +19,6 @@ import com.braintreegateway.TransactionSearchRequest; import com.braintreegateway.exceptions.BraintreeException; import com.braintreegateway.exceptions.NotFoundException; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; import com.google.cloud.pubsub.v1.Publisher; import com.google.common.annotations.VisibleForTesting; import java.math.BigDecimal; @@ -50,6 +48,7 @@ import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguratio import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager; import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.util.GoogleApiUtil; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; @@ -72,7 +71,6 @@ public class BraintreeManager implements SubscriptionProcessorManager { private final Map currenciesToMerchantAccounts; private final String PUBSUB_MESSAGE_COUNTER_NAME = MetricsUtil.name(BraintreeManager.class, "pubSubMessage"); - private final String PUBSUB_MESSAGE_SUCCESS_TAG = "success"; public BraintreeManager(final String braintreeMerchantId, final String braintreePublicKey, final String braintreePrivateKey, @@ -253,21 +251,17 @@ public class BraintreeManager implements SubscriptionProcessorManager { donationPubSubMessageBuilder.setClientPlatform(clientPlatform.name().toLowerCase(Locale.ROOT)); } - ApiFutures.addCallback(pubsubPublisher.publish(PubsubMessage.newBuilder() - .setData(donationPubSubMessageBuilder.build().toByteString()) - .build()), new ApiFutureCallback<>() { + GoogleApiUtil.toCompletableFuture(pubsubPublisher.publish(PubsubMessage.newBuilder() + .setData(donationPubSubMessageBuilder.build().toByteString()) + .build()), executor) + .whenComplete((messageId, throwable) -> { + if (throwable != null) { + logger.warn("Failed to publish donation pub/sub message", throwable); + } - @Override - public void onSuccess(final String messageId) { - Metrics.counter(PUBSUB_MESSAGE_COUNTER_NAME, PUBSUB_MESSAGE_SUCCESS_TAG, "true").increment(); - } - - @Override - public void onFailure(final Throwable throwable) { - logger.warn("Failed to publish donation pub/sub message", throwable); - Metrics.counter(PUBSUB_MESSAGE_COUNTER_NAME, PUBSUB_MESSAGE_SUCCESS_TAG, "false").increment(); - } - }, executor); + Metrics.counter(PUBSUB_MESSAGE_COUNTER_NAME, "success", String.valueOf(throwable == null)) + .increment(); + }); } catch (final Exception e) { logger.warn("Failed to construct donation pub/sub message", e); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/GoogleApiUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/GoogleApiUtil.java new file mode 100644 index 000000000..a017dc706 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/GoogleApiUtil.java @@ -0,0 +1,28 @@ +package org.whispersystems.textsecuregcm.util; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +public class GoogleApiUtil { + + public static CompletableFuture toCompletableFuture(final ApiFuture apiFuture, final Executor executor) { + final CompletableFuture completableFuture = new CompletableFuture<>(); + + ApiFutures.addCallback(apiFuture, new ApiFutureCallback<>() { + @Override + public void onSuccess(final T value) { + completableFuture.complete(value); + } + + @Override + public void onFailure(final Throwable throwable) { + completableFuture.completeExceptionally(throwable); + } + }, executor); + + return completableFuture; + } +}