From 25ea1df29950ac501ef05ffe85bdbf8022a0572a Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 6 Jul 2023 13:47:48 -0400 Subject: [PATCH] Limit concurrency when writing signed EC pre-keys --- .../workers/MigrateSignedECPreKeysCommand.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java index 536f2693b..877e8d857 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java @@ -24,6 +24,12 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun private static final String STORE_KEY_ATTEMPT_COUNTER_NAME = MetricsUtil.name(MigrateSignedECPreKeysCommand.class, "storeKeyAttempt"); + // It's tricky to find, but this is the default connection count for the AWS SDK's async DynamoDB client. We'll have + // multiple workers using the max number of connections, but that's okay because the client allows for (by default) + // 10,000 pending requests. As long as we don't have more than 200(!) workers, we'll be fine with this concurrency + // level. + private static final int MAX_CONCURRENCY = 50; + public MigrateSignedECPreKeysCommand() { super("migrate-signed-ec-pre-keys", "Migrate signed EC pre-keys from Account records to a dedicated table"); } @@ -47,7 +53,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun return Flux.fromIterable(keys); })) .flatMap(keyTuple -> Mono.fromFuture( - keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3()))) + keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())), false, MAX_CONCURRENCY) .doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment()) .then() .block();