diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java index 58a53df5e..fbb0479c0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/SubscriptionController.java @@ -310,7 +310,7 @@ public class SubscriptionController { .map(ignored -> CompletableFuture.completedFuture(record)) .orElseGet(() -> subscriptionProcessorManager.createCustomer(requestData.subscriberUser) .thenApply(ProcessorCustomer::customerId) - .thenCompose(customerId -> subscriptionManager.updateProcessorAndCustomerId(record, + .thenCompose(customerId -> subscriptionManager.setProcessorAndCustomerId(record, new ProcessorCustomer(customerId, subscriptionProcessorManager.getProcessor()), Instant.now()))); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SubscriptionManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SubscriptionManager.java index 2da19847b..1a841bbdd 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SubscriptionManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SubscriptionManager.java @@ -15,8 +15,6 @@ import com.google.common.base.Throwables; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.time.Instant; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -24,6 +22,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.core.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.subscriptions.ProcessorCustomer; @@ -48,6 +48,7 @@ public class SubscriptionManager { public static final String KEY_PASSWORD = "P"; // B public static final String KEY_PROCESSOR_ID_CUSTOMER_ID = "PC"; // B (GSI Hash Key of `pc_to_u` index) public static final String KEY_CREATED_AT = "R"; // N + @Deprecated public static final String KEY_PROCESSOR_CUSTOMER_IDS_MAP = "PCI"; // M public static final String KEY_SUBSCRIPTION_ID = "S"; // S public static final String KEY_SUBSCRIPTION_CREATED_AT = "T"; // N @@ -57,7 +58,7 @@ public class SubscriptionManager { public static final String KEY_CANCELED_AT = "B"; // N public static final String KEY_CURRENT_PERIOD_ENDS_AT = "D"; // N - public static final String INDEX_NAME = "pc_to_u"; // Hash Key "C" + public static final String INDEX_NAME = "pc_to_u"; // Hash Key "PC" public static class Record { @@ -67,7 +68,6 @@ public class SubscriptionManager { @VisibleForTesting @Nullable ProcessorCustomer processorCustomer; - public Map processorsToCustomerIds; public String subscriptionId; public Instant subscriptionCreatedAt; public Long subscriptionLevel; @@ -92,7 +92,6 @@ public class SubscriptionManager { if (processorCustomerId != null) { record.processorCustomer = new ProcessorCustomer(processorCustomerId.second(), processorCustomerId.first()); } - record.processorsToCustomerIds = getProcessorsToCustomerIds(item); record.subscriptionId = getString(item, KEY_SUBSCRIPTION_ID); record.subscriptionCreatedAt = getInstant(item, KEY_SUBSCRIPTION_CREATED_AT); record.subscriptionLevel = getLong(item, KEY_SUBSCRIPTION_LEVEL); @@ -107,18 +106,6 @@ public class SubscriptionManager { return Optional.ofNullable(processorCustomer); } - private static Map getProcessorsToCustomerIds(Map item) { - final AttributeValue attributeValue = item.get(KEY_PROCESSOR_CUSTOMER_IDS_MAP); - final Map attribute = - attributeValue == null ? Collections.emptyMap() : attributeValue.m(); - - final Map processorsToCustomerIds = new HashMap<>(); - attribute.forEach((processorName, customerId) -> - processorsToCustomerIds.put(SubscriptionProcessor.valueOf(processorName), customerId.s())); - - return processorsToCustomerIds; - } - /** * Extracts the active processor and customer from a single attribute value in the given item. *

@@ -308,37 +295,28 @@ public class SubscriptionManager { } /** - * Updates the active processor and customer ID for the given user record. + * Sets the processor and customer ID for the given user record. * - * @return the updated user record. + * @return the user record. */ - public CompletableFuture updateProcessorAndCustomerId(Record userRecord, + public CompletableFuture setProcessorAndCustomerId(Record userRecord, ProcessorCustomer activeProcessorCustomer, Instant updatedAt) { UpdateItemRequest request = UpdateItemRequest.builder() .tableName(table) .key(Map.of(KEY_USER, b(userRecord.user))) .returnValues(ReturnValue.ALL_NEW) - .conditionExpression( - // there is no active processor+customer attribute - "attribute_not_exists(#processor_customer_id) " + - // or an attribute in the map with an inactive processor+customer - "AND attribute_not_exists(#processors_to_customer_ids.#processor_name)" - ) + .conditionExpression("attribute_not_exists(#processor_customer_id)") .updateExpression("SET " + "#processor_customer_id = :processor_customer_id, " - + "#processors_to_customer_ids.#processor_name = :customer_id, " + "#accessed_at = :accessed_at" ) .expressionAttributeNames(Map.of( "#accessed_at", KEY_ACCESSED_AT, - "#processor_customer_id", KEY_PROCESSOR_ID_CUSTOMER_ID, - "#processor_name", activeProcessorCustomer.processor().name(), - "#processors_to_customer_ids", KEY_PROCESSOR_CUSTOMER_IDS_MAP + "#processor_customer_id", KEY_PROCESSOR_ID_CUSTOMER_ID )) .expressionAttributeValues(Map.of( ":accessed_at", n(updatedAt.getEpochSecond()), - ":customer_id", s(activeProcessorCustomer.customerId()), ":processor_customer_id", b(activeProcessorCustomer.toDynamoBytes()) )).build(); @@ -346,8 +324,7 @@ public class SubscriptionManager { .thenApply(updateItemResponse -> Record.from(userRecord.user, updateItemResponse.attributes())) .exceptionallyCompose(throwable -> { if (Throwables.getRootCause(throwable) instanceof ConditionalCheckFailedException) { - return getUser(userRecord.user).thenApply(getItemResponse -> - Record.from(userRecord.user, getItemResponse.item())); + throw new ClientErrorException(Response.Status.CONFLICT); } Throwables.throwIfUnchecked(throwable); throw new CompletionException(throwable); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java index c4d9efe7f..3592d4725 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/SubscriptionControllerTest.java @@ -416,7 +416,7 @@ class SubscriptionControllerTest { final SubscriptionManager.Record recordWithCustomerId = SubscriptionManager.Record.from(record.user, dynamoItemWithProcessorCustomer); - when(SUBSCRIPTION_MANAGER.updateProcessorAndCustomerId(any(SubscriptionManager.Record.class), any(), + when(SUBSCRIPTION_MANAGER.setProcessorAndCustomerId(any(SubscriptionManager.Record.class), any(), any(Instant.class))) .thenReturn(CompletableFuture.completedFuture(recordWithCustomerId)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/SubscriptionManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/SubscriptionManagerTest.java index d90f38b1e..0898599e8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/SubscriptionManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/SubscriptionManagerTest.java @@ -9,22 +9,18 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.FOUND; import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.NOT_STORED; import static org.whispersystems.textsecuregcm.storage.SubscriptionManager.GetResult.Type.PASSWORD_MISMATCH; -import static org.whispersystems.textsecuregcm.util.AttributeValues.b; -import static org.whispersystems.textsecuregcm.util.AttributeValues.m; -import static org.whispersystems.textsecuregcm.util.AttributeValues.n; -import static org.whispersystems.textsecuregcm.util.AttributeValues.s; import java.security.SecureRandom; import java.time.Duration; import java.time.Instant; -import java.util.Arrays; import java.util.Base64; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import javax.annotation.Nonnull; +import javax.ws.rs.ClientErrorException; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -33,7 +29,6 @@ import org.whispersystems.textsecuregcm.storage.SubscriptionManager.Record; import org.whispersystems.textsecuregcm.subscriptions.ProcessorCustomer; import org.whispersystems.textsecuregcm.subscriptions.SubscriptionProcessor; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; -import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; @@ -150,7 +145,7 @@ class SubscriptionManagerTest { } @Test - void testUpdateCustomerIdAndProcessor() throws Exception { + void testSetCustomerIdAndProcessor() throws Exception { Instant subscriptionUpdated = Instant.ofEpochSecond(NOW_EPOCH_SECONDS + 1); assertThat(subscriptionManager.create(user, password, created)).succeedsWithin(Duration.ofSeconds(3)); @@ -158,22 +153,34 @@ class SubscriptionManagerTest { assertThat(getUser).succeedsWithin(Duration.ofSeconds(3)); final Record userRecord = getUser.get().record; - assertThat(subscriptionManager.updateProcessorAndCustomerId(userRecord, + assertThat(subscriptionManager.setProcessorAndCustomerId(userRecord, new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE), subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3)) .hasFieldOrPropertyWithValue("processorCustomer", - Optional.of(new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE))) - .hasFieldOrPropertyWithValue("processorsToCustomerIds", Map.of(SubscriptionProcessor.STRIPE, customer)); + Optional.of(new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE))); + final Condition clientError409Condition = new Condition<>(e -> + e instanceof ClientErrorException cee && cee.getResponse().getStatus() == 409, "Client error: 409"); + + // changing the customer ID is not permitted assertThat( - subscriptionManager.updateProcessorAndCustomerId(userRecord, + subscriptionManager.setProcessorAndCustomerId(userRecord, new ProcessorCustomer(customer + "1", SubscriptionProcessor.STRIPE), - subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3)) - .hasFieldOrPropertyWithValue("processorCustomer", - Optional.of(new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE))) - .hasFieldOrPropertyWithValue("processorsToCustomerIds", Map.of(SubscriptionProcessor.STRIPE, customer)); + subscriptionUpdated)).failsWithin(Duration.ofSeconds(3)) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(ClientErrorException.class) + .extracting(Throwable::getCause) + .satisfies(clientError409Condition); - // TODO test new customer ID with new processor does change the customer ID, once there is another processor + // calling setProcessorAndCustomerId() with the same customer ID is also an error + assertThat( + subscriptionManager.setProcessorAndCustomerId(userRecord, + new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE), + subscriptionUpdated)).failsWithin(Duration.ofSeconds(3)) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(ClientErrorException.class) + .extracting(Throwable::getCause) + .satisfies(clientError409Condition); assertThat(subscriptionManager.getSubscriberUserByProcessorCustomer( new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE))) @@ -190,7 +197,7 @@ class SubscriptionManagerTest { assertThat(getUser).succeedsWithin(Duration.ofSeconds(3)); final Record userRecord = getUser.get().record; - assertThat(subscriptionManager.updateProcessorAndCustomerId(userRecord, + assertThat(subscriptionManager.setProcessorAndCustomerId(userRecord, new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE), subscriptionUpdated)).succeedsWithin(Duration.ofSeconds(3)); assertThat(subscriptionManager.getSubscriberUserByProcessorCustomer( @@ -253,107 +260,6 @@ class SubscriptionManagerTest { }); } - @Test - void testSubscriptionAddProcessorAttribute() throws Exception { - - final byte[] user = new byte[16]; - Arrays.fill(user, (byte) 1); - final byte[] hmac = new byte[16]; - Arrays.fill(hmac, (byte) 2); - final String customerId = "abcdef"; - - assertThat(subscriptionManager.create(user, hmac, Instant.now())) - .succeedsWithin(Duration.ofSeconds(1)); - - final CompletableFuture firstGetResult = subscriptionManager.get(user, hmac); - assertThat(firstGetResult).succeedsWithin(Duration.ofSeconds(1)); - - final Record firstRecord = firstGetResult.get().record; - - assertThat(firstRecord.processorCustomer).isNull(); - assertThat(firstRecord.processorsToCustomerIds).isEmpty(); - - subscriptionManager.updateProcessorAndCustomerId(firstRecord, - new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE), Instant.now()) - .get(1, TimeUnit.SECONDS); - - // Try to update the user to have a different customer ID. This should quietly fail, - // and just return the existing customer ID. - final CompletableFuture firstUpdate = subscriptionManager.updateProcessorAndCustomerId(firstRecord, - new ProcessorCustomer(customerId + "something else", SubscriptionProcessor.STRIPE), - Instant.now()); - - assertThat(firstUpdate).succeedsWithin(Duration.ofSeconds(1)); - - final String firstUpdateCustomerId = firstUpdate.get().getProcessorCustomer().orElseThrow().customerId(); - assertThat(firstUpdateCustomerId).isEqualTo(customerId); - - // Now update with the existing customer ID. All fields should now be populated. - final CompletableFuture secondUpdate = subscriptionManager.updateProcessorAndCustomerId(firstRecord, - new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE), Instant.now()); - - assertThat(secondUpdate).succeedsWithin(Duration.ofSeconds(1)); - - final String secondUpdateCustomerId = secondUpdate.get().getProcessorCustomer().orElseThrow().customerId(); - assertThat(secondUpdateCustomerId).isEqualTo(customerId); - - final CompletableFuture secondGetResult = subscriptionManager.get(user, hmac); - assertThat(secondGetResult).succeedsWithin(Duration.ofSeconds(1)); - - final Record secondRecord = secondGetResult.get().record; - - assertThat(secondRecord.getProcessorCustomer()) - .isPresent() - .get() - .isEqualTo(new ProcessorCustomer(customerId, SubscriptionProcessor.STRIPE)); - assertThat(secondRecord.processorsToCustomerIds).isEqualTo(Map.of(SubscriptionProcessor.STRIPE, customerId)); - } - - @Test - void testUpdateEmptyProcessorCustomerWithValueInMap() throws Exception { - // it isn’t possible to create this exact data setup in current code, but this tests the conditional update expression - final Map processorCustomers = Map.of( - SubscriptionProcessor.STRIPE.name(), s(customer) - ); - - final Map dynamoItem = Map.of( - SubscriptionManager.KEY_USER, b(user), - SubscriptionManager.KEY_PASSWORD, b(password), - SubscriptionManager.KEY_PROCESSOR_CUSTOMER_IDS_MAP, m(processorCustomers), - SubscriptionManager.KEY_CREATED_AT, n(created.getEpochSecond()), - SubscriptionManager.KEY_ACCESSED_AT, n(Instant.now().getEpochSecond()) - ); - - dynamoDbExtension.getDynamoDbAsyncClient().putItem(builder -> - builder.tableName(dynamoDbExtension.getTableName()) - .item(dynamoItem) - ).get(1, TimeUnit.SECONDS); - - final CompletableFuture firstGet = subscriptionManager.get(user, password); - - assertThat(firstGet) - .succeedsWithin(Duration.ofSeconds(1)) - .extracting(r -> r.record) - .satisfies(record -> { - assertThat(record.processorCustomer).isNull(); - assertThat(record.processorsToCustomerIds).size().isEqualTo(1); - assertThat(record.processorsToCustomerIds).contains(Map.entry(SubscriptionProcessor.STRIPE, customer)); - }); - - final CompletableFuture update = subscriptionManager.updateProcessorAndCustomerId(firstGet.get().record, - new ProcessorCustomer(customer, SubscriptionProcessor.STRIPE), Instant.now()); - - assertThat(update) - .succeedsWithin(Duration.ofSeconds(1)) - .satisfies(record -> { - // processorCustomer should not have been updated - assertThat(record.processorCustomer).isNull(); - assertThat(record.processorsToCustomerIds).size().isEqualTo(1); - assertThat(record.processorsToCustomerIds).contains(Map.entry(SubscriptionProcessor.STRIPE, customer)); - }); - - } - @Test void testProcessorAndCustomerId() { final ProcessorCustomer processorCustomer = @@ -376,7 +282,6 @@ class SubscriptionManagerTest { assertThat(record.user).isEqualTo(user); assertThat(record.password).isEqualTo(password); assertThat(record.processorCustomer).isNull(); - assertThat(record.processorsToCustomerIds).isEmpty(); assertThat(record.createdAt).isEqualTo(created); assertThat(record.subscriptionId).isNull(); assertThat(record.subscriptionCreatedAt).isNull();