Parallelize assignment of phone number identifiers

This commit is contained in:
Jon Chambers 2021-11-11 11:13:17 -05:00 committed by Jon Chambers
parent cbdec0cb22
commit 6ada76da7f
1 changed files with 16 additions and 10 deletions

View File

@ -10,6 +10,10 @@ import io.micrometer.core.instrument.Metrics;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
@ -19,6 +23,8 @@ public class AssignPhoneNumberIdentifierCrawlerListener extends AccountDatabaseC
private final AccountsManager accountsManager;
private final PhoneNumberIdentifiers phoneNumberIdentifiers;
private final ExecutorService updateExecutor = Executors.newFixedThreadPool(16);
private static final Counter ASSIGNED_PNI_COUNTER =
Metrics.counter(name(AssignPhoneNumberIdentifierCrawlerListener.class, "assignPni"));
@ -47,20 +53,20 @@ public class AssignPhoneNumberIdentifierCrawlerListener extends AccountDatabaseC
// That means that we don't need to worry about accidentally overwriting a PNI assigned by another source; if an
// account doesn't have a PNI when it winds up in a crawled chunk, there's no danger that it will have one after a
// refresh, and so we can blindly assign a random PNI.
chunkAccounts.stream()
CompletableFuture.allOf(chunkAccounts.stream()
.filter(account -> account.getPhoneNumberIdentifier().isEmpty())
.map(Account::getUuid)
.forEach(accountIdentifier -> {
// We must not update the accounts in the chunk directly; instead, we need to get a fresh copy we're free to
// update as needed.
accountsManager.getByAccountIdentifier(accountIdentifier).ifPresent(accountWithoutPni -> {
final String number = accountWithoutPni.getNumber();
final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(number);
.map(accountsManager::getByAccountIdentifier)
.filter(Optional::isPresent)
.map(Optional::get)
.map(accountWithoutPni -> CompletableFuture.runAsync(() -> {
final String number = accountWithoutPni.getNumber();
final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(number);
accountsManager.update(accountWithoutPni, a -> a.setNumber(number, phoneNumberIdentifier));
});
accountsManager.update(accountWithoutPni, a -> a.setNumber(number, phoneNumberIdentifier));
ASSIGNED_PNI_COUNTER.increment();
});
}, updateExecutor))
.toArray(CompletableFuture[]::new)).join();
}
}