Add additional handling for nullable field in recurring donation record
This commit is contained in:
parent
70a6c3e8e5
commit
8ea794baef
|
@ -12,7 +12,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.google.common.base.Strings;
|
|
||||||
import com.stripe.model.Charge;
|
import com.stripe.model.Charge;
|
||||||
import com.stripe.model.Charge.Outcome;
|
import com.stripe.model.Charge.Outcome;
|
||||||
import com.stripe.model.Invoice;
|
import com.stripe.model.Invoice;
|
||||||
|
@ -46,6 +45,7 @@ import javax.validation.constraints.Min;
|
||||||
import javax.validation.constraints.NotEmpty;
|
import javax.validation.constraints.NotEmpty;
|
||||||
import javax.validation.constraints.NotNull;
|
import javax.validation.constraints.NotNull;
|
||||||
import javax.ws.rs.BadRequestException;
|
import javax.ws.rs.BadRequestException;
|
||||||
|
import javax.ws.rs.ClientErrorException;
|
||||||
import javax.ws.rs.Consumes;
|
import javax.ws.rs.Consumes;
|
||||||
import javax.ws.rs.DELETE;
|
import javax.ws.rs.DELETE;
|
||||||
import javax.ws.rs.DefaultValue;
|
import javax.ws.rs.DefaultValue;
|
||||||
|
@ -150,21 +150,22 @@ public class SubscriptionController {
|
||||||
if (getResult == GetResult.NOT_STORED || getResult == GetResult.PASSWORD_MISMATCH) {
|
if (getResult == GetResult.NOT_STORED || getResult == GetResult.PASSWORD_MISMATCH) {
|
||||||
throw new NotFoundException();
|
throw new NotFoundException();
|
||||||
}
|
}
|
||||||
String customerId = getResult.record.customerId;
|
return getResult.record.getProcessorCustomer()
|
||||||
if (Strings.isNullOrEmpty(customerId)) {
|
.map(processorCustomer -> stripeManager.getCustomer(processorCustomer.customerId())
|
||||||
throw new InternalServerErrorException("no customer id found");
|
.thenCompose(customer -> {
|
||||||
}
|
if (customer == null) {
|
||||||
return stripeManager.getCustomer(customerId).thenCompose(customer -> {
|
throw new InternalServerErrorException(
|
||||||
if (customer == null) {
|
"no customer record found for id " + processorCustomer.customerId());
|
||||||
throw new InternalServerErrorException("no customer record found for id " + customerId);
|
}
|
||||||
}
|
return stripeManager.listNonCanceledSubscriptions(customer);
|
||||||
return stripeManager.listNonCanceledSubscriptions(customer);
|
}).thenCompose(subscriptions -> {
|
||||||
}).thenCompose(subscriptions -> {
|
@SuppressWarnings("unchecked")
|
||||||
@SuppressWarnings("unchecked")
|
CompletableFuture<Subscription>[] futures = (CompletableFuture<Subscription>[]) subscriptions.stream()
|
||||||
CompletableFuture<Subscription>[] futures = (CompletableFuture<Subscription>[]) subscriptions.stream()
|
.map(stripeManager::cancelSubscriptionAtEndOfCurrentPeriod).toArray(CompletableFuture[]::new);
|
||||||
.map(stripeManager::cancelSubscriptionAtEndOfCurrentPeriod).toArray(CompletableFuture[]::new);
|
return CompletableFuture.allOf(futures);
|
||||||
return CompletableFuture.allOf(futures);
|
}))
|
||||||
});
|
// a missing customer ID is OK; it means the subscriber never started to add a payment method
|
||||||
|
.orElseGet(() -> CompletableFuture.completedFuture(null));
|
||||||
})
|
})
|
||||||
.thenCompose(unused -> subscriptionManager.canceledAt(requestData.subscriberUser, requestData.now))
|
.thenCompose(unused -> subscriptionManager.canceledAt(requestData.subscriberUser, requestData.now))
|
||||||
.thenApply(unused -> Response.ok().build());
|
.thenApply(unused -> Response.ok().build());
|
||||||
|
@ -222,19 +223,22 @@ public class SubscriptionController {
|
||||||
return subscriptionManager.get(requestData.subscriberUser, requestData.hmac)
|
return subscriptionManager.get(requestData.subscriberUser, requestData.hmac)
|
||||||
.thenApply(this::requireRecordFromGetResult)
|
.thenApply(this::requireRecordFromGetResult)
|
||||||
.thenCompose(record -> {
|
.thenCompose(record -> {
|
||||||
final CompletableFuture<SubscriptionManager.Record> updatedRecordFuture;
|
final CompletableFuture<SubscriptionManager.Record> updatedRecordFuture =
|
||||||
if (record.customerId == null) {
|
record.getProcessorCustomer()
|
||||||
updatedRecordFuture = subscriptionProcessorManager.createCustomer(requestData.subscriberUser)
|
.map(ignored -> CompletableFuture.completedFuture(record))
|
||||||
.thenApply(ProcessorCustomer::customerId)
|
.orElseGet(() -> subscriptionProcessorManager.createCustomer(requestData.subscriberUser)
|
||||||
.thenCompose(customerId -> subscriptionManager.updateProcessorAndCustomerId(record,
|
.thenApply(ProcessorCustomer::customerId)
|
||||||
new ProcessorCustomer(customerId,
|
.thenCompose(customerId -> subscriptionManager.updateProcessorAndCustomerId(record,
|
||||||
subscriptionProcessorManager.getProcessor()), Instant.now()));
|
new ProcessorCustomer(customerId, subscriptionProcessorManager.getProcessor()),
|
||||||
} else {
|
Instant.now())));
|
||||||
updatedRecordFuture = CompletableFuture.completedFuture(record);
|
|
||||||
}
|
|
||||||
|
|
||||||
return updatedRecordFuture.thenCompose(
|
return updatedRecordFuture.thenCompose(
|
||||||
updatedRecord -> subscriptionProcessorManager.createPaymentMethodSetupToken(updatedRecord.customerId));
|
updatedRecord -> {
|
||||||
|
final String customerId = updatedRecord.getProcessorCustomer()
|
||||||
|
.orElseThrow(() -> new InternalServerErrorException("record should not be missing customer"))
|
||||||
|
.customerId();
|
||||||
|
return subscriptionProcessorManager.createPaymentMethodSetupToken(customerId);
|
||||||
|
});
|
||||||
})
|
})
|
||||||
.thenApply(
|
.thenApply(
|
||||||
token -> Response.ok(new CreatePaymentMethodResponse(token, subscriptionProcessorManager.getProcessor()))
|
token -> Response.ok(new CreatePaymentMethodResponse(token, subscriptionProcessorManager.getProcessor()))
|
||||||
|
@ -259,10 +263,15 @@ public class SubscriptionController {
|
||||||
RequestData requestData = RequestData.process(authenticatedAccount, subscriberId, clock);
|
RequestData requestData = RequestData.process(authenticatedAccount, subscriberId, clock);
|
||||||
return subscriptionManager.get(requestData.subscriberUser, requestData.hmac)
|
return subscriptionManager.get(requestData.subscriberUser, requestData.hmac)
|
||||||
.thenApply(this::requireRecordFromGetResult)
|
.thenApply(this::requireRecordFromGetResult)
|
||||||
.thenCompose(record -> stripeManager.setDefaultPaymentMethodForCustomer(record.customerId, paymentMethodId))
|
.thenCompose(record -> record.getProcessorCustomer()
|
||||||
|
.map(processorCustomer -> stripeManager.setDefaultPaymentMethodForCustomer(processorCustomer.customerId(),
|
||||||
|
paymentMethodId))
|
||||||
|
.orElseThrow(() ->
|
||||||
|
// a missing customer ID indicates the client made requests out of order,
|
||||||
|
// and needs to call create_payment_method to create a customer for the given payment method
|
||||||
|
new ClientErrorException(Status.CONFLICT)))
|
||||||
.thenApply(customer -> Response.ok().build());
|
.thenApply(customer -> Response.ok().build());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SetSubscriptionLevelSuccessResponse {
|
public static class SetSubscriptionLevelSuccessResponse {
|
||||||
|
|
||||||
private final long level;
|
private final long level;
|
||||||
|
@ -356,15 +365,22 @@ public class SubscriptionController {
|
||||||
if (record.subscriptionId == null) {
|
if (record.subscriptionId == null) {
|
||||||
long lastSubscriptionCreatedAt =
|
long lastSubscriptionCreatedAt =
|
||||||
record.subscriptionCreatedAt != null ? record.subscriptionCreatedAt.getEpochSecond() : 0;
|
record.subscriptionCreatedAt != null ? record.subscriptionCreatedAt.getEpochSecond() : 0;
|
||||||
// we don't have one yet so create it and then record the subscription id
|
|
||||||
//
|
return record.getProcessorCustomer()
|
||||||
// this relies on stripe's idempotency key to avoid creating more than one subscription if the client
|
.map(processorCustomer ->
|
||||||
// retries this request
|
// we don't have a subscription yet so create it and then record the subscription id
|
||||||
return stripeManager.createSubscription(record.customerId, priceConfiguration.getId(), level,
|
//
|
||||||
lastSubscriptionCreatedAt)
|
// this relies on stripe's idempotency key to avoid creating more than one subscription if the client
|
||||||
.thenCompose(subscription -> subscriptionManager.subscriptionCreated(
|
// retries this request
|
||||||
requestData.subscriberUser, subscription.getId(), requestData.now, level)
|
stripeManager.createSubscription(processorCustomer.customerId(), priceConfiguration.getId(), level,
|
||||||
.thenApply(unused -> subscription));
|
lastSubscriptionCreatedAt)
|
||||||
|
.thenCompose(subscription -> subscriptionManager.subscriptionCreated(
|
||||||
|
requestData.subscriberUser, subscription.getId(), requestData.now, level)
|
||||||
|
.thenApply(unused -> subscription)))
|
||||||
|
.orElseThrow(() ->
|
||||||
|
// a missing customer ID indicates the client made requests out of order,
|
||||||
|
// and needs to call create_payment_method to create a customer for the given payment method
|
||||||
|
new ClientErrorException(Status.CONFLICT));
|
||||||
} else {
|
} else {
|
||||||
// we already have a subscription in our records so let's check the level and change it if needed
|
// we already have a subscription in our records so let's check the level and change it if needed
|
||||||
return stripeManager.getSubscription(record.subscriptionId).thenCompose(
|
return stripeManager.getSubscription(record.subscriptionId).thenCompose(
|
||||||
|
|
|
@ -10,6 +10,7 @@ import static org.whispersystems.textsecuregcm.util.AttributeValues.m;
|
||||||
import static org.whispersystems.textsecuregcm.util.AttributeValues.n;
|
import static org.whispersystems.textsecuregcm.util.AttributeValues.n;
|
||||||
import static org.whispersystems.textsecuregcm.util.AttributeValues.s;
|
import static org.whispersystems.textsecuregcm.util.AttributeValues.s;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
|
@ -18,6 +19,7 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionException;
|
import java.util.concurrent.CompletionException;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
|
@ -64,8 +66,9 @@ public class SubscriptionManager {
|
||||||
public final byte[] user;
|
public final byte[] user;
|
||||||
public final byte[] password;
|
public final byte[] password;
|
||||||
public final Instant createdAt;
|
public final Instant createdAt;
|
||||||
public @Nullable String customerId;
|
@VisibleForTesting
|
||||||
public @Nullable SubscriptionProcessor processor;
|
@Nullable
|
||||||
|
ProcessorCustomer processorCustomer;
|
||||||
public Map<SubscriptionProcessor, String> processorsToCustomerIds;
|
public Map<SubscriptionProcessor, String> processorsToCustomerIds;
|
||||||
public String subscriptionId;
|
public String subscriptionId;
|
||||||
public Instant subscriptionCreatedAt;
|
public Instant subscriptionCreatedAt;
|
||||||
|
@ -82,28 +85,28 @@ public class SubscriptionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Record from(byte[] user, Map<String, AttributeValue> item) {
|
public static Record from(byte[] user, Map<String, AttributeValue> item) {
|
||||||
Record self = new Record(
|
Record record = new Record(
|
||||||
user,
|
user,
|
||||||
item.get(KEY_PASSWORD).b().asByteArray(),
|
item.get(KEY_PASSWORD).b().asByteArray(),
|
||||||
getInstant(item, KEY_CREATED_AT));
|
getInstant(item, KEY_CREATED_AT));
|
||||||
|
|
||||||
final Pair<SubscriptionProcessor, String> processorCustomerId = getProcessorAndCustomer(item);
|
final Pair<SubscriptionProcessor, String> processorCustomerId = getProcessorAndCustomer(item);
|
||||||
if (processorCustomerId != null) {
|
if (processorCustomerId != null) {
|
||||||
self.customerId = processorCustomerId.second();
|
record.processorCustomer = new ProcessorCustomer(processorCustomerId.second(), processorCustomerId.first());
|
||||||
self.processor = processorCustomerId.first();
|
|
||||||
} else {
|
|
||||||
// Until all existing data is migrated to KEY_PROCESSOR_ID_CUSTOMER_ID, fall back to KEY_CUSTOMER_ID
|
|
||||||
self.customerId = getString(item, KEY_CUSTOMER_ID);
|
|
||||||
}
|
}
|
||||||
self.processorsToCustomerIds = getProcessorsToCustomerIds(item);
|
record.processorsToCustomerIds = getProcessorsToCustomerIds(item);
|
||||||
self.subscriptionId = getString(item, KEY_SUBSCRIPTION_ID);
|
record.subscriptionId = getString(item, KEY_SUBSCRIPTION_ID);
|
||||||
self.subscriptionCreatedAt = getInstant(item, KEY_SUBSCRIPTION_CREATED_AT);
|
record.subscriptionCreatedAt = getInstant(item, KEY_SUBSCRIPTION_CREATED_AT);
|
||||||
self.subscriptionLevel = getLong(item, KEY_SUBSCRIPTION_LEVEL);
|
record.subscriptionLevel = getLong(item, KEY_SUBSCRIPTION_LEVEL);
|
||||||
self.subscriptionLevelChangedAt = getInstant(item, KEY_SUBSCRIPTION_LEVEL_CHANGED_AT);
|
record.subscriptionLevelChangedAt = getInstant(item, KEY_SUBSCRIPTION_LEVEL_CHANGED_AT);
|
||||||
self.accessedAt = getInstant(item, KEY_ACCESSED_AT);
|
record.accessedAt = getInstant(item, KEY_ACCESSED_AT);
|
||||||
self.canceledAt = getInstant(item, KEY_CANCELED_AT);
|
record.canceledAt = getInstant(item, KEY_CANCELED_AT);
|
||||||
self.currentPeriodEndsAt = getInstant(item, KEY_CURRENT_PERIOD_ENDS_AT);
|
record.currentPeriodEndsAt = getInstant(item, KEY_CURRENT_PERIOD_ENDS_AT);
|
||||||
return self;
|
return record;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<ProcessorCustomer> getProcessorCustomer() {
|
||||||
|
return Optional.ofNullable(processorCustomer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<SubscriptionProcessor, String> getProcessorsToCustomerIds(Map<String, AttributeValue> item) {
|
private static Map<SubscriptionProcessor, String> getProcessorsToCustomerIds(Map<String, AttributeValue> item) {
|
||||||
|
@ -314,45 +317,33 @@ public class SubscriptionManager {
|
||||||
public CompletableFuture<Record> updateProcessorAndCustomerId(Record userRecord,
|
public CompletableFuture<Record> updateProcessorAndCustomerId(Record userRecord,
|
||||||
ProcessorCustomer activeProcessorCustomer, Instant updatedAt) {
|
ProcessorCustomer activeProcessorCustomer, Instant updatedAt) {
|
||||||
|
|
||||||
// Don’t attempt to modify the existing map, since it may be immutable, and we also don’t want to have side effects
|
|
||||||
final Map<SubscriptionProcessor, String> allProcessorsAndCustomerIds = new HashMap<>(
|
|
||||||
userRecord.processorsToCustomerIds);
|
|
||||||
allProcessorsAndCustomerIds.put(activeProcessorCustomer.processor(), activeProcessorCustomer.customerId());
|
|
||||||
|
|
||||||
UpdateItemRequest request = UpdateItemRequest.builder()
|
UpdateItemRequest request = UpdateItemRequest.builder()
|
||||||
.tableName(table)
|
.tableName(table)
|
||||||
.key(Map.of(KEY_USER, b(userRecord.user)))
|
.key(Map.of(KEY_USER, b(userRecord.user)))
|
||||||
.returnValues(ReturnValue.ALL_NEW)
|
.returnValues(ReturnValue.ALL_NEW)
|
||||||
.conditionExpression(
|
.conditionExpression(
|
||||||
// there is no customer attribute yet
|
// there is no active processor+customer attribute
|
||||||
"attribute_not_exists(#customer_id) " +
|
"attribute_not_exists(#processor_customer_id) " +
|
||||||
// OR this record doesn't have the new processor+customer attributes yet
|
// or an attribute in the map with an inactive processor+customer
|
||||||
"OR (#customer_id = :customer_id " +
|
"AND attribute_not_exists(#processors_to_customer_ids.#processor_name)"
|
||||||
"AND attribute_not_exists(#processor_customer_id) " +
|
|
||||||
// TODO once all records are guaranteed to have the map, we can do a more targeted update
|
|
||||||
// "AND attribute_not_exists(#processors_to_customer_ids.#processor_name) " +
|
|
||||||
"AND attribute_not_exists(#processors_to_customer_ids))"
|
|
||||||
)
|
)
|
||||||
.updateExpression("SET "
|
.updateExpression("SET "
|
||||||
+ "#customer_id = :customer_id, "
|
+ "#customer_id = :customer_id, "
|
||||||
+ "#processor_customer_id = :processor_customer_id, "
|
+ "#processor_customer_id = :processor_customer_id, "
|
||||||
// TODO once all records are guaranteed to have the map, we can do a more targeted update
|
+ "#processors_to_customer_ids.#processor_name = :customer_id, "
|
||||||
// + "#processors_to_customer_ids.#processor_name = :customer_id, "
|
|
||||||
+ "#processors_to_customer_ids = :processors_and_customer_ids, "
|
|
||||||
+ "#accessed_at = :accessed_at"
|
+ "#accessed_at = :accessed_at"
|
||||||
)
|
)
|
||||||
.expressionAttributeNames(Map.of(
|
.expressionAttributeNames(Map.of(
|
||||||
"#accessed_at", KEY_ACCESSED_AT,
|
"#accessed_at", KEY_ACCESSED_AT,
|
||||||
"#customer_id", KEY_CUSTOMER_ID,
|
"#customer_id", KEY_CUSTOMER_ID,
|
||||||
"#processor_customer_id", KEY_PROCESSOR_ID_CUSTOMER_ID,
|
"#processor_customer_id", KEY_PROCESSOR_ID_CUSTOMER_ID,
|
||||||
// TODO "#processor_name", activeProcessor.name(),
|
"#processor_name", activeProcessorCustomer.processor().name(),
|
||||||
"#processors_to_customer_ids", KEY_PROCESSOR_CUSTOMER_IDS_MAP
|
"#processors_to_customer_ids", KEY_PROCESSOR_CUSTOMER_IDS_MAP
|
||||||
))
|
))
|
||||||
.expressionAttributeValues(Map.of(
|
.expressionAttributeValues(Map.of(
|
||||||
":accessed_at", n(updatedAt.getEpochSecond()),
|
":accessed_at", n(updatedAt.getEpochSecond()),
|
||||||
":customer_id", s(activeProcessorCustomer.customerId()),
|
":customer_id", s(activeProcessorCustomer.customerId()),
|
||||||
":processor_customer_id", b(activeProcessorCustomer.toDynamoBytes()),
|
":processor_customer_id", b(activeProcessorCustomer.toDynamoBytes())
|
||||||
":processors_and_customer_ids", m(createProcessorsToCustomerIdsAttributeMap(allProcessorsAndCustomerIds))
|
|
||||||
)).build();
|
)).build();
|
||||||
|
|
||||||
return client.updateItem(request)
|
return client.updateItem(request)
|
||||||
|
@ -367,15 +358,6 @@ public class SubscriptionManager {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, AttributeValue> createProcessorsToCustomerIdsAttributeMap(
|
|
||||||
Map<SubscriptionProcessor, String> allProcessorsAndCustomerIds) {
|
|
||||||
final Map<String, AttributeValue> result = new HashMap<>();
|
|
||||||
|
|
||||||
allProcessorsAndCustomerIds.forEach((processor, customerId) -> result.put(processor.name(), s(customerId)));
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CompletableFuture<Void> accessedAt(byte[] user, Instant accessedAt) {
|
public CompletableFuture<Void> accessedAt(byte[] user, Instant accessedAt) {
|
||||||
checkUserLength(user);
|
checkUserLength(user);
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.time.Clock;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -223,9 +224,11 @@ class SubscriptionControllerTest {
|
||||||
when(STRIPE_MANAGER.createCustomer(any()))
|
when(STRIPE_MANAGER.createCustomer(any()))
|
||||||
.thenReturn(CompletableFuture.completedFuture(customer));
|
.thenReturn(CompletableFuture.completedFuture(customer));
|
||||||
|
|
||||||
final SubscriptionManager.Record recordWithCustomerId = SubscriptionManager.Record.from(record.user, dynamoItem);
|
final Map<String, AttributeValue> dynamoItemWithProcessorCustomer = new HashMap<>(dynamoItem);
|
||||||
recordWithCustomerId.customerId = customerId;
|
dynamoItemWithProcessorCustomer.put(SubscriptionManager.KEY_PROCESSOR_ID_CUSTOMER_ID,
|
||||||
recordWithCustomerId.processorsToCustomerIds.put(SubscriptionProcessor.STRIPE, customerId);
|
b(new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE).toDynamoBytes()));
|
||||||
|
final SubscriptionManager.Record recordWithCustomerId = SubscriptionManager.Record.from(record.user,
|
||||||
|
dynamoItemWithProcessorCustomer);
|
||||||
|
|
||||||
when(SUBSCRIPTION_MANAGER.updateProcessorAndCustomerId(any(SubscriptionManager.Record.class), any(),
|
when(SUBSCRIPTION_MANAGER.updateProcessorAndCustomerId(any(SubscriptionManager.Record.class), any(),
|
||||||
any(Instant.class)))
|
any(Instant.class)))
|
||||||
|
|
|
@ -10,6 +10,7 @@ import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetRe
|
||||||
import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.NOT_STORED;
|
import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.NOT_STORED;
|
||||||
import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.PASSWORD_MISMATCH;
|
import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.PASSWORD_MISMATCH;
|
||||||
import static org.whispersystems.textsecuregcm.util.AttributeValues.b;
|
import static org.whispersystems.textsecuregcm.util.AttributeValues.b;
|
||||||
|
import static org.whispersystems.textsecuregcm.util.AttributeValues.m;
|
||||||
import static org.whispersystems.textsecuregcm.util.AttributeValues.n;
|
import static org.whispersystems.textsecuregcm.util.AttributeValues.n;
|
||||||
import static org.whispersystems.textsecuregcm.util.AttributeValues.s;
|
import static org.whispersystems.textsecuregcm.util.AttributeValues.s;
|
||||||
|
|
||||||
|
@ -19,7 +20,9 @@ import java.time.Instant;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
@ -30,6 +33,7 @@ import org.whispersystems.textsecuregcm.storage.SubscriptionManager.Record;
|
||||||
import org.whispersystems.textsecuregcm.subscriptions.ProcessorCustomer;
|
import org.whispersystems.textsecuregcm.subscriptions.ProcessorCustomer;
|
||||||
import org.whispersystems.textsecuregcm.subscriptions.SubscriptionProcessor;
|
import org.whispersystems.textsecuregcm.subscriptions.SubscriptionProcessor;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
|
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
|
||||||
|
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
|
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
|
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
|
||||||
import software.amazon.awssdk.services.dynamodb.model.KeyType;
|
import software.amazon.awssdk.services.dynamodb.model.KeyType;
|
||||||
|
@ -56,6 +60,10 @@ class SubscriptionManagerTest {
|
||||||
attributeName(SubscriptionManager.KEY_CUSTOMER_ID).
|
attributeName(SubscriptionManager.KEY_CUSTOMER_ID).
|
||||||
attributeType(ScalarAttributeType.S).
|
attributeType(ScalarAttributeType.S).
|
||||||
build()).
|
build()).
|
||||||
|
attributeDefinition(AttributeDefinition.builder().
|
||||||
|
attributeName(SubscriptionManager.KEY_PROCESSOR_ID_CUSTOMER_ID).
|
||||||
|
attributeType(ScalarAttributeType.S).
|
||||||
|
build()).
|
||||||
globalSecondaryIndex(GlobalSecondaryIndex.builder().
|
globalSecondaryIndex(GlobalSecondaryIndex.builder().
|
||||||
indexName("c_to_u").
|
indexName("c_to_u").
|
||||||
keySchema(KeySchemaElement.builder().
|
keySchema(KeySchemaElement.builder().
|
||||||
|
@ -70,6 +78,20 @@ class SubscriptionManagerTest {
|
||||||
writeCapacityUnits(20L).
|
writeCapacityUnits(20L).
|
||||||
build()).
|
build()).
|
||||||
build()).
|
build()).
|
||||||
|
globalSecondaryIndex(GlobalSecondaryIndex.builder().
|
||||||
|
indexName("pc_to_u").
|
||||||
|
keySchema(KeySchemaElement.builder().
|
||||||
|
attributeName(SubscriptionManager.KEY_PROCESSOR_ID_CUSTOMER_ID).
|
||||||
|
keyType(KeyType.HASH).
|
||||||
|
build()).
|
||||||
|
projection(Projection.builder().
|
||||||
|
projectionType(ProjectionType.KEYS_ONLY).
|
||||||
|
build()).
|
||||||
|
provisionedThroughput(ProvisionedThroughput.builder().
|
||||||
|
readCapacityUnits(20L).
|
||||||
|
writeCapacityUnits(20L).
|
||||||
|
build()).
|
||||||
|
build()).
|
||||||
build();
|
build();
|
||||||
|
|
||||||
byte[] user;
|
byte[] user;
|
||||||
|
@ -157,14 +179,16 @@ class SubscriptionManagerTest {
|
||||||
assertThat(subscriptionManager.updateProcessorAndCustomerId(userRecord,
|
assertThat(subscriptionManager.updateProcessorAndCustomerId(userRecord,
|
||||||
new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE),
|
new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE),
|
||||||
subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3))
|
subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3))
|
||||||
.hasFieldOrPropertyWithValue("customerId", customer)
|
.hasFieldOrPropertyWithValue("processorCustomer",
|
||||||
|
Optional.of(new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE)))
|
||||||
.hasFieldOrPropertyWithValue("processorsToCustomerIds", Map.of(SubscriptionProcessor.STRIPE, customer));
|
.hasFieldOrPropertyWithValue("processorsToCustomerIds", Map.of(SubscriptionProcessor.STRIPE, customer));
|
||||||
|
|
||||||
assertThat(
|
assertThat(
|
||||||
subscriptionManager.updateProcessorAndCustomerId(userRecord,
|
subscriptionManager.updateProcessorAndCustomerId(userRecord,
|
||||||
new ProcessorCustomer(customer + "1", SubscriptionProcessor.STRIPE),
|
new ProcessorCustomer(customer + "1", SubscriptionProcessor.STRIPE),
|
||||||
subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3))
|
subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3))
|
||||||
.hasFieldOrPropertyWithValue("customerId", customer)
|
.hasFieldOrPropertyWithValue("processorCustomer",
|
||||||
|
Optional.of(new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE)))
|
||||||
.hasFieldOrPropertyWithValue("processorsToCustomerIds", Map.of(SubscriptionProcessor.STRIPE, customer));
|
.hasFieldOrPropertyWithValue("processorsToCustomerIds", Map.of(SubscriptionProcessor.STRIPE, customer));
|
||||||
|
|
||||||
// TODO test new customer ID with new processor does change the customer ID, once there is another processor
|
// TODO test new customer ID with new processor does change the customer ID, once there is another processor
|
||||||
|
@ -254,27 +278,21 @@ class SubscriptionManagerTest {
|
||||||
Arrays.fill(hmac, (byte) 2);
|
Arrays.fill(hmac, (byte) 2);
|
||||||
final String customerId = "abcdef";
|
final String customerId = "abcdef";
|
||||||
|
|
||||||
// manually create an existing record, with only KEY_CUSTOMER_ID
|
assertThat(subscriptionManager.create(user, hmac, Instant.now()))
|
||||||
dynamoDbExtension.getDynamoDbClient().putItem(p ->
|
.succeedsWithin(Duration.ofSeconds(1));
|
||||||
p.tableName(dynamoDbExtension.getTableName())
|
|
||||||
.item(Map.of(
|
|
||||||
SubscriptionManager.KEY_USER, b(user),
|
|
||||||
SubscriptionManager.KEY_PASSWORD, b(hmac),
|
|
||||||
SubscriptionManager.KEY_CREATED_AT, n(Instant.now().getEpochSecond()),
|
|
||||||
SubscriptionManager.KEY_CUSTOMER_ID, s(customerId),
|
|
||||||
SubscriptionManager.KEY_ACCESSED_AT, n(Instant.now().getEpochSecond())
|
|
||||||
))
|
|
||||||
);
|
|
||||||
|
|
||||||
final CompletableFuture<GetResult> firstGetResult = subscriptionManager.get(user, hmac);
|
final CompletableFuture<GetResult> firstGetResult = subscriptionManager.get(user, hmac);
|
||||||
assertThat(firstGetResult).succeedsWithin(Duration.ofSeconds(1));
|
assertThat(firstGetResult).succeedsWithin(Duration.ofSeconds(1));
|
||||||
|
|
||||||
final Record firstRecord = firstGetResult.get().record;
|
final Record firstRecord = firstGetResult.get().record;
|
||||||
|
|
||||||
assertThat(firstRecord.customerId).isEqualTo(customerId);
|
assertThat(firstRecord.processorCustomer).isNull();
|
||||||
assertThat(firstRecord.processor).isNull();
|
|
||||||
assertThat(firstRecord.processorsToCustomerIds).isEmpty();
|
assertThat(firstRecord.processorsToCustomerIds).isEmpty();
|
||||||
|
|
||||||
|
subscriptionManager.updateProcessorAndCustomerId(firstRecord,
|
||||||
|
new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE), Instant.now())
|
||||||
|
.get(1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// Try to update the user to have a different customer ID. This should quietly fail,
|
// Try to update the user to have a different customer ID. This should quietly fail,
|
||||||
// and just return the existing customer ID.
|
// and just return the existing customer ID.
|
||||||
final CompletableFuture<Record> firstUpdate = subscriptionManager.updateProcessorAndCustomerId(firstRecord,
|
final CompletableFuture<Record> firstUpdate = subscriptionManager.updateProcessorAndCustomerId(firstRecord,
|
||||||
|
@ -283,7 +301,7 @@ class SubscriptionManagerTest {
|
||||||
|
|
||||||
assertThat(firstUpdate).succeedsWithin(Duration.ofSeconds(1));
|
assertThat(firstUpdate).succeedsWithin(Duration.ofSeconds(1));
|
||||||
|
|
||||||
final String firstUpdateCustomerId = firstUpdate.get().customerId;
|
final String firstUpdateCustomerId = firstUpdate.get().getProcessorCustomer().orElseThrow().customerId();
|
||||||
assertThat(firstUpdateCustomerId).isEqualTo(customerId);
|
assertThat(firstUpdateCustomerId).isEqualTo(customerId);
|
||||||
|
|
||||||
// Now update with the existing customer ID. All fields should now be populated.
|
// Now update with the existing customer ID. All fields should now be populated.
|
||||||
|
@ -292,7 +310,7 @@ class SubscriptionManagerTest {
|
||||||
|
|
||||||
assertThat(secondUpdate).succeedsWithin(Duration.ofSeconds(1));
|
assertThat(secondUpdate).succeedsWithin(Duration.ofSeconds(1));
|
||||||
|
|
||||||
final String secondUpdateCustomerId = secondUpdate.get().customerId;
|
final String secondUpdateCustomerId = secondUpdate.get().getProcessorCustomer().orElseThrow().customerId();
|
||||||
assertThat(secondUpdateCustomerId).isEqualTo(customerId);
|
assertThat(secondUpdateCustomerId).isEqualTo(customerId);
|
||||||
|
|
||||||
final CompletableFuture<GetResult> secondGetResult = subscriptionManager.get(user, hmac);
|
final CompletableFuture<GetResult> secondGetResult = subscriptionManager.get(user, hmac);
|
||||||
|
@ -300,11 +318,58 @@ class SubscriptionManagerTest {
|
||||||
|
|
||||||
final Record secondRecord = secondGetResult.get().record;
|
final Record secondRecord = secondGetResult.get().record;
|
||||||
|
|
||||||
assertThat(secondRecord.customerId).isEqualTo(customerId);
|
assertThat(secondRecord.getProcessorCustomer())
|
||||||
assertThat(secondRecord.processor).isEqualTo(SubscriptionProcessor.STRIPE);
|
.isPresent()
|
||||||
|
.get()
|
||||||
|
.isEqualTo(new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE));
|
||||||
assertThat(secondRecord.processorsToCustomerIds).isEqualTo(Map.of(SubscriptionProcessor.STRIPE, customerId));
|
assertThat(secondRecord.processorsToCustomerIds).isEqualTo(Map.of(SubscriptionProcessor.STRIPE, customerId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUpdateEmptyProcessorCustomerWithValueInMap() throws Exception {
|
||||||
|
// it isn’t possible to create this exact data setup in current code, but this tests the conditional update expression
|
||||||
|
final Map<String, AttributeValue> processorCustomers = Map.of(
|
||||||
|
SubscriptionProcessor.STRIPE.name(), s(customer)
|
||||||
|
);
|
||||||
|
|
||||||
|
final Map<String, AttributeValue> dynamoItem = Map.of(
|
||||||
|
SubscriptionManager.KEY_USER, b(user),
|
||||||
|
SubscriptionManager.KEY_PASSWORD, b(password),
|
||||||
|
SubscriptionManager.KEY_PROCESSOR_CUSTOMER_IDS_MAP, m(processorCustomers),
|
||||||
|
SubscriptionManager.KEY_CREATED_AT, n(created.getEpochSecond()),
|
||||||
|
SubscriptionManager.KEY_ACCESSED_AT, n(Instant.now().getEpochSecond())
|
||||||
|
);
|
||||||
|
|
||||||
|
dynamoDbExtension.getDynamoDbAsyncClient().putItem(builder ->
|
||||||
|
builder.tableName(dynamoDbExtension.getTableName())
|
||||||
|
.item(dynamoItem)
|
||||||
|
).get(1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
final CompletableFuture<GetResult> firstGet = subscriptionManager.get(user, password);
|
||||||
|
|
||||||
|
assertThat(firstGet)
|
||||||
|
.succeedsWithin(Duration.ofSeconds(1))
|
||||||
|
.extracting(r -> r.record)
|
||||||
|
.satisfies(record -> {
|
||||||
|
assertThat(record.processorCustomer).isNull();
|
||||||
|
assertThat(record.processorsToCustomerIds).size().isEqualTo(1);
|
||||||
|
assertThat(record.processorsToCustomerIds).contains(Map.entry(SubscriptionProcessor.STRIPE, customer));
|
||||||
|
});
|
||||||
|
|
||||||
|
final CompletableFuture<Record> update = subscriptionManager.updateProcessorAndCustomerId(firstGet.get().record,
|
||||||
|
new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE), Instant.now());
|
||||||
|
|
||||||
|
assertThat(update)
|
||||||
|
.succeedsWithin(Duration.ofSeconds(1))
|
||||||
|
.satisfies(record -> {
|
||||||
|
// processorCustomer should not have been updated
|
||||||
|
assertThat(record.processorCustomer).isNull();
|
||||||
|
assertThat(record.processorsToCustomerIds).size().isEqualTo(1);
|
||||||
|
assertThat(record.processorsToCustomerIds).contains(Map.entry(SubscriptionProcessor.STRIPE, customer));
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testProcessorAndCustomerId() {
|
void testProcessorAndCustomerId() {
|
||||||
final ProcessorCustomer processorCustomer =
|
final ProcessorCustomer processorCustomer =
|
||||||
|
@ -326,7 +391,8 @@ class SubscriptionManagerTest {
|
||||||
assertThat(record).isNotNull();
|
assertThat(record).isNotNull();
|
||||||
assertThat(record.user).isEqualTo(user);
|
assertThat(record.user).isEqualTo(user);
|
||||||
assertThat(record.password).isEqualTo(password);
|
assertThat(record.password).isEqualTo(password);
|
||||||
assertThat(record.customerId).isNull();
|
assertThat(record.processorCustomer).isNull();
|
||||||
|
assertThat(record.processorsToCustomerIds).isEmpty();
|
||||||
assertThat(record.createdAt).isEqualTo(created);
|
assertThat(record.createdAt).isEqualTo(created);
|
||||||
assertThat(record.subscriptionId).isNull();
|
assertThat(record.subscriptionId).isNull();
|
||||||
assertThat(record.subscriptionCreatedAt).isNull();
|
assertThat(record.subscriptionCreatedAt).isNull();
|
||||||
|
|
Loading…
Reference in New Issue