diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 5294b702c..f10a22ec9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -267,7 +267,6 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory; -import org.whispersystems.textsecuregcm.workers.IssuedReceiptMigrationCommand; import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand; import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand; import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand; @@ -333,7 +332,6 @@ public class WhisperServerService extends Application migrateToTagSet(final IssuedReceipt issuedReceipt) { - UpdateItemRequest updateItemRequest = UpdateItemRequest.builder() - .tableName(table) - .key(Map.of(KEY_PROCESSOR_ITEM_ID, s(issuedReceipt.itemId()))) - .conditionExpression("attribute_exists(#key) AND #tag = :tag") - .returnValues(ReturnValue.NONE) - .updateExpression("ADD #tags :singletonTag") - .expressionAttributeNames(Map.of( - "#key", KEY_PROCESSOR_ITEM_ID, - "#tag", KEY_ISSUED_RECEIPT_TAG, - "#tags", KEY_ISSUED_RECEIPT_TAG_SET)) - .expressionAttributeValues(Map.of( - ":tag", b(issuedReceipt.tag()), - ":singletonTag", AttributeValue.fromBs(Collections.singletonList(SdkBytes.fromByteArray(issuedReceipt.tag()))))) - .build(); - return dynamoDbAsyncClient.updateItem(updateItemRequest) - .thenRun(Util.NOOP) - .exceptionally(ExceptionUtils.exceptionallyHandler(ConditionalCheckFailedException.class, e -> { - log.info("Not migrating item {}, because when we tried to migrate it was already deleted", issuedReceipt.itemId()); - return null; - })); - } - - public record IssuedReceipt(String itemId, byte[] tag) {} - public Flux receiptsWithoutTagSet(final int segments, final Scheduler scheduler) { - if (segments < 1) { - throw new IllegalArgumentException("Total number of segments must be positive"); - } - - return Flux.range(0, segments) - .parallel() - .runOn(scheduler) - .flatMap(segment -> dynamoDbAsyncClient.scanPaginator(ScanRequest.builder() - .tableName(table) - .consistentRead(true) - .segment(segment) - .totalSegments(segments) - .filterExpression("attribute_not_exists(#tags)") - .expressionAttributeNames(Map.of("#tags", KEY_ISSUED_RECEIPT_TAG_SET)) - .build()) - .items() - .flatMapIterable(item -> { - if (!item.containsKey(KEY_ISSUED_RECEIPT_TAG)) { - log.error("Skipping item {} that was missing a receipt tag", item.get(KEY_PROCESSOR_ITEM_ID).s()); - return Collections.emptySet(); - } - return List.of(new IssuedReceipt(item.get(KEY_PROCESSOR_ITEM_ID).s(), item.get(KEY_ISSUED_RECEIPT_TAG).b().asByteArray())); - })) - .sequential(); - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/IssuedReceiptMigrationCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/IssuedReceiptMigrationCommand.java deleted file mode 100644 index 02118a7ed..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/IssuedReceiptMigrationCommand.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.workers; - -import io.dropwizard.core.Application; -import io.dropwizard.core.setup.Environment; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.Metrics; -import net.sourceforge.argparse4j.inf.Namespace; -import net.sourceforge.argparse4j.inf.Subparser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.WhisperServerConfiguration; -import org.whispersystems.textsecuregcm.metrics.MetricsUtil; -import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -import java.time.Clock; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -public class IssuedReceiptMigrationCommand extends AbstractCommandWithDependencies { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - private static final String SEGMENT_COUNT_ARGUMENT = "segments"; - private static final String DRY_RUN_ARGUMENT = "dry-run"; - private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; - private static final String BUFFER_ARGUMENT = "buffer"; - - private static final String INSPECTED_ISSUED_RECEIPTS = MetricsUtil.name(IssuedReceiptMigrationCommand.class, - "inspectedIssuedReceipts"); - private static final String MIGRATED_ISSUED_RECEIPTS = MetricsUtil.name(IssuedReceiptMigrationCommand.class, - "migratedIssuedReceipts"); - - public IssuedReceiptMigrationCommand() { - super(new Application<>() { - @Override - public void run(final WhisperServerConfiguration configuration, final Environment environment) { - } - }, "migrate-issued-receipts", "Migrates columns in the issued receipts table"); - } - - @Override - public void configure(final Subparser subparser) { - super.configure(subparser); - - subparser.addArgument("--segments") - .type(Integer.class) - .dest(SEGMENT_COUNT_ARGUMENT) - .required(false) - .setDefault(1) - .help("The total number of segments for a DynamoDB scan"); - - subparser.addArgument("--max-concurrency") - .type(Integer.class) - .dest(MAX_CONCURRENCY_ARGUMENT) - .required(false) - .setDefault(16) - .help("Max concurrency for migration operations"); - - subparser.addArgument("--dry-run") - .type(Boolean.class) - .dest(DRY_RUN_ARGUMENT) - .required(false) - .setDefault(true) - .help("If true, don’t actually perform migration"); - - subparser.addArgument("--buffer") - .type(Integer.class) - .dest(BUFFER_ARGUMENT) - .setDefault(16_384) - .help("Records to buffer"); - } - - @Override - protected void run(final Environment environment, final Namespace namespace, - final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception { - final int bufferSize = namespace.getInt(BUFFER_ARGUMENT); - final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT)); - final int concurrency = Objects.requireNonNull(namespace.getInt(MAX_CONCURRENCY_ARGUMENT)); - final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT); - - logger.info("Crawling issuedReceipts with {} segments and {} processors", - segments, - Runtime.getRuntime().availableProcessors()); - - final Counter inspected = Metrics.counter(INSPECTED_ISSUED_RECEIPTS, - "dryRun", Boolean.toString(dryRun)); - final Counter migrated = Metrics.counter(MIGRATED_ISSUED_RECEIPTS, - "dryRun", Boolean.toString(dryRun)); - - final IssuedReceiptsManager issuedReceiptsManager = commandDependencies.issuedReceiptsManager(); - final Flux receipts = - issuedReceiptsManager.receiptsWithoutTagSet(segments, Schedulers.parallel()); - final long count = bufferShuffle(receipts, bufferSize) - .doOnNext(issuedReceipt -> inspected.increment()) - .flatMap(issuedReceipt -> Mono - .fromCompletionStage(() -> dryRun - ? CompletableFuture.completedFuture(null) - : issuedReceiptsManager.migrateToTagSet(issuedReceipt)) - .thenReturn(true) - .retry(3) - .onErrorResume(throwable -> { - logger.error("Failed to migrate {} after 3 attempts, giving up", issuedReceipt.itemId(), throwable); - return Mono.just(false); - }), - concurrency) - .doOnNext(success -> - Metrics.counter(MIGRATED_ISSUED_RECEIPTS, - "dryRun", Boolean.toString(dryRun), - "success", Boolean.toString(success))) - .count() - .block(); - logger.info("Attempted to migrate {} issued receipts", count); - } - - private static Flux bufferShuffle(Flux f, int bufferSize) { - return f.buffer(bufferSize) - .map(source -> { - final ArrayList shuffled = new ArrayList<>(source); - Collections.shuffle(shuffled); - return shuffled; - }) - .limitRate(2) - .flatMapIterable(Function.identity()); - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/IssuedReceiptsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/IssuedReceiptsManagerTest.java index 658fb62fe..650402b1b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/IssuedReceiptsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/IssuedReceiptsManagerTest.java @@ -12,13 +12,10 @@ import static org.mockito.Mockito.when; import jakarta.ws.rs.ClientErrorException; import java.time.Duration; import java.time.Instant; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; import org.assertj.core.api.Condition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,13 +25,11 @@ import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.subscriptions.PaymentProvider; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.TestRandomUtil; -import reactor.core.scheduler.Schedulers; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; -import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; class IssuedReceiptsManagerTest { @@ -93,74 +88,6 @@ class IssuedReceiptsManagerTest { assertThat(future).succeedsWithin(Duration.ofSeconds(3)); } - @Test - void testMigrateToTagSet() { - Instant now = Instant.ofEpochSecond(NOW_EPOCH_SECONDS); - - issuedReceiptsManager - .recordIssuance("itemId", PaymentProvider.STRIPE, randomReceiptCredentialRequest(), now) - .join(); - removeTagSet("itemId"); - - assertThat(getItem("itemId").item()).doesNotContainKey(IssuedReceiptsManager.KEY_ISSUED_RECEIPT_TAG_SET); - - final IssuedReceiptsManager.IssuedReceipt issuedReceipt = issuedReceiptsManager - .receiptsWithoutTagSet(1, Schedulers.immediate()) - .blockFirst(); - - issuedReceiptsManager.migrateToTagSet(issuedReceipt).join(); - - final Map item = getItem("itemId").item(); - assertThat(item) - .containsKey(IssuedReceiptsManager.KEY_ISSUED_RECEIPT_TAG_SET) - .containsKey(IssuedReceiptsManager.KEY_ISSUED_RECEIPT_TAG); - - final List tags = item - .get(IssuedReceiptsManager.KEY_ISSUED_RECEIPT_TAG_SET).bs() - .stream() - .map(SdkBytes::asByteArray) - .toList(); - assertThat(tags).hasSize(1); - - final byte[] tag = item.get(IssuedReceiptsManager.KEY_ISSUED_RECEIPT_TAG).b().asByteArray(); - assertThat(tags).first().isEqualTo(tag); - } - - - @Test - void testReceiptsWithoutTagSet() { - Instant now = Instant.ofEpochSecond(NOW_EPOCH_SECONDS); - - final int numItems = 100; - final List expectedNoTagSet = IntStream.range(0, numItems) - .boxed() - .flatMap(i -> { - final String itemId = "item-%s".formatted(i); - issuedReceiptsManager.recordIssuance(itemId, PaymentProvider.STRIPE, randomReceiptCredentialRequest(), now).join(); - - if (i % 2 == 0) { - removeTagSet(itemId); - return Stream.of(itemId); - } else { - return Stream.empty(); - } - }).toList(); - final List items = issuedReceiptsManager - .receiptsWithoutTagSet(1, Schedulers.immediate()) - .map(IssuedReceiptsManager.IssuedReceipt::itemId) - .collectList().block(); - assertThat(items).hasSize(numItems / 2); - assertThat(items).containsExactlyInAnyOrderElementsOf(expectedNoTagSet); - } - - @Test - void testMigrateAfterRecordExpires() { - final IssuedReceiptsManager.IssuedReceipt issued = new IssuedReceiptsManager.IssuedReceipt("itemId", - TestRandomUtil.nextBytes(32)); - // We should succeed but do nothing if the item is deleted by the time we try to migrate it - issuedReceiptsManager.migrateToTagSet(issued).join(); - assertThat(getItem("itemId").hasItem()).isFalse(); - } private GetItemResponse getItem(final String itemId) { final DynamoDbClient client = DYNAMO_DB_EXTENSION.getDynamoDbClient(); @@ -176,15 +103,4 @@ class IssuedReceiptsManagerTest { when(request.serialize()).thenReturn(bytes); return request; } - - private void removeTagSet(final String itemId) { - final DynamoDbClient client = DYNAMO_DB_EXTENSION.getDynamoDbClient(); - // Simulate an entry that was written before we wrote the tag set field - client.updateItem(UpdateItemRequest.builder() - .tableName(Tables.ISSUED_RECEIPTS.tableName()) - .key(Map.of(IssuedReceiptsManager.KEY_PROCESSOR_ITEM_ID, AttributeValues.s(itemId))) - .updateExpression("REMOVE #tags") - .expressionAttributeNames(Map.of("#tags", IssuedReceiptsManager.KEY_ISSUED_RECEIPT_TAG_SET)) - .build()); - } }