limit concurrency of async DynamoDB ops
This commit is contained in:
parent
ff9fe2c1be
commit
1f8e4713ef
|
@ -233,7 +233,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
|||
.key(Map.of(
|
||||
KEY_PARTITION, partitionKey,
|
||||
KEY_SORT, item.get(KEY_SORT)))
|
||||
.build())))
|
||||
.build())),
|
||||
DYNAMO_DB_MAX_BATCH_SIZE)
|
||||
.doOnComplete(() -> sample.stop(deleteByAccount))
|
||||
.then()
|
||||
.toFuture();
|
||||
|
@ -261,7 +262,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
|||
.key(Map.of(
|
||||
KEY_PARTITION, partitionKey,
|
||||
KEY_SORT, item.get(KEY_SORT)))
|
||||
.build())))
|
||||
.build())),
|
||||
DYNAMO_DB_MAX_BATCH_SIZE)
|
||||
.doOnComplete(() -> sample.stop(deleteByDevice))
|
||||
.then()
|
||||
.toFuture();
|
||||
|
|
|
@ -6,22 +6,19 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||
import static org.whispersystems.textsecuregcm.storage.AbstractDynamoDbStore.DYNAMO_DB_MAX_BATCH_SIZE;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.whispersystems.textsecuregcm.entities.PreKey;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
@ -97,11 +94,14 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
|||
public CompletableFuture<Void> store(final UUID identifier, final long deviceId, final List<K> preKeys) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return delete(identifier, deviceId)
|
||||
.thenCompose(ignored -> CompletableFuture.allOf(preKeys.stream()
|
||||
.map(preKey -> store(identifier, deviceId, preKey))
|
||||
.toList()
|
||||
.toArray(new CompletableFuture[0])))
|
||||
return Mono.fromFuture(delete(identifier, deviceId))
|
||||
.thenMany(
|
||||
Flux.fromIterable(preKeys)
|
||||
.flatMap(
|
||||
preKey -> Mono.fromFuture(store(identifier, deviceId, preKey)),
|
||||
DYNAMO_DB_MAX_BATCH_SIZE))
|
||||
.then()
|
||||
.toFuture()
|
||||
.thenRun(() -> sample.stop(storeKeyBatchTimer));
|
||||
}
|
||||
|
||||
|
@ -258,7 +258,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
|
|||
KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID)
|
||||
))
|
||||
.build())
|
||||
.flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest)))
|
||||
.flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest)), DYNAMO_DB_MAX_BATCH_SIZE)
|
||||
// Idiom: wait for everything to finish, but discard the results
|
||||
.reduce(0, (a, b) -> 0)
|
||||
.toFuture()
|
||||
|
|
Loading…
Reference in New Issue