Limit concurrency when writing signed EC pre-keys
This commit is contained in:
parent
5ced86af1d
commit
25ea1df299
|
@ -24,6 +24,12 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun
|
||||||
private static final String STORE_KEY_ATTEMPT_COUNTER_NAME =
|
private static final String STORE_KEY_ATTEMPT_COUNTER_NAME =
|
||||||
MetricsUtil.name(MigrateSignedECPreKeysCommand.class, "storeKeyAttempt");
|
MetricsUtil.name(MigrateSignedECPreKeysCommand.class, "storeKeyAttempt");
|
||||||
|
|
||||||
|
// It's tricky to find, but this is the default connection count for the AWS SDK's async DynamoDB client. We'll have
|
||||||
|
// multiple workers using the max number of connections, but that's okay because the client allows for (by default)
|
||||||
|
// 10,000 pending requests. As long as we don't have more than 200(!) workers, we'll be fine with this concurrency
|
||||||
|
// level.
|
||||||
|
private static final int MAX_CONCURRENCY = 50;
|
||||||
|
|
||||||
public MigrateSignedECPreKeysCommand() {
|
public MigrateSignedECPreKeysCommand() {
|
||||||
super("migrate-signed-ec-pre-keys", "Migrate signed EC pre-keys from Account records to a dedicated table");
|
super("migrate-signed-ec-pre-keys", "Migrate signed EC pre-keys from Account records to a dedicated table");
|
||||||
}
|
}
|
||||||
|
@ -47,7 +53,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun
|
||||||
return Flux.fromIterable(keys);
|
return Flux.fromIterable(keys);
|
||||||
}))
|
}))
|
||||||
.flatMap(keyTuple -> Mono.fromFuture(
|
.flatMap(keyTuple -> Mono.fromFuture(
|
||||||
keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())))
|
keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())), false, MAX_CONCURRENCY)
|
||||||
.doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment())
|
.doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment())
|
||||||
.then()
|
.then()
|
||||||
.block();
|
.block();
|
||||||
|
|
Loading…
Reference in New Issue