Use a common utility for turning Google API futures into `CompletableFutures`

This commit is contained in:
Jon Chambers 2024-04-26 15:27:59 -04:00 committed by GitHub
parent 88e2687e23
commit 9d3e3c7312
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 59 additions and 56 deletions

View File

@ -7,12 +7,8 @@ package org.whispersystems.textsecuregcm.push;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseOptions;
@ -27,13 +23,13 @@ import io.micrometer.core.instrument.Timer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.GoogleApiUtil;
public class FcmSender implements PushNotificationSender {
@ -96,44 +92,29 @@ public class FcmSender implements PushNotificationSender {
builder.putData(key, pushNotification.data() != null ? pushNotification.data() : "");
final Instant start = Instant.now();
final CompletableFuture<SendPushNotificationResult> completableSendFuture = new CompletableFuture<>();
final Timer.Sample sample = Timer.start();
final ApiFuture<String> sendFuture = firebaseMessagingClient.sendAsync(builder.build());
return GoogleApiUtil.toCompletableFuture(firebaseMessagingClient.sendAsync(builder.build()), executor)
.whenComplete((ignored, throwable) -> sample.stop(SEND_NOTIFICATION_TIMER))
.thenApply(ignored -> new SendPushNotificationResult(true, null, false))
.exceptionally(throwable -> {
if (ExceptionUtils.unwrap(throwable) instanceof final FirebaseMessagingException firebaseMessagingException) {
final String errorCode;
// We want to record the time taken to send the push notification as directly as possible; executing this very small
// bit of non-blocking measurement on the sender thread lets us do that without picking up any confounding factors
// like having a callback waiting in an executor's queue.
sendFuture.addListener(() -> SEND_NOTIFICATION_TIMER.record(Duration.between(start, Instant.now())),
MoreExecutors.directExecutor());
if (firebaseMessagingException.getMessagingErrorCode() != null) {
errorCode = firebaseMessagingException.getMessagingErrorCode().name();
} else {
logger.warn("Received an FCM exception with no error code", firebaseMessagingException);
errorCode = "unknown";
}
ApiFutures.addCallback(sendFuture, new ApiFutureCallback<>() {
@Override
public void onSuccess(final String result) {
completableSendFuture.complete(new SendPushNotificationResult(true, null, false));
}
final boolean unregistered =
firebaseMessagingException.getMessagingErrorCode() == MessagingErrorCode.UNREGISTERED;
@Override
public void onFailure(final Throwable cause) {
if (cause instanceof final FirebaseMessagingException firebaseMessagingException) {
final String errorCode;
if (firebaseMessagingException.getMessagingErrorCode() != null) {
errorCode = firebaseMessagingException.getMessagingErrorCode().name();
return new SendPushNotificationResult(false, errorCode, unregistered);
} else {
logger.warn("Received an FCM exception with no error code", firebaseMessagingException);
errorCode = "unknown";
throw ExceptionUtils.wrap(throwable);
}
completableSendFuture.complete(new SendPushNotificationResult(false,
errorCode,
firebaseMessagingException.getMessagingErrorCode() == MessagingErrorCode.UNREGISTERED));
} else {
completableSendFuture.completeExceptionally(cause);
}
}
}, executor);
return completableSendFuture;
});
}
}

View File

@ -19,8 +19,6 @@ import com.braintreegateway.TransactionSearchRequest;
import com.braintreegateway.exceptions.BraintreeException;
import com.braintreegateway.exceptions.NotFoundException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.annotations.VisibleForTesting;
import java.math.BigDecimal;
@ -50,6 +48,7 @@ import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguratio
import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager;
import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.GoogleApiUtil;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
@ -72,7 +71,6 @@ public class BraintreeManager implements SubscriptionProcessorManager {
private final Map<String, String> currenciesToMerchantAccounts;
private final String PUBSUB_MESSAGE_COUNTER_NAME = MetricsUtil.name(BraintreeManager.class, "pubSubMessage");
private final String PUBSUB_MESSAGE_SUCCESS_TAG = "success";
public BraintreeManager(final String braintreeMerchantId, final String braintreePublicKey,
final String braintreePrivateKey,
@ -253,21 +251,17 @@ public class BraintreeManager implements SubscriptionProcessorManager {
donationPubSubMessageBuilder.setClientPlatform(clientPlatform.name().toLowerCase(Locale.ROOT));
}
ApiFutures.addCallback(pubsubPublisher.publish(PubsubMessage.newBuilder()
.setData(donationPubSubMessageBuilder.build().toByteString())
.build()), new ApiFutureCallback<>() {
GoogleApiUtil.toCompletableFuture(pubsubPublisher.publish(PubsubMessage.newBuilder()
.setData(donationPubSubMessageBuilder.build().toByteString())
.build()), executor)
.whenComplete((messageId, throwable) -> {
if (throwable != null) {
logger.warn("Failed to publish donation pub/sub message", throwable);
}
@Override
public void onSuccess(final String messageId) {
Metrics.counter(PUBSUB_MESSAGE_COUNTER_NAME, PUBSUB_MESSAGE_SUCCESS_TAG, "true").increment();
}
@Override
public void onFailure(final Throwable throwable) {
logger.warn("Failed to publish donation pub/sub message", throwable);
Metrics.counter(PUBSUB_MESSAGE_COUNTER_NAME, PUBSUB_MESSAGE_SUCCESS_TAG, "false").increment();
}
}, executor);
Metrics.counter(PUBSUB_MESSAGE_COUNTER_NAME, "success", String.valueOf(throwable == null))
.increment();
});
} catch (final Exception e) {
logger.warn("Failed to construct donation pub/sub message", e);
}

View File

@ -0,0 +1,28 @@
package org.whispersystems.textsecuregcm.util;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
public class GoogleApiUtil {
public static <T> CompletableFuture<T> toCompletableFuture(final ApiFuture<T> apiFuture, final Executor executor) {
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
ApiFutures.addCallback(apiFuture, new ApiFutureCallback<>() {
@Override
public void onSuccess(final T value) {
completableFuture.complete(value);
}
@Override
public void onFailure(final Throwable throwable) {
completableFuture.completeExceptionally(throwable);
}
}, executor);
return completableFuture;
}
}