From 8ea794baefc93269094ad89834a305a97945e68b Mon Sep 17 00:00:00 2001 From: Chris Eager <79161849+eager-signal@users.noreply.github.com> Date: Fri, 21 Oct 2022 12:56:39 -0500 Subject: [PATCH] Add additional handling for nullable field in recurring donation record --- .../controllers/SubscriptionController.java | 92 ++++++++------- .../storage/SubscriptionManager.java | 72 +++++------- .../SubscriptionControllerTest.java | 9 +- .../storage/SubscriptionManagerTest.java | 106 ++++++++++++++---- 4 files changed, 173 insertions(+), 106 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java index c3cc6e2ec..82e58fdaf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java @@ -12,7 +12,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Strings; import com.stripe.model.Charge; import com.stripe.model.Charge.Outcome; import com.stripe.model.Invoice; @@ -46,6 +45,7 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; import javax.ws.rs.BadRequestException; +import javax.ws.rs.ClientErrorException; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -150,21 +150,22 @@ public class SubscriptionController { if (getResult == GetResult.NOT_STORED || getResult == GetResult.PASSWORD_MISMATCH) { throw new NotFoundException(); } - String customerId = getResult.record.customerId; - if (Strings.isNullOrEmpty(customerId)) { - throw new InternalServerErrorException("no customer id found"); - } - return stripeManager.getCustomer(customerId).thenCompose(customer -> { - if (customer == null) { - throw new InternalServerErrorException("no customer record found for id " + customerId); - } - return stripeManager.listNonCanceledSubscriptions(customer); - }).thenCompose(subscriptions -> { - @SuppressWarnings("unchecked") - CompletableFuture[] futures = (CompletableFuture[]) subscriptions.stream() - .map(stripeManager::cancelSubscriptionAtEndOfCurrentPeriod).toArray(CompletableFuture[]::new); - return CompletableFuture.allOf(futures); - }); + return getResult.record.getProcessorCustomer() + .map(processorCustomer -> stripeManager.getCustomer(processorCustomer.customerId()) + .thenCompose(customer -> { + if (customer == null) { + throw new InternalServerErrorException( + "no customer record found for id " + processorCustomer.customerId()); + } + return stripeManager.listNonCanceledSubscriptions(customer); + }).thenCompose(subscriptions -> { + @SuppressWarnings("unchecked") + CompletableFuture[] futures = (CompletableFuture[]) subscriptions.stream() + .map(stripeManager::cancelSubscriptionAtEndOfCurrentPeriod).toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(futures); + })) + // a missing customer ID is OK; it means the subscriber never started to add a payment method + .orElseGet(() -> CompletableFuture.completedFuture(null)); }) .thenCompose(unused -> subscriptionManager.canceledAt(requestData.subscriberUser, requestData.now)) .thenApply(unused -> Response.ok().build()); @@ -222,19 +223,22 @@ public class SubscriptionController { return subscriptionManager.get(requestData.subscriberUser, requestData.hmac) .thenApply(this::requireRecordFromGetResult) .thenCompose(record -> { - final CompletableFuture updatedRecordFuture; - if (record.customerId == null) { - updatedRecordFuture = subscriptionProcessorManager.createCustomer(requestData.subscriberUser) - .thenApply(ProcessorCustomer::customerId) - .thenCompose(customerId -> subscriptionManager.updateProcessorAndCustomerId(record, - new ProcessorCustomer(customerId, - subscriptionProcessorManager.getProcessor()), Instant.now())); - } else { - updatedRecordFuture = CompletableFuture.completedFuture(record); - } + final CompletableFuture updatedRecordFuture = + record.getProcessorCustomer() + .map(ignored -> CompletableFuture.completedFuture(record)) + .orElseGet(() -> subscriptionProcessorManager.createCustomer(requestData.subscriberUser) + .thenApply(ProcessorCustomer::customerId) + .thenCompose(customerId -> subscriptionManager.updateProcessorAndCustomerId(record, + new ProcessorCustomer(customerId, subscriptionProcessorManager.getProcessor()), + Instant.now()))); return updatedRecordFuture.thenCompose( - updatedRecord -> subscriptionProcessorManager.createPaymentMethodSetupToken(updatedRecord.customerId)); + updatedRecord -> { + final String customerId = updatedRecord.getProcessorCustomer() + .orElseThrow(() -> new InternalServerErrorException("record should not be missing customer")) + .customerId(); + return subscriptionProcessorManager.createPaymentMethodSetupToken(customerId); + }); }) .thenApply( token -> Response.ok(new CreatePaymentMethodResponse(token, subscriptionProcessorManager.getProcessor())) @@ -259,10 +263,15 @@ public class SubscriptionController { RequestData requestData = RequestData.process(authenticatedAccount, subscriberId, clock); return subscriptionManager.get(requestData.subscriberUser, requestData.hmac) .thenApply(this::requireRecordFromGetResult) - .thenCompose(record -> stripeManager.setDefaultPaymentMethodForCustomer(record.customerId, paymentMethodId)) + .thenCompose(record -> record.getProcessorCustomer() + .map(processorCustomer -> stripeManager.setDefaultPaymentMethodForCustomer(processorCustomer.customerId(), + paymentMethodId)) + .orElseThrow(() -> + // a missing customer ID indicates the client made requests out of order, + // and needs to call create_payment_method to create a customer for the given payment method + new ClientErrorException(Status.CONFLICT))) .thenApply(customer -> Response.ok().build()); } - public static class SetSubscriptionLevelSuccessResponse { private final long level; @@ -356,15 +365,22 @@ public class SubscriptionController { if (record.subscriptionId == null) { long lastSubscriptionCreatedAt = record.subscriptionCreatedAt != null ? record.subscriptionCreatedAt.getEpochSecond() : 0; - // we don't have one yet so create it and then record the subscription id - // - // this relies on stripe's idempotency key to avoid creating more than one subscription if the client - // retries this request - return stripeManager.createSubscription(record.customerId, priceConfiguration.getId(), level, - lastSubscriptionCreatedAt) - .thenCompose(subscription -> subscriptionManager.subscriptionCreated( - requestData.subscriberUser, subscription.getId(), requestData.now, level) - .thenApply(unused -> subscription)); + + return record.getProcessorCustomer() + .map(processorCustomer -> + // we don't have a subscription yet so create it and then record the subscription id + // + // this relies on stripe's idempotency key to avoid creating more than one subscription if the client + // retries this request + stripeManager.createSubscription(processorCustomer.customerId(), priceConfiguration.getId(), level, + lastSubscriptionCreatedAt) + .thenCompose(subscription -> subscriptionManager.subscriptionCreated( + requestData.subscriberUser, subscription.getId(), requestData.now, level) + .thenApply(unused -> subscription))) + .orElseThrow(() -> + // a missing customer ID indicates the client made requests out of order, + // and needs to call create_payment_method to create a customer for the given payment method + new ClientErrorException(Status.CONFLICT)); } else { // we already have a subscription in our records so let's check the level and change it if needed return stripeManager.getSubscription(record.subscriptionId).thenCompose( diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SubscriptionManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SubscriptionManager.java index 333a2bcc7..8fcd7d88b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SubscriptionManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SubscriptionManager.java @@ -10,6 +10,7 @@ import static org.whispersystems.textsecuregcm.util.AttributeValues.m; import static org.whispersystems.textsecuregcm.util.AttributeValues.n; import static org.whispersystems.textsecuregcm.util.AttributeValues.s; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; @@ -18,6 +19,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import javax.annotation.Nonnull; @@ -64,8 +66,9 @@ public class SubscriptionManager { public final byte[] user; public final byte[] password; public final Instant createdAt; - public @Nullable String customerId; - public @Nullable SubscriptionProcessor processor; + @VisibleForTesting + @Nullable + ProcessorCustomer processorCustomer; public Map processorsToCustomerIds; public String subscriptionId; public Instant subscriptionCreatedAt; @@ -82,28 +85,28 @@ public class SubscriptionManager { } public static Record from(byte[] user, Map item) { - Record self = new Record( + Record record = new Record( user, item.get(KEY_PASSWORD).b().asByteArray(), getInstant(item, KEY_CREATED_AT)); final Pair processorCustomerId = getProcessorAndCustomer(item); if (processorCustomerId != null) { - self.customerId = processorCustomerId.second(); - self.processor = processorCustomerId.first(); - } else { - // Until all existing data is migrated to KEY_PROCESSOR_ID_CUSTOMER_ID, fall back to KEY_CUSTOMER_ID - self.customerId = getString(item, KEY_CUSTOMER_ID); + record.processorCustomer = new ProcessorCustomer(processorCustomerId.second(), processorCustomerId.first()); } - self.processorsToCustomerIds = getProcessorsToCustomerIds(item); - self.subscriptionId = getString(item, KEY_SUBSCRIPTION_ID); - self.subscriptionCreatedAt = getInstant(item, KEY_SUBSCRIPTION_CREATED_AT); - self.subscriptionLevel = getLong(item, KEY_SUBSCRIPTION_LEVEL); - self.subscriptionLevelChangedAt = getInstant(item, KEY_SUBSCRIPTION_LEVEL_CHANGED_AT); - self.accessedAt = getInstant(item, KEY_ACCESSED_AT); - self.canceledAt = getInstant(item, KEY_CANCELED_AT); - self.currentPeriodEndsAt = getInstant(item, KEY_CURRENT_PERIOD_ENDS_AT); - return self; + record.processorsToCustomerIds = getProcessorsToCustomerIds(item); + record.subscriptionId = getString(item, KEY_SUBSCRIPTION_ID); + record.subscriptionCreatedAt = getInstant(item, KEY_SUBSCRIPTION_CREATED_AT); + record.subscriptionLevel = getLong(item, KEY_SUBSCRIPTION_LEVEL); + record.subscriptionLevelChangedAt = getInstant(item, KEY_SUBSCRIPTION_LEVEL_CHANGED_AT); + record.accessedAt = getInstant(item, KEY_ACCESSED_AT); + record.canceledAt = getInstant(item, KEY_CANCELED_AT); + record.currentPeriodEndsAt = getInstant(item, KEY_CURRENT_PERIOD_ENDS_AT); + return record; + } + + public Optional getProcessorCustomer() { + return Optional.ofNullable(processorCustomer); } private static Map getProcessorsToCustomerIds(Map item) { @@ -314,45 +317,33 @@ public class SubscriptionManager { public CompletableFuture updateProcessorAndCustomerId(Record userRecord, ProcessorCustomer activeProcessorCustomer, Instant updatedAt) { - // Don’t attempt to modify the existing map, since it may be immutable, and we also don’t want to have side effects - final Map allProcessorsAndCustomerIds = new HashMap<>( - userRecord.processorsToCustomerIds); - allProcessorsAndCustomerIds.put(activeProcessorCustomer.processor(), activeProcessorCustomer.customerId()); - UpdateItemRequest request = UpdateItemRequest.builder() .tableName(table) .key(Map.of(KEY_USER, b(userRecord.user))) .returnValues(ReturnValue.ALL_NEW) .conditionExpression( - // there is no customer attribute yet - "attribute_not_exists(#customer_id) " + - // OR this record doesn't have the new processor+customer attributes yet - "OR (#customer_id = :customer_id " + - "AND attribute_not_exists(#processor_customer_id) " + - // TODO once all records are guaranteed to have the map, we can do a more targeted update - // "AND attribute_not_exists(#processors_to_customer_ids.#processor_name) " + - "AND attribute_not_exists(#processors_to_customer_ids))" + // there is no active processor+customer attribute + "attribute_not_exists(#processor_customer_id) " + + // or an attribute in the map with an inactive processor+customer + "AND attribute_not_exists(#processors_to_customer_ids.#processor_name)" ) .updateExpression("SET " + "#customer_id = :customer_id, " + "#processor_customer_id = :processor_customer_id, " - // TODO once all records are guaranteed to have the map, we can do a more targeted update - // + "#processors_to_customer_ids.#processor_name = :customer_id, " - + "#processors_to_customer_ids = :processors_and_customer_ids, " + + "#processors_to_customer_ids.#processor_name = :customer_id, " + "#accessed_at = :accessed_at" ) .expressionAttributeNames(Map.of( "#accessed_at", KEY_ACCESSED_AT, "#customer_id", KEY_CUSTOMER_ID, "#processor_customer_id", KEY_PROCESSOR_ID_CUSTOMER_ID, - // TODO "#processor_name", activeProcessor.name(), + "#processor_name", activeProcessorCustomer.processor().name(), "#processors_to_customer_ids", KEY_PROCESSOR_CUSTOMER_IDS_MAP )) .expressionAttributeValues(Map.of( ":accessed_at", n(updatedAt.getEpochSecond()), ":customer_id", s(activeProcessorCustomer.customerId()), - ":processor_customer_id", b(activeProcessorCustomer.toDynamoBytes()), - ":processors_and_customer_ids", m(createProcessorsToCustomerIdsAttributeMap(allProcessorsAndCustomerIds)) + ":processor_customer_id", b(activeProcessorCustomer.toDynamoBytes()) )).build(); return client.updateItem(request) @@ -367,15 +358,6 @@ public class SubscriptionManager { }); } - private Map createProcessorsToCustomerIdsAttributeMap( - Map allProcessorsAndCustomerIds) { - final Map result = new HashMap<>(); - - allProcessorsAndCustomerIds.forEach((processor, customerId) -> result.put(processor.name(), s(customerId))); - - return result; - } - public CompletableFuture accessedAt(byte[] user, Instant accessedAt) { checkUserLength(user); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java index 7c7075f41..fa6c383af 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java @@ -22,6 +22,7 @@ import java.time.Clock; import java.time.Instant; import java.util.Arrays; import java.util.Base64; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -223,9 +224,11 @@ class SubscriptionControllerTest { when(STRIPE_MANAGER.createCustomer(any())) .thenReturn(CompletableFuture.completedFuture(customer)); - final SubscriptionManager.Record recordWithCustomerId = SubscriptionManager.Record.from(record.user, dynamoItem); - recordWithCustomerId.customerId = customerId; - recordWithCustomerId.processorsToCustomerIds.put(SubscriptionProcessor.STRIPE, customerId); + final Map dynamoItemWithProcessorCustomer = new HashMap<>(dynamoItem); + dynamoItemWithProcessorCustomer.put(SubscriptionManager.KEY_PROCESSOR_ID_CUSTOMER_ID, + b(new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE).toDynamoBytes())); + final SubscriptionManager.Record recordWithCustomerId = SubscriptionManager.Record.from(record.user, + dynamoItemWithProcessorCustomer); when(SUBSCRIPTION_MANAGER.updateProcessorAndCustomerId(any(SubscriptionManager.Record.class), any(), any(Instant.class))) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/SubscriptionManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/SubscriptionManagerTest.java index 760fcf3f7..8f3c5d952 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/SubscriptionManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/SubscriptionManagerTest.java @@ -10,6 +10,7 @@ import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetRe import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.NOT_STORED; import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.PASSWORD_MISMATCH; import static org.whispersystems.textsecuregcm.util.AttributeValues.b; +import static org.whispersystems.textsecuregcm.util.AttributeValues.m; import static org.whispersystems.textsecuregcm.util.AttributeValues.n; import static org.whispersystems.textsecuregcm.util.AttributeValues.s; @@ -19,7 +20,9 @@ import java.time.Instant; import java.util.Arrays; import java.util.Base64; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import javax.annotation.Nonnull; import org.junit.jupiter.api.BeforeEach; @@ -30,6 +33,7 @@ import org.whispersystems.textsecuregcm.storage.SubscriptionManager.Record; import org.whispersystems.textsecuregcm.subscriptions.ProcessorCustomer; import org.whispersystems.textsecuregcm.subscriptions.SubscriptionProcessor; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; @@ -56,6 +60,10 @@ class SubscriptionManagerTest { attributeName(SubscriptionManager.KEY_CUSTOMER_ID). attributeType(ScalarAttributeType.S). build()). + attributeDefinition(AttributeDefinition.builder(). + attributeName(SubscriptionManager.KEY_PROCESSOR_ID_CUSTOMER_ID). + attributeType(ScalarAttributeType.S). + build()). globalSecondaryIndex(GlobalSecondaryIndex.builder(). indexName("c_to_u"). keySchema(KeySchemaElement.builder(). @@ -70,6 +78,20 @@ class SubscriptionManagerTest { writeCapacityUnits(20L). build()). build()). + globalSecondaryIndex(GlobalSecondaryIndex.builder(). + indexName("pc_to_u"). + keySchema(KeySchemaElement.builder(). + attributeName(SubscriptionManager.KEY_PROCESSOR_ID_CUSTOMER_ID). + keyType(KeyType.HASH). + build()). + projection(Projection.builder(). + projectionType(ProjectionType.KEYS_ONLY). + build()). + provisionedThroughput(ProvisionedThroughput.builder(). + readCapacityUnits(20L). + writeCapacityUnits(20L). + build()). + build()). build(); byte[] user; @@ -157,14 +179,16 @@ class SubscriptionManagerTest { assertThat(subscriptionManager.updateProcessorAndCustomerId(userRecord, new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE), subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3)) - .hasFieldOrPropertyWithValue("customerId", customer) + .hasFieldOrPropertyWithValue("processorCustomer", + Optional.of(new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE))) .hasFieldOrPropertyWithValue("processorsToCustomerIds", Map.of(SubscriptionProcessor.STRIPE, customer)); assertThat( subscriptionManager.updateProcessorAndCustomerId(userRecord, new ProcessorCustomer(customer + "1", SubscriptionProcessor.STRIPE), subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3)) - .hasFieldOrPropertyWithValue("customerId", customer) + .hasFieldOrPropertyWithValue("processorCustomer", + Optional.of(new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE))) .hasFieldOrPropertyWithValue("processorsToCustomerIds", Map.of(SubscriptionProcessor.STRIPE, customer)); // TODO test new customer ID with new processor does change the customer ID, once there is another processor @@ -254,27 +278,21 @@ class SubscriptionManagerTest { Arrays.fill(hmac, (byte) 2); final String customerId = "abcdef"; - // manually create an existing record, with only KEY_CUSTOMER_ID - dynamoDbExtension.getDynamoDbClient().putItem(p -> - p.tableName(dynamoDbExtension.getTableName()) - .item(Map.of( - SubscriptionManager.KEY_USER, b(user), - SubscriptionManager.KEY_PASSWORD, b(hmac), - SubscriptionManager.KEY_CREATED_AT, n(Instant.now().getEpochSecond()), - SubscriptionManager.KEY_CUSTOMER_ID, s(customerId), - SubscriptionManager.KEY_ACCESSED_AT, n(Instant.now().getEpochSecond()) - )) - ); + assertThat(subscriptionManager.create(user, hmac, Instant.now())) + .succeedsWithin(Duration.ofSeconds(1)); final CompletableFuture firstGetResult = subscriptionManager.get(user, hmac); assertThat(firstGetResult).succeedsWithin(Duration.ofSeconds(1)); final Record firstRecord = firstGetResult.get().record; - assertThat(firstRecord.customerId).isEqualTo(customerId); - assertThat(firstRecord.processor).isNull(); + assertThat(firstRecord.processorCustomer).isNull(); assertThat(firstRecord.processorsToCustomerIds).isEmpty(); + subscriptionManager.updateProcessorAndCustomerId(firstRecord, + new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE), Instant.now()) + .get(1, TimeUnit.SECONDS); + // Try to update the user to have a different customer ID. This should quietly fail, // and just return the existing customer ID. final CompletableFuture firstUpdate = subscriptionManager.updateProcessorAndCustomerId(firstRecord, @@ -283,7 +301,7 @@ class SubscriptionManagerTest { assertThat(firstUpdate).succeedsWithin(Duration.ofSeconds(1)); - final String firstUpdateCustomerId = firstUpdate.get().customerId; + final String firstUpdateCustomerId = firstUpdate.get().getProcessorCustomer().orElseThrow().customerId(); assertThat(firstUpdateCustomerId).isEqualTo(customerId); // Now update with the existing customer ID. All fields should now be populated. @@ -292,7 +310,7 @@ class SubscriptionManagerTest { assertThat(secondUpdate).succeedsWithin(Duration.ofSeconds(1)); - final String secondUpdateCustomerId = secondUpdate.get().customerId; + final String secondUpdateCustomerId = secondUpdate.get().getProcessorCustomer().orElseThrow().customerId(); assertThat(secondUpdateCustomerId).isEqualTo(customerId); final CompletableFuture secondGetResult = subscriptionManager.get(user, hmac); @@ -300,11 +318,58 @@ class SubscriptionManagerTest { final Record secondRecord = secondGetResult.get().record; - assertThat(secondRecord.customerId).isEqualTo(customerId); - assertThat(secondRecord.processor).isEqualTo(SubscriptionProcessor.STRIPE); + assertThat(secondRecord.getProcessorCustomer()) + .isPresent() + .get() + .isEqualTo(new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE)); assertThat(secondRecord.processorsToCustomerIds).isEqualTo(Map.of(SubscriptionProcessor.STRIPE, customerId)); } + @Test + void testUpdateEmptyProcessorCustomerWithValueInMap() throws Exception { + // it isn’t possible to create this exact data setup in current code, but this tests the conditional update expression + final Map processorCustomers = Map.of( + SubscriptionProcessor.STRIPE.name(), s(customer) + ); + + final Map dynamoItem = Map.of( + SubscriptionManager.KEY_USER, b(user), + SubscriptionManager.KEY_PASSWORD, b(password), + SubscriptionManager.KEY_PROCESSOR_CUSTOMER_IDS_MAP, m(processorCustomers), + SubscriptionManager.KEY_CREATED_AT, n(created.getEpochSecond()), + SubscriptionManager.KEY_ACCESSED_AT, n(Instant.now().getEpochSecond()) + ); + + dynamoDbExtension.getDynamoDbAsyncClient().putItem(builder -> + builder.tableName(dynamoDbExtension.getTableName()) + .item(dynamoItem) + ).get(1, TimeUnit.SECONDS); + + final CompletableFuture firstGet = subscriptionManager.get(user, password); + + assertThat(firstGet) + .succeedsWithin(Duration.ofSeconds(1)) + .extracting(r -> r.record) + .satisfies(record -> { + assertThat(record.processorCustomer).isNull(); + assertThat(record.processorsToCustomerIds).size().isEqualTo(1); + assertThat(record.processorsToCustomerIds).contains(Map.entry(SubscriptionProcessor.STRIPE, customer)); + }); + + final CompletableFuture update = subscriptionManager.updateProcessorAndCustomerId(firstGet.get().record, + new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE), Instant.now()); + + assertThat(update) + .succeedsWithin(Duration.ofSeconds(1)) + .satisfies(record -> { + // processorCustomer should not have been updated + assertThat(record.processorCustomer).isNull(); + assertThat(record.processorsToCustomerIds).size().isEqualTo(1); + assertThat(record.processorsToCustomerIds).contains(Map.entry(SubscriptionProcessor.STRIPE, customer)); + }); + + } + @Test void testProcessorAndCustomerId() { final ProcessorCustomer processorCustomer = @@ -326,7 +391,8 @@ class SubscriptionManagerTest { assertThat(record).isNotNull(); assertThat(record.user).isEqualTo(user); assertThat(record.password).isEqualTo(password); - assertThat(record.customerId).isNull(); + assertThat(record.processorCustomer).isNull(); + assertThat(record.processorsToCustomerIds).isEmpty(); assertThat(record.createdAt).isEqualTo(created); assertThat(record.subscriptionId).isNull(); assertThat(record.subscriptionCreatedAt).isNull();