From 1f8e4713ef93fba7b90c5be07eb7afbfab38bbcf Mon Sep 17 00:00:00 2001 From: Jonathan Klabunde Tomer <125505367+jkt-signal@users.noreply.github.com> Date: Thu, 17 Aug 2023 13:56:09 -0700 Subject: [PATCH] limit concurrency of async DynamoDB ops --- .../storage/MessagesDynamoDb.java | 6 ++++-- .../storage/SingleUsePreKeyStore.java | 20 +++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index ebee30a0b..03782b41f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -233,7 +233,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .key(Map.of( KEY_PARTITION, partitionKey, KEY_SORT, item.get(KEY_SORT))) - .build()))) + .build())), + DYNAMO_DB_MAX_BATCH_SIZE) .doOnComplete(() -> sample.stop(deleteByAccount)) .then() .toFuture(); @@ -261,7 +262,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .key(Map.of( KEY_PARTITION, partitionKey, KEY_SORT, item.get(KEY_SORT))) - .build()))) + .build())), + DYNAMO_DB_MAX_BATCH_SIZE) .doOnComplete(() -> sample.stop(deleteByDevice)) .then() .toFuture(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUsePreKeyStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUsePreKeyStore.java index f9d815434..9499eb38c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUsePreKeyStore.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUsePreKeyStore.java @@ -6,22 +6,19 @@ package org.whispersystems.textsecuregcm.storage; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; +import static org.whispersystems.textsecuregcm.storage.AbstractDynamoDbStore.DYNAMO_DB_MAX_BATCH_SIZE; -import com.google.common.annotations.VisibleForTesting; -import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import java.nio.ByteBuffer; import java.time.Duration; -import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.StringUtils; import org.whispersystems.textsecuregcm.entities.PreKey; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.Util; @@ -97,11 +94,14 @@ public abstract class SingleUsePreKeyStore> { public CompletableFuture store(final UUID identifier, final long deviceId, final List preKeys) { final Timer.Sample sample = Timer.start(); - return delete(identifier, deviceId) - .thenCompose(ignored -> CompletableFuture.allOf(preKeys.stream() - .map(preKey -> store(identifier, deviceId, preKey)) - .toList() - .toArray(new CompletableFuture[0]))) + return Mono.fromFuture(delete(identifier, deviceId)) + .thenMany( + Flux.fromIterable(preKeys) + .flatMap( + preKey -> Mono.fromFuture(store(identifier, deviceId, preKey)), + DYNAMO_DB_MAX_BATCH_SIZE)) + .then() + .toFuture() .thenRun(() -> sample.stop(storeKeyBatchTimer)); } @@ -258,7 +258,7 @@ public abstract class SingleUsePreKeyStore> { KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID) )) .build()) - .flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest))) + .flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest)), DYNAMO_DB_MAX_BATCH_SIZE) // Idiom: wait for everything to finish, but discard the results .reduce(0, (a, b) -> 0) .toFuture()