gRPC API for payments service

This commit is contained in:
Sergey Skrobotov 2023-09-08 18:50:02 -07:00
parent 8e598c19dc
commit 9f3ffa3707
6 changed files with 215 additions and 25 deletions

View File

@ -124,6 +124,7 @@ import org.whispersystems.textsecuregcm.grpc.ErrorMappingInterceptor;
import org.whispersystems.textsecuregcm.grpc.GrpcServerManagedWrapper; import org.whispersystems.textsecuregcm.grpc.GrpcServerManagedWrapper;
import org.whispersystems.textsecuregcm.grpc.KeysAnonymousGrpcService; import org.whispersystems.textsecuregcm.grpc.KeysAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.KeysGrpcService; import org.whispersystems.textsecuregcm.grpc.KeysGrpcService;
import org.whispersystems.textsecuregcm.grpc.PaymentsGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService; import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService; import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
import org.whispersystems.textsecuregcm.grpc.UserAgentInterceptor; import org.whispersystems.textsecuregcm.grpc.UserAgentInterceptor;
@ -588,7 +589,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FixerClient fixerClient = new FixerClient(currencyClient, config.getPaymentsServiceConfiguration().fixerApiKey().value()); FixerClient fixerClient = new FixerClient(currencyClient, config.getPaymentsServiceConfiguration().fixerApiKey().value());
CoinMarketCapClient coinMarketCapClient = new CoinMarketCapClient(currencyClient, config.getPaymentsServiceConfiguration().coinMarketCapApiKey().value(), config.getPaymentsServiceConfiguration().coinMarketCapCurrencyIds()); CoinMarketCapClient coinMarketCapClient = new CoinMarketCapClient(currencyClient, config.getPaymentsServiceConfiguration().coinMarketCapApiKey().value(), config.getPaymentsServiceConfiguration().coinMarketCapCurrencyIds());
CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, coinMarketCapClient,
cacheCluster, config.getPaymentsServiceConfiguration().paymentCurrencies(), Clock.systemUTC()); cacheCluster, config.getPaymentsServiceConfiguration().paymentCurrencies(), recurringJobExecutor, Clock.systemUTC());
environment.lifecycle().manage(apnSender); environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(apnPushNotificationScheduler); environment.lifecycle().manage(apnPushNotificationScheduler);
@ -644,6 +645,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final ServerBuilder<?> grpcServer = ServerBuilder.forPort(config.getGrpcPort()) final ServerBuilder<?> grpcServer = ServerBuilder.forPort(config.getGrpcPort())
.addService(ServerInterceptors.intercept(new KeysGrpcService(accountsManager, keys, rateLimiters), basicCredentialAuthenticationInterceptor)) .addService(ServerInterceptors.intercept(new KeysGrpcService(accountsManager, keys, rateLimiters), basicCredentialAuthenticationInterceptor))
.addService(new KeysAnonymousGrpcService(accountsManager, keys)) .addService(new KeysAnonymousGrpcService(accountsManager, keys))
.addService(new PaymentsGrpcService(currencyManager))
.addService(ServerInterceptors.intercept(new ProfileGrpcService(clock, accountsManager, profilesManager, dynamicConfigurationManager, .addService(ServerInterceptors.intercept(new ProfileGrpcService(clock, accountsManager, profilesManager, dynamicConfigurationManager,
config.getBadges(), asyncCdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, rateLimiters, zkProfileOperations, config.getCdnConfiguration().bucket()), basicCredentialAuthenticationInterceptor)) config.getBadges(), asyncCdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, rateLimiters, zkProfileOperations, config.getCdnConfiguration().bucket()), basicCredentialAuthenticationInterceptor))
.addService(new ProfileAnonymousGrpcService(accountsManager, profilesManager, profileBadgeConverter, zkProfileOperations)); .addService(new ProfileAnonymousGrpcService(accountsManager, profilesManager, profileBadgeConverter, zkProfileOperations));

View File

@ -1,3 +1,8 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.currency; package org.whispersystems.textsecuregcm.currency;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -13,13 +18,14 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntity; import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntity;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList; import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.Util;
public class CurrencyConversionManager implements Managed { public class CurrencyConversionManager implements Managed {
@ -32,31 +38,42 @@ public class CurrencyConversionManager implements Managed {
@VisibleForTesting @VisibleForTesting
static final String COIN_MARKET_CAP_SHARED_CACHE_CURRENT_KEY = "CurrencyConversionManager::CoinMarketCapCacheCurrent"; static final String COIN_MARKET_CAP_SHARED_CACHE_CURRENT_KEY = "CurrencyConversionManager::CoinMarketCapCacheCurrent";
private static final String COIN_MARKET_CAP_SHARED_CACHE_DATA_KEY = "CurrencyConversionManager::CoinMarketCapCacheData"; private static final String COIN_MARKET_CAP_SHARED_CACHE_DATA_KEY = "CurrencyConversionManager::CoinMarketCapCacheData";
private final FixerClient fixerClient; private final FixerClient fixerClient;
private final CoinMarketCapClient coinMarketCapClient; private final CoinMarketCapClient coinMarketCapClient;
private final FaultTolerantRedisCluster cacheCluster; private final FaultTolerantRedisCluster cacheCluster;
private final Clock clock; private final Clock clock;
private final List<String> currencies; private final List<String> currencies;
private final ScheduledExecutorService executor;
private final AtomicReference<CurrencyConversionEntityList> cached = new AtomicReference<>(null); private final AtomicReference<CurrencyConversionEntityList> cached = new AtomicReference<>(null);
private Instant fixerUpdatedTimestamp = Instant.MIN; private Instant fixerUpdatedTimestamp = Instant.MIN;
private Map<String, BigDecimal> cachedFixerValues; private Map<String, BigDecimal> cachedFixerValues;
private Map<String, BigDecimal> cachedCoinMarketCapValues; private Map<String, BigDecimal> cachedCoinMarketCapValues;
public CurrencyConversionManager(final FixerClient fixerClient,
public CurrencyConversionManager(
final FixerClient fixerClient,
final CoinMarketCapClient coinMarketCapClient, final CoinMarketCapClient coinMarketCapClient,
final FaultTolerantRedisCluster cacheCluster, final FaultTolerantRedisCluster cacheCluster,
final List<String> currencies, final List<String> currencies,
final ScheduledExecutorService executor,
final Clock clock) { final Clock clock) {
this.fixerClient = fixerClient; this.fixerClient = fixerClient;
this.coinMarketCapClient = coinMarketCapClient; this.coinMarketCapClient = coinMarketCapClient;
this.cacheCluster = cacheCluster; this.cacheCluster = cacheCluster;
this.currencies = currencies; this.currencies = currencies;
this.executor = executor;
this.clock = clock; this.clock = clock;
} }
@ -66,22 +83,13 @@ public class CurrencyConversionManager implements Managed {
@Override @Override
public void start() throws Exception { public void start() throws Exception {
new Thread(() -> { executor.scheduleAtFixedRate(() -> {
for (;;) { try {
try { updateCacheIfNecessary();
updateCacheIfNecessary(); } catch (Throwable t) {
} catch (Throwable t) { logger.warn("Error updating currency conversions", t);
logger.warn("Error updating currency conversions", t);
}
Util.sleep(15000);
} }
}).start(); }, 0, 15, TimeUnit.SECONDS);
}
@Override
public void stop() throws Exception {
} }
@VisibleForTesting @VisibleForTesting

View File

@ -0,0 +1,63 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.grpc;
import static java.util.Objects.requireNonNull;
import io.grpc.Status;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
import org.signal.chat.payments.GetCurrencyConversionsRequest;
import org.signal.chat.payments.GetCurrencyConversionsResponse;
import org.signal.chat.payments.ReactorPaymentsGrpc;
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil;
import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;
import reactor.core.publisher.Mono;
public class PaymentsGrpcService extends ReactorPaymentsGrpc.PaymentsImplBase {
private final CurrencyConversionManager currencyManager;
public PaymentsGrpcService(final CurrencyConversionManager currencyManager) {
this.currencyManager = requireNonNull(currencyManager);
}
@Override
public Mono<GetCurrencyConversionsResponse> getCurrencyConversions(final GetCurrencyConversionsRequest request) {
AuthenticationUtil.requireAuthenticatedDevice();
final CurrencyConversionEntityList currencyConversionEntityList = currencyManager
.getCurrencyConversions()
.orElseThrow(Status.UNAVAILABLE::asRuntimeException);
final List<GetCurrencyConversionsResponse.CurrencyConversionEntity> currencyConversionEntities = currencyConversionEntityList
.getCurrencies()
.stream()
.map(cce -> GetCurrencyConversionsResponse.CurrencyConversionEntity.newBuilder()
.setBase(cce.getBase())
.putAllConversions(transformBigDecimalsToStrings(cce.getConversions()))
.build())
.toList();
return Mono.just(GetCurrencyConversionsResponse.newBuilder()
.addAllCurrencies(currencyConversionEntities).setTimestamp(currencyConversionEntityList.getTimestamp())
.build());
}
@Nonnull
private static Map<String, String> transformBigDecimalsToStrings(final Map<String, BigDecimal> conversions) {
AuthenticationUtil.requireAuthenticatedDevice();
return conversions.entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue().toString()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
syntax = "proto3";
option java_multiple_files = true;
package org.signal.chat.payments;
/**
* Provides methods for working with payments.
*/
service Payments {
/**
*/
rpc GetCurrencyConversions(GetCurrencyConversionsRequest) returns (GetCurrencyConversionsResponse) {}
}
message GetCurrencyConversionsRequest {
}
message GetCurrencyConversionsResponse {
message CurrencyConversionEntity {
string base = 1;
map<string, string> conversions = 2;
}
uint64 timestamp = 1;
repeated CurrencyConversionEntity currencies = 2;
}

View File

@ -1,3 +1,8 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.currency; package org.whispersystems.textsecuregcm.currency;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -12,6 +17,8 @@ import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList; import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;
@ -22,6 +29,8 @@ class CurrencyConversionManagerTest {
@RegisterExtension @RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
@Test @Test
void testCurrencyCalculations() throws IOException { void testCurrencyCalculations() throws IOException {
FixerClient fixerClient = mock(FixerClient.class); FixerClient fixerClient = mock(FixerClient.class);
@ -35,7 +44,7 @@ class CurrencyConversionManagerTest {
)); ));
CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC()); List.of("FOO"), EXECUTOR, Clock.systemUTC());
manager.updateCacheIfNecessary(); manager.updateCacheIfNecessary();
@ -64,7 +73,7 @@ class CurrencyConversionManagerTest {
)); ));
CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC()); List.of("FOO"), EXECUTOR, Clock.systemUTC());
manager.updateCacheIfNecessary(); manager.updateCacheIfNecessary();
@ -93,7 +102,7 @@ class CurrencyConversionManagerTest {
)); ));
CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC()); List.of("FOO"), EXECUTOR, Clock.systemUTC());
manager.updateCacheIfNecessary(); manager.updateCacheIfNecessary();
@ -122,7 +131,7 @@ class CurrencyConversionManagerTest {
)); ));
CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC()); List.of("FOO"), EXECUTOR, Clock.systemUTC());
manager.updateCacheIfNecessary(); manager.updateCacheIfNecessary();
@ -154,7 +163,7 @@ class CurrencyConversionManagerTest {
)); ));
CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC()); List.of("FOO"), EXECUTOR, Clock.systemUTC());
manager.updateCacheIfNecessary(); manager.updateCacheIfNecessary();
@ -195,7 +204,7 @@ class CurrencyConversionManagerTest {
when(clock.millis()).thenReturn(currentTime.toEpochMilli()); when(clock.millis()).thenReturn(currentTime.toEpochMilli());
CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(), CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), clock); List.of("FOO"), EXECUTOR, clock);
manager.updateCacheIfNecessary(); manager.updateCacheIfNecessary();

View File

@ -0,0 +1,72 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.grpc;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import static org.whispersystems.textsecuregcm.grpc.GrpcTestUtils.assertStatusException;
import io.grpc.Status;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.signal.chat.payments.GetCurrencyConversionsRequest;
import org.signal.chat.payments.GetCurrencyConversionsResponse;
import org.signal.chat.payments.PaymentsGrpc;
import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntity;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;
class PaymentsGrpcServiceTest extends SimpleBaseGrpcTest<PaymentsGrpcService, PaymentsGrpc.PaymentsBlockingStub> {
@Mock
private CurrencyConversionManager currencyManager;
@Override
protected PaymentsGrpcService createServiceBeforeEachTest() {
return new PaymentsGrpcService(currencyManager);
}
@Test
void testGetCurrencyConversions() {
final long timestamp = System.currentTimeMillis();
when(currencyManager.getCurrencyConversions()).thenReturn(Optional.of(
new CurrencyConversionEntityList(List.of(
new CurrencyConversionEntity("FOO", Map.of(
"USD", new BigDecimal("2.35"),
"EUR", new BigDecimal("1.89")
)),
new CurrencyConversionEntity("BAR", Map.of(
"USD", new BigDecimal("1.50"),
"EUR", new BigDecimal("0.98")
))
), timestamp)));
final GetCurrencyConversionsResponse currencyConversions = authenticatedServiceStub().getCurrencyConversions(
GetCurrencyConversionsRequest.newBuilder().build());
assertEquals(timestamp, currencyConversions.getTimestamp());
assertEquals(2, currencyConversions.getCurrenciesCount());
assertEquals("FOO", currencyConversions.getCurrencies(0).getBase());
assertEquals("2.35", currencyConversions.getCurrencies(0).getConversionsMap().get("USD"));
}
@Test
void testUnavailable() {
when(currencyManager.getCurrencyConversions()).thenReturn(Optional.empty());
assertStatusException(Status.UNAVAILABLE, () -> authenticatedServiceStub().getCurrencyConversions(
GetCurrencyConversionsRequest.newBuilder().build()));
}
@Test
public void testUnauthenticated() throws Exception {
assertStatusException(Status.UNAUTHENTICATED, () -> unauthenticatedServiceStub().getCurrencyConversions(
GetCurrencyConversionsRequest.newBuilder().build()));
}
}