diff --git a/service/config/sample.yml b/service/config/sample.yml index 0df5f4571..a44dd3ceb 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -67,6 +67,12 @@ braintree: supportedCurrenciesByPaymentMethod: PAYPAL: - usd + pubSubProject: example-project + pubSubTopic: example-topic + pubSubCredentialConfiguration: | + { + "credential": "configuration" + } dynamoDbClientConfiguration: region: us-west-2 # AWS Region diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 35a4ad8ef..d6b862a7a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -7,7 +7,14 @@ package org.whispersystems.textsecuregcm; import static com.codahale.metrics.MetricRegistry.name; import static java.util.Objects.requireNonNull; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.auth.oauth2.ExternalAccountCredentials; +import com.google.cloud.pubsub.v1.Publisher; import com.google.common.collect.Lists; +import com.google.pubsub.v1.TopicName; import io.dropwizard.auth.AuthDynamicFeature; import io.dropwizard.auth.AuthFilter; import io.dropwizard.auth.AuthValueFactoryProvider; @@ -29,7 +36,9 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.binder.grpc.MetricCollectingServerInterceptor; import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; import io.netty.channel.local.LocalAddress; +import java.io.ByteArrayInputStream; import java.net.http.HttpClient; +import java.nio.charset.StandardCharsets; import java.time.Clock; import java.time.Duration; import java.util.ArrayList; @@ -652,13 +661,39 @@ public class WhisperServerService extends Application dynamicConfigurationManager.getConfiguration().getVirtualThreads().allowedPinEvents(), config.getVirtualThreadConfiguration().pinEventThreshold()); + final Publisher pubSubPublisher; + { + final FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(100L) + .setMaxOutstandingRequestBytes(16 * 1024 * 1024L) // 16MB + .build(); + + final BatchingSettings batchingSettings = BatchingSettings.newBuilder() + .setFlowControlSettings(flowControlSettings) + .setDelayThreshold(org.threeten.bp.Duration.ofMillis(10)) + // These thresholds are actually the default, setting them explicitly since creating a custom batchingSettings resets them + .setElementCountThreshold(100L) + .setRequestByteThreshold(5000L) + .build(); + + try (final ByteArrayInputStream credentialConfigInputStream = + new ByteArrayInputStream(config.getBraintree().pubSubCredentialConfiguration().getBytes(StandardCharsets.UTF_8))) { + + pubSubPublisher = Publisher.newBuilder(TopicName.of(config.getBraintree().pubSubProject(), config.getBraintree().pubSubTopic())) + .setCredentialsProvider(FixedCredentialsProvider.create(ExternalAccountCredentials.fromStream(credentialConfigInputStream))) + .setBatchingSettings(batchingSettings) + .build(); + } + } + StripeManager stripeManager = new StripeManager(config.getStripe().apiKey().value(), subscriptionProcessorExecutor, config.getStripe().idempotencyKeyGenerator().value(), config.getStripe().boostDescription(), config.getStripe().supportedCurrenciesByPaymentMethod()); BraintreeManager braintreeManager = new BraintreeManager(config.getBraintree().merchantId(), config.getBraintree().publicKey(), config.getBraintree().privateKey().value(), config.getBraintree().environment(), config.getBraintree().supportedCurrenciesByPaymentMethod(), config.getBraintree().merchantAccounts(), - config.getBraintree().graphqlUrl(), currencyManager, config.getBraintree().circuitBreaker(), subscriptionProcessorExecutor, + config.getBraintree().graphqlUrl(), currencyManager, pubSubPublisher, config.getBraintree().circuitBreaker(), subscriptionProcessorExecutor, subscriptionProcessorRetryExecutor); environment.lifecycle().manage(apnSender); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/BraintreeConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/BraintreeConfiguration.java index 32291b3e3..f838485ff 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/BraintreeConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/BraintreeConfiguration.java @@ -19,7 +19,7 @@ import org.whispersystems.textsecuregcm.subscriptions.PaymentMethod; * @param publicKey the Braintree API public key * @param privateKey the Braintree API private key * @param environment the Braintree environment ("production" or "sandbox") - * @param supportedCurrencies the set of supported currencies + * @param supportedCurrenciesByPaymentMethod the set of supported currencies * @param graphqlUrl the Braintree GraphQL URl to use (this must match the environment) * @param merchantAccounts merchant account within the merchant for processing individual currencies * @param circuitBreaker configuration for the circuit breaker used by the GraphQL HTTP client @@ -31,9 +31,10 @@ public record BraintreeConfiguration(@NotBlank String merchantId, @Valid @NotEmpty Map> supportedCurrenciesByPaymentMethod, @NotBlank String graphqlUrl, @NotEmpty Map merchantAccounts, - @NotNull - @Valid - CircuitBreakerConfiguration circuitBreaker) { + @NotNull @Valid CircuitBreakerConfiguration circuitBreaker, + @NotBlank String pubSubProject, + @NotBlank String pubSubTopic, + @NotBlank String pubSubCredentialConfiguration) { public BraintreeConfiguration { if (circuitBreaker == null) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java index 1c4425375..d93006960 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java @@ -784,7 +784,7 @@ public class SubscriptionController { } }) .thenCompose(unused -> braintreeManager.captureOneTimePayment(request.payerId, request.paymentId, - request.paymentToken, request.currency, request.amount, request.level)) + request.paymentToken, request.currency, request.amount, request.level, getClientPlatform(userAgent))) .thenCompose(chargeSuccessDetails -> oneTimeDonationsManager.putPaidAt(chargeSuccessDetails.paymentId(), Instant.now())) .thenApply(paymentId -> Response.ok( new ConfirmPayPalBoostResponse(paymentId)).build()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManager.java index f33170a3d..6a9b65e0a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManager.java @@ -10,12 +10,14 @@ import io.dropwizard.lifecycle.Managed; import io.lettuce.core.SetArgs; import java.io.IOException; import java.math.BigDecimal; +import java.math.RoundingMode; import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; @@ -166,4 +168,18 @@ public class CurrencyConversionManager implements Managed { return n; } } + + @VisibleForTesting + void setCachedFixerValues(final Map cachedFixerValues) { + this.cachedFixerValues = cachedFixerValues; + } + + public Optional convertToUsd(final BigDecimal amount, final String currency) { + if ("USD".equalsIgnoreCase(currency)) { + return Optional.of(amount); + } + + return Optional.ofNullable(cachedFixerValues.get(currency.toUpperCase(Locale.ROOT))) + .map(conversionRate -> amount.divide(conversionRate, 2, RoundingMode.HALF_EVEN)); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManager.java index b30c537ec..1bf48b1ff 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManager.java @@ -19,6 +19,9 @@ import com.braintreegateway.TransactionSearchRequest; import com.braintreegateway.exceptions.BraintreeException; import com.braintreegateway.exceptions.NotFoundException; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; import com.google.common.annotations.VisibleForTesting; import java.math.BigDecimal; import java.time.Duration; @@ -39,11 +42,14 @@ import javax.annotation.Nullable; import javax.ws.rs.ClientErrorException; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; +import com.google.pubsub.v1.PubsubMessage; +import io.micrometer.core.instrument.Metrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager; import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; @@ -54,13 +60,20 @@ public class BraintreeManager implements SubscriptionProcessorManager { private static final String GENERIC_DECLINED_PROCESSOR_CODE = "2046"; private static final String PAYPAL_FUNDING_INSTRUMENT_DECLINED_PROCESSOR_CODE = "2074"; private static final String PAYPAL_PAYMENT_ALREADY_COMPLETED_PROCESSOR_CODE = "2094"; + + private static final BigDecimal ONE_MILLION = BigDecimal.valueOf(1_000_000); + private final BraintreeGateway braintreeGateway; private final BraintreeGraphqlClient braintreeGraphqlClient; private final CurrencyConversionManager currencyConversionManager; + private final Publisher pubsubPublisher; private final Executor executor; private final Map> supportedCurrenciesByPaymentMethod; private final Map currenciesToMerchantAccounts; + private final String PUBSUB_MESSAGE_COUNTER_NAME = MetricsUtil.name(BraintreeManager.class, "pubSubMessage"); + private final String PUBSUB_MESSAGE_SUCCESS_TAG = "success"; + public BraintreeManager(final String braintreeMerchantId, final String braintreePublicKey, final String braintreePrivateKey, final String braintreeEnvironment, @@ -68,6 +81,7 @@ public class BraintreeManager implements SubscriptionProcessorManager { final Map currenciesToMerchantAccounts, final String graphqlUri, final CurrencyConversionManager currencyConversionManager, + final Publisher pubsubPublisher, final CircuitBreakerConfiguration circuitBreakerConfiguration, final Executor executor, final ScheduledExecutorService retryExecutor) { @@ -87,6 +101,7 @@ public class BraintreeManager implements SubscriptionProcessorManager { .withRequestTimeout(Duration.ofSeconds(70)) .build(), graphqlUri, braintreePublicKey, braintreePrivateKey), currencyConversionManager, + pubsubPublisher, executor); } @@ -94,12 +109,14 @@ public class BraintreeManager implements SubscriptionProcessorManager { BraintreeManager(final BraintreeGateway braintreeGateway, final Map> supportedCurrenciesByPaymentMethod, final Map currenciesToMerchantAccounts, final BraintreeGraphqlClient braintreeGraphqlClient, - final CurrencyConversionManager currencyConversionManager, final Executor executor) { + final CurrencyConversionManager currencyConversionManager, final Publisher pubsubPublisher, + final Executor executor) { this.braintreeGateway = braintreeGateway; this.supportedCurrenciesByPaymentMethod = supportedCurrenciesByPaymentMethod; this.currenciesToMerchantAccounts = currenciesToMerchantAccounts; this.braintreeGraphqlClient = braintreeGraphqlClient; this.currencyConversionManager = currencyConversionManager; + this.pubsubPublisher = pubsubPublisher; this.executor = executor; } @@ -148,7 +165,7 @@ public class BraintreeManager implements SubscriptionProcessorManager { } public CompletableFuture captureOneTimePayment(String payerId, String paymentId, - String paymentToken, String currency, long amount, long level) { + String paymentToken, String currency, long amount, long level, @Nullable ClientPlatform clientPlatform) { return braintreeGraphqlClient.tokenizePayPalOneTimePayment(payerId, paymentId, paymentToken) .thenCompose(response -> braintreeGraphqlClient.chargeOneTimePayment( response.paymentMethod.id, @@ -166,8 +183,7 @@ public class BraintreeManager implements SubscriptionProcessorManager { final Transaction unsuccessfulTx = braintreeGateway.transaction().find(chargeResponse.transaction.id); if (PAYPAL_PAYMENT_ALREADY_COMPLETED_PROCESSOR_CODE.equals(unsuccessfulTx.getProcessorResponseCode()) - || Transaction.GatewayRejectionReason.DUPLICATE.equals( - unsuccessfulTx.getGatewayRejectionReason())) { + || Transaction.GatewayRejectionReason.DUPLICATE.equals(unsuccessfulTx.getGatewayRejectionReason())) { // the payment has already been charged - maybe a previous call timed out or was interrupted - // in any case, check for a successful transaction with the paymentId final ResourceCollection search = braintreeGateway.transaction() @@ -188,6 +204,48 @@ public class BraintreeManager implements SubscriptionProcessorManager { final Transaction successfulTx = search.getFirst(); + try { + final BigDecimal originalAmountUsd = + currencyConversionManager.convertToUsd(successfulTx.getAmount(), successfulTx.getCurrencyIsoCode()) + .orElseThrow(() -> new IllegalArgumentException("Could not convert to USD from " + successfulTx.getCurrencyIsoCode())); + + final DonationsPubsub.DonationPubSubMessage.Builder donationPubSubMessageBuilder = + DonationsPubsub.DonationPubSubMessage.newBuilder() + .setTimestamp(successfulTx.getCreatedAt().toInstant().toEpochMilli() * 1000) + .setSource("app") + .setProvider("braintree") + .setRecurring(false) + .setPaymentMethodType("paypal") + .setOriginalAmountMicros(toMicros(successfulTx.getAmount())) + .setOriginalCurrency(successfulTx.getCurrencyIsoCode()) + .setOriginalAmountUsdMicros(toMicros(originalAmountUsd)); + + if (clientPlatform != null) { + donationPubSubMessageBuilder.setClientPlatform(clientPlatform.name().toLowerCase(Locale.ROOT)); + } + + ApiFutures.addCallback(pubsubPublisher.publish(PubsubMessage.newBuilder() + .setData(donationPubSubMessageBuilder.build().toByteString()) + .build()), + new ApiFutureCallback<>() { + + @Override + public void onSuccess(final String messageId) { + Metrics.counter(PUBSUB_MESSAGE_COUNTER_NAME, PUBSUB_MESSAGE_SUCCESS_TAG, "true").increment(); + } + + @Override + public void onFailure(final Throwable throwable) { + logger.warn("Failed to publish donation pub/sub message", throwable); + Metrics.counter(PUBSUB_MESSAGE_COUNTER_NAME, PUBSUB_MESSAGE_SUCCESS_TAG, "false").increment(); + + } + + }, executor); + } catch (final Exception e) { + logger.warn("Failed to construct donation pub/sub message", e); + } + return CompletableFuture.completedFuture( new PayPalChargeSuccessDetails(successfulTx.getGraphQLId())); } @@ -207,6 +265,11 @@ public class BraintreeManager implements SubscriptionProcessorManager { }, executor)); } + @VisibleForTesting + long toMicros(final BigDecimal amount) { + return amount.multiply(ONE_MILLION).longValueExact(); + } + private static PaymentStatus getPaymentStatus(Transaction.Status status) { return switch (status) { case SETTLEMENT_CONFIRMED, SETTLING, SUBMITTED_FOR_SETTLEMENT, SETTLED -> PaymentStatus.SUCCEEDED; diff --git a/service/src/main/proto/DonationsPubsub.proto b/service/src/main/proto/DonationsPubsub.proto new file mode 100644 index 000000000..4764f28fd --- /dev/null +++ b/service/src/main/proto/DonationsPubsub.proto @@ -0,0 +1,62 @@ +syntax = "proto2"; + +option java_package = "org.whispersystems.textsecuregcm.subscriptions"; + +/** + * A message that contains details about a new donation, whether a one-time "boost" or a recurring subscription. + */ +message DonationPubSubMessage { + + /** + * The instant at which this donation took place in microseconds since the epoch. + */ + required int64 timestamp = 1; + + /** + * A string identifying the source (either "web" or "app") from which this donation originated. + */ + required string source = 2; + + /** + * An identifier for the payment provider that handled this donation (e.g. "stripe" or "braintree" or "donorbox"). + */ + required string provider = 3; + + /** + * If `true`, indicates that this donation is part of a subscription. If `false`, this is a one-time donation. + */ + required bool recurring = 4; + + /** + * The type of payment method used for this donation (e.g. "credit_card" or "apple_pay" or "paypal"). + */ + required string payment_method_type = 5; + + /** + * The original amount of the donation before fees or conversion, in millionths of a full unit of the currency. For + * example, an amount of 9.75 USD would be represented as 9750000. + */ + required int64 original_amount_micros = 6; + + /** + * The ISO 4217 identifier for the original currency of this donation (e.g. "USD" or "EUR"). + */ + required string original_currency = 7; + + /** + * The amount of the donation after conversion to USD in millionths of a dollar. If the original amount was in USD, + * this value must be the same as `original_amount_micros`. + */ + required int64 original_amount_usd_micros = 8; + + /** + * The ISO 3166 country code of the country from which this donation originated. May be omitted if not known. + */ + optional string country = 9; + + /** + * The platform of the client that made this donation (e.g. "ios" or "android" or "desktop") if known. May be omitted + * if not known. + */ + optional string client_platform = 10; +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java index 4d1fd7b84..0c294601b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java @@ -321,7 +321,7 @@ class SubscriptionControllerTest { void confirmPaypalBoostProcessorError() { when(BRAINTREE_MANAGER.captureOneTimePayment(anyString(), anyString(), anyString(), anyString(), anyLong(), - anyLong())) + anyLong(), any())) .thenReturn(CompletableFuture.failedFuture(new SubscriptionProcessorException(SubscriptionProcessor.BRAINTREE, new ChargeFailure("2046", "Declined", null, null, null)))); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java index 4a3797c33..c394c4557 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java @@ -6,6 +6,7 @@ package org.whispersystems.textsecuregcm.currency; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -15,13 +16,16 @@ import java.math.BigDecimal; import java.time.Clock; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; class CurrencyConversionManagerTest { @@ -232,4 +236,30 @@ class CurrencyConversionManagerTest { assertThat(conversions.getCurrencies().get(0).getConversions().get("FKP")).isEqualTo(new BigDecimal("1.7470981")); } + @Test + void convertToUsd() { + final CurrencyConversionManager currencyConversionManager = new CurrencyConversionManager(mock(FixerClient.class), + mock(CoinMarketCapClient.class), + mock(FaultTolerantRedisCluster.class), + Collections.emptyList(), + EXECUTOR, + Clock.systemUTC()); + + currencyConversionManager.setCachedFixerValues(Map.of("JPY", BigDecimal.valueOf(154.757008), "GBP", BigDecimal.valueOf(0.81196))); + + assertEquals(Optional.of(new BigDecimal("17.50")), + currencyConversionManager.convertToUsd(new BigDecimal("17.50"), "USD")); + + assertEquals(Optional.of(new BigDecimal("17.50")), + currencyConversionManager.convertToUsd(new BigDecimal("17.50"), "usd")); + + assertEquals(Optional.empty(), + currencyConversionManager.convertToUsd(new BigDecimal("10.00"), "XYZ")); + + assertEquals(Optional.of(new BigDecimal("12.92")), + currencyConversionManager.convertToUsd(new BigDecimal("2000"), "JPY")); + + assertEquals(Optional.of(new BigDecimal("12.32")), + currencyConversionManager.convertToUsd(new BigDecimal("10"), "GBP")); + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManagerTest.java index 52f93f68d..f8841f3ad 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/BraintreeManagerTest.java @@ -17,6 +17,7 @@ import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; +import com.google.cloud.pubsub.v1.Publisher; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager; @@ -34,6 +35,7 @@ class BraintreeManagerTest { Map.of("usd", "usdMerchant"), mock(BraintreeGraphqlClient.class), mock(CurrencyConversionManager.class), + mock(Publisher.class), Executors.newSingleThreadExecutor()); }