diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredLinkedDevicesCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredLinkedDevicesCommand.java index 38b0e6a90..851266814 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredLinkedDevicesCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/RemoveExpiredLinkedDevicesCommand.java @@ -8,9 +8,13 @@ package org.whispersystems.textsecuregcm.workers; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import io.micrometer.core.instrument.Metrics; +import io.micrometer.shaded.reactor.util.function.Tuple2; import io.micrometer.shaded.reactor.util.function.Tuples; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import net.sourceforge.argparse4j.inf.Subparser; @@ -23,9 +27,12 @@ import reactor.core.publisher.Mono; public class RemoveExpiredLinkedDevicesCommand extends AbstractSinglePassCrawlAccountsCommand { - private static final int MAX_CONCURRENCY = 16; + private static final int DEFAULT_MAX_CONCURRENCY = 16; private static final String DRY_RUN_ARGUMENT = "dry-run"; + private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency"; + private static final String BUFFER_ARGUMENT = "buffer"; + private static final String REMOVED_DEVICES_COUNTER_NAME = name(RemoveExpiredLinkedDevicesCommand.class, "removedDevices"); private static final String UPDATED_ACCOUNTS_COUNTER_NAME = name(RemoveExpiredLinkedDevicesCommand.class, @@ -49,15 +56,36 @@ public class RemoveExpiredLinkedDevicesCommand extends AbstractSinglePassCrawlAc .required(false) .setDefault(true) .help("If true, don’t actually modify accounts with expired linked devices"); + + subparser.addArgument("--max-concurrency") + .type(Integer.class) + .dest(MAX_CONCURRENCY_ARGUMENT) + .setDefault(DEFAULT_MAX_CONCURRENCY) + .help("Max concurrency for DynamoDB operations"); + + subparser.addArgument("--buffer") + .type(Integer.class) + .dest(BUFFER_ARGUMENT) + .setDefault(16_384) + .help("Accounts to buffer"); } @Override protected void crawlAccounts(final Flux accounts) { final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT); + final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT); + final int bufferSize = getNamespace().getInt(BUFFER_ARGUMENT); accounts.map(a -> Tuples.of(a, getExpiredLinkedDeviceIds(a.getDevices()))) .filter(accountAndExpiredDevices -> !accountAndExpiredDevices.getT2().isEmpty()) + .buffer(bufferSize) + .map(source -> { + final List>> shuffled = new ArrayList<>(source); + Collections.shuffle(shuffled); + return shuffled; + }) + .flatMapIterable(Function.identity()) .flatMap(accountAndExpiredDevices -> { final Account account = accountAndExpiredDevices.getT1(); final Set expiredDevices = accountAndExpiredDevices.getT2(); @@ -73,7 +101,7 @@ public class RemoveExpiredLinkedDevicesCommand extends AbstractSinglePassCrawlAc return Mono.empty(); }); - }, MAX_CONCURRENCY) + }, maxConcurrency) .doOnNext(removedDevices -> { Metrics.counter(REMOVED_DEVICES_COUNTER_NAME, "dryRun", String.valueOf(dryRun)).increment(removedDevices); Metrics.counter(UPDATED_ACCOUNTS_COUNTER_NAME, "dryRun", String.valueOf(dryRun)).increment();