From 6ada76da7f83c015fe1dab039872d12d09e40116 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 11 Nov 2021 11:13:17 -0500 Subject: [PATCH] Parallelize assignment of phone number identifiers --- ...nPhoneNumberIdentifierCrawlerListener.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AssignPhoneNumberIdentifierCrawlerListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AssignPhoneNumberIdentifierCrawlerListener.java index ebb48695e..4490dc827 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AssignPhoneNumberIdentifierCrawlerListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AssignPhoneNumberIdentifierCrawlerListener.java @@ -10,6 +10,10 @@ import io.micrometer.core.instrument.Metrics; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; @@ -19,6 +23,8 @@ public class AssignPhoneNumberIdentifierCrawlerListener extends AccountDatabaseC private final AccountsManager accountsManager; private final PhoneNumberIdentifiers phoneNumberIdentifiers; + private final ExecutorService updateExecutor = Executors.newFixedThreadPool(16); + private static final Counter ASSIGNED_PNI_COUNTER = Metrics.counter(name(AssignPhoneNumberIdentifierCrawlerListener.class, "assignPni")); @@ -47,20 +53,20 @@ public class AssignPhoneNumberIdentifierCrawlerListener extends AccountDatabaseC // That means that we don't need to worry about accidentally overwriting a PNI assigned by another source; if an // account doesn't have a PNI when it winds up in a crawled chunk, there's no danger that it will have one after a // refresh, and so we can blindly assign a random PNI. - chunkAccounts.stream() + CompletableFuture.allOf(chunkAccounts.stream() .filter(account -> account.getPhoneNumberIdentifier().isEmpty()) .map(Account::getUuid) - .forEach(accountIdentifier -> { - // We must not update the accounts in the chunk directly; instead, we need to get a fresh copy we're free to - // update as needed. - accountsManager.getByAccountIdentifier(accountIdentifier).ifPresent(accountWithoutPni -> { - final String number = accountWithoutPni.getNumber(); - final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(number); + .map(accountsManager::getByAccountIdentifier) + .filter(Optional::isPresent) + .map(Optional::get) + .map(accountWithoutPni -> CompletableFuture.runAsync(() -> { + final String number = accountWithoutPni.getNumber(); + final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(number); - accountsManager.update(accountWithoutPni, a -> a.setNumber(number, phoneNumberIdentifier)); - }); + accountsManager.update(accountWithoutPni, a -> a.setNumber(number, phoneNumberIdentifier)); ASSIGNED_PNI_COUNTER.increment(); - }); + }, updateExecutor)) + .toArray(CompletableFuture[]::new)).join(); } }