From 9f3ffa3707caf8945057e19c8723662e35a21e0b Mon Sep 17 00:00:00 2001 From: Sergey Skrobotov Date: Fri, 8 Sep 2023 18:50:02 -0700 Subject: [PATCH] gRPC API for payments service --- .../textsecuregcm/WhisperServerService.java | 4 +- .../currency/CurrencyConversionManager.java | 44 +++++++----- .../grpc/PaymentsGrpcService.java | 63 ++++++++++++++++ .../main/proto/org/signal/chat/payments.proto | 36 ++++++++++ .../CurrencyConversionManagerTest.java | 21 ++++-- .../grpc/PaymentsGrpcServiceTest.java | 72 +++++++++++++++++++ 6 files changed, 215 insertions(+), 25 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/PaymentsGrpcService.java create mode 100644 service/src/main/proto/org/signal/chat/payments.proto create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/grpc/PaymentsGrpcServiceTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 3bda191e8..7502db26f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -124,6 +124,7 @@ import org.whispersystems.textsecuregcm.grpc.ErrorMappingInterceptor; import org.whispersystems.textsecuregcm.grpc.GrpcServerManagedWrapper; import org.whispersystems.textsecuregcm.grpc.KeysAnonymousGrpcService; import org.whispersystems.textsecuregcm.grpc.KeysGrpcService; +import org.whispersystems.textsecuregcm.grpc.PaymentsGrpcService; import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService; import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService; import org.whispersystems.textsecuregcm.grpc.UserAgentInterceptor; @@ -588,7 +589,7 @@ public class WhisperServerService extends Application grpcServer = ServerBuilder.forPort(config.getGrpcPort()) .addService(ServerInterceptors.intercept(new KeysGrpcService(accountsManager, keys, rateLimiters), basicCredentialAuthenticationInterceptor)) .addService(new KeysAnonymousGrpcService(accountsManager, keys)) + .addService(new PaymentsGrpcService(currencyManager)) .addService(ServerInterceptors.intercept(new ProfileGrpcService(clock, accountsManager, profilesManager, dynamicConfigurationManager, config.getBadges(), asyncCdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, rateLimiters, zkProfileOperations, config.getCdnConfiguration().bucket()), basicCredentialAuthenticationInterceptor)) .addService(new ProfileAnonymousGrpcService(accountsManager, profilesManager, profileBadgeConverter, zkProfileOperations)); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManager.java index a151ffd17..f33170a3d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManager.java @@ -1,3 +1,8 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + package org.whispersystems.textsecuregcm.currency; import com.google.common.annotations.VisibleForTesting; @@ -13,13 +18,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntity; import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.util.Util; public class CurrencyConversionManager implements Managed { @@ -32,31 +38,42 @@ public class CurrencyConversionManager implements Managed { @VisibleForTesting static final String COIN_MARKET_CAP_SHARED_CACHE_CURRENT_KEY = "CurrencyConversionManager::CoinMarketCapCacheCurrent"; + private static final String COIN_MARKET_CAP_SHARED_CACHE_DATA_KEY = "CurrencyConversionManager::CoinMarketCapCacheData"; - private final FixerClient fixerClient; + private final FixerClient fixerClient; + private final CoinMarketCapClient coinMarketCapClient; + private final FaultTolerantRedisCluster cacheCluster; + private final Clock clock; private final List currencies; + private final ScheduledExecutorService executor; + private final AtomicReference cached = new AtomicReference<>(null); private Instant fixerUpdatedTimestamp = Instant.MIN; private Map cachedFixerValues; + private Map cachedCoinMarketCapValues; - public CurrencyConversionManager(final FixerClient fixerClient, + + public CurrencyConversionManager( + final FixerClient fixerClient, final CoinMarketCapClient coinMarketCapClient, final FaultTolerantRedisCluster cacheCluster, final List currencies, + final ScheduledExecutorService executor, final Clock clock) { this.fixerClient = fixerClient; this.coinMarketCapClient = coinMarketCapClient; this.cacheCluster = cacheCluster; this.currencies = currencies; + this.executor = executor; this.clock = clock; } @@ -66,22 +83,13 @@ public class CurrencyConversionManager implements Managed { @Override public void start() throws Exception { - new Thread(() -> { - for (;;) { - try { - updateCacheIfNecessary(); - } catch (Throwable t) { - logger.warn("Error updating currency conversions", t); - } - - Util.sleep(15000); + executor.scheduleAtFixedRate(() -> { + try { + updateCacheIfNecessary(); + } catch (Throwable t) { + logger.warn("Error updating currency conversions", t); } - }).start(); - } - - @Override - public void stop() throws Exception { - + }, 0, 15, TimeUnit.SECONDS); } @VisibleForTesting diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/PaymentsGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/PaymentsGrpcService.java new file mode 100644 index 000000000..845ca4fb4 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/PaymentsGrpcService.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.grpc; + +import static java.util.Objects.requireNonNull; + +import io.grpc.Status; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import org.apache.commons.lang3.tuple.Pair; +import org.signal.chat.payments.GetCurrencyConversionsRequest; +import org.signal.chat.payments.GetCurrencyConversionsResponse; +import org.signal.chat.payments.ReactorPaymentsGrpc; +import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil; +import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager; +import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList; +import reactor.core.publisher.Mono; + +public class PaymentsGrpcService extends ReactorPaymentsGrpc.PaymentsImplBase { + + private final CurrencyConversionManager currencyManager; + + + public PaymentsGrpcService(final CurrencyConversionManager currencyManager) { + this.currencyManager = requireNonNull(currencyManager); + } + + @Override + public Mono getCurrencyConversions(final GetCurrencyConversionsRequest request) { + AuthenticationUtil.requireAuthenticatedDevice(); + + final CurrencyConversionEntityList currencyConversionEntityList = currencyManager + .getCurrencyConversions() + .orElseThrow(Status.UNAVAILABLE::asRuntimeException); + + final List currencyConversionEntities = currencyConversionEntityList + .getCurrencies() + .stream() + .map(cce -> GetCurrencyConversionsResponse.CurrencyConversionEntity.newBuilder() + .setBase(cce.getBase()) + .putAllConversions(transformBigDecimalsToStrings(cce.getConversions())) + .build()) + .toList(); + + return Mono.just(GetCurrencyConversionsResponse.newBuilder() + .addAllCurrencies(currencyConversionEntities).setTimestamp(currencyConversionEntityList.getTimestamp()) + .build()); + } + + @Nonnull + private static Map transformBigDecimalsToStrings(final Map conversions) { + AuthenticationUtil.requireAuthenticatedDevice(); + return conversions.entrySet().stream() + .map(e -> Pair.of(e.getKey(), e.getValue().toString())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + } +} diff --git a/service/src/main/proto/org/signal/chat/payments.proto b/service/src/main/proto/org/signal/chat/payments.proto new file mode 100644 index 000000000..ec4a5d6e5 --- /dev/null +++ b/service/src/main/proto/org/signal/chat/payments.proto @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +syntax = "proto3"; + +option java_multiple_files = true; + +package org.signal.chat.payments; + +/** + * Provides methods for working with payments. + */ +service Payments { + /** + */ + rpc GetCurrencyConversions(GetCurrencyConversionsRequest) returns (GetCurrencyConversionsResponse) {} +} + +message GetCurrencyConversionsRequest { +} + +message GetCurrencyConversionsResponse { + + message CurrencyConversionEntity { + + string base = 1; + + map conversions = 2; + } + + uint64 timestamp = 1; + + repeated CurrencyConversionEntity currencies = 2; +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java index cd11a1858..4a3797c33 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/currency/CurrencyConversionManagerTest.java @@ -1,3 +1,8 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + package org.whispersystems.textsecuregcm.currency; import static org.assertj.core.api.Assertions.assertThat; @@ -12,6 +17,8 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList; @@ -22,6 +29,8 @@ class CurrencyConversionManagerTest { @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); + static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); + @Test void testCurrencyCalculations() throws IOException { FixerClient fixerClient = mock(FixerClient.class); @@ -35,7 +44,7 @@ class CurrencyConversionManagerTest { )); CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), - List.of("FOO"), Clock.systemUTC()); + List.of("FOO"), EXECUTOR, Clock.systemUTC()); manager.updateCacheIfNecessary(); @@ -64,7 +73,7 @@ class CurrencyConversionManagerTest { )); CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), - List.of("FOO"), Clock.systemUTC()); + List.of("FOO"), EXECUTOR, Clock.systemUTC()); manager.updateCacheIfNecessary(); @@ -93,7 +102,7 @@ class CurrencyConversionManagerTest { )); CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), - List.of("FOO"), Clock.systemUTC()); + List.of("FOO"), EXECUTOR, Clock.systemUTC()); manager.updateCacheIfNecessary(); @@ -122,7 +131,7 @@ class CurrencyConversionManagerTest { )); CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), - List.of("FOO"), Clock.systemUTC()); + List.of("FOO"), EXECUTOR, Clock.systemUTC()); manager.updateCacheIfNecessary(); @@ -154,7 +163,7 @@ class CurrencyConversionManagerTest { )); CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), - List.of("FOO"), Clock.systemUTC()); + List.of("FOO"), EXECUTOR, Clock.systemUTC()); manager.updateCacheIfNecessary(); @@ -195,7 +204,7 @@ class CurrencyConversionManagerTest { when(clock.millis()).thenReturn(currentTime.toEpochMilli()); CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), - List.of("FOO"), clock); + List.of("FOO"), EXECUTOR, clock); manager.updateCacheIfNecessary(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/PaymentsGrpcServiceTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/PaymentsGrpcServiceTest.java new file mode 100644 index 000000000..1c70d1ebb --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/PaymentsGrpcServiceTest.java @@ -0,0 +1,72 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.grpc; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; +import static org.whispersystems.textsecuregcm.grpc.GrpcTestUtils.assertStatusException; + +import io.grpc.Status; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.signal.chat.payments.GetCurrencyConversionsRequest; +import org.signal.chat.payments.GetCurrencyConversionsResponse; +import org.signal.chat.payments.PaymentsGrpc; +import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager; +import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntity; +import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList; + +class PaymentsGrpcServiceTest extends SimpleBaseGrpcTest { + + @Mock + private CurrencyConversionManager currencyManager; + + @Override + protected PaymentsGrpcService createServiceBeforeEachTest() { + return new PaymentsGrpcService(currencyManager); + } + + @Test + void testGetCurrencyConversions() { + final long timestamp = System.currentTimeMillis(); + when(currencyManager.getCurrencyConversions()).thenReturn(Optional.of( + new CurrencyConversionEntityList(List.of( + new CurrencyConversionEntity("FOO", Map.of( + "USD", new BigDecimal("2.35"), + "EUR", new BigDecimal("1.89") + )), + new CurrencyConversionEntity("BAR", Map.of( + "USD", new BigDecimal("1.50"), + "EUR", new BigDecimal("0.98") + )) + ), timestamp))); + + final GetCurrencyConversionsResponse currencyConversions = authenticatedServiceStub().getCurrencyConversions( + GetCurrencyConversionsRequest.newBuilder().build()); + + assertEquals(timestamp, currencyConversions.getTimestamp()); + assertEquals(2, currencyConversions.getCurrenciesCount()); + assertEquals("FOO", currencyConversions.getCurrencies(0).getBase()); + assertEquals("2.35", currencyConversions.getCurrencies(0).getConversionsMap().get("USD")); + } + + @Test + void testUnavailable() { + when(currencyManager.getCurrencyConversions()).thenReturn(Optional.empty()); + assertStatusException(Status.UNAVAILABLE, () -> authenticatedServiceStub().getCurrencyConversions( + GetCurrencyConversionsRequest.newBuilder().build())); + } + + @Test + public void testUnauthenticated() throws Exception { + assertStatusException(Status.UNAUTHENTICATED, () -> unauthenticatedServiceStub().getCurrencyConversions( + GetCurrencyConversionsRequest.newBuilder().build())); + } +}