From b8b17ae47372af80b3264f67d964d63f282034a3 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 18 Feb 2025 09:19:14 -0500 Subject: [PATCH] Add methods for iterating over all account identifiers --- .../textsecuregcm/storage/Accounts.java | 20 +++++++++++++++++++ .../storage/AccountsManager.java | 4 ++++ .../textsecuregcm/storage/AccountsTest.java | 19 ++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index c55eb8594..0fc4bdcf8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -1242,6 +1242,26 @@ public class Accounts { .sequential(); } + Flux getAllAccountIdentifiers(final int segments, final Scheduler scheduler) { + if (segments < 1) { + throw new IllegalArgumentException("Total number of segments must be positive"); + } + + return Flux.range(0, segments) + .parallel() + .runOn(scheduler) + .flatMap(segment -> dynamoDbAsyncClient.scanPaginator(ScanRequest.builder() + .tableName(accountsTableName) + .consistentRead(false) + .segment(segment) + .totalSegments(segments) + .projectionExpression(KEY_ACCOUNT_UUID) + .build()) + .items() + .map(item -> AttributeValues.getUUID(item, KEY_ACCOUNT_UUID, null))) + .sequential(); + } + @Nonnull private Optional getByIndirectLookup( final Timer timer, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 7fd6274d1..d7481448e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -1217,6 +1217,10 @@ public class AccountsManager extends RedisPubSubAdapter implemen return accounts.getAll(segments, scheduler); } + public Flux streamAccountIdentifiersFromDynamo(final int segments, final Scheduler scheduler) { + return accounts.getAllAccountIdentifiers(segments, scheduler); + } + public CompletableFuture delete(final Account account, final DeletionReason deletionReason) { final Timer.Sample sample = Timer.start(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java index 91f7d4b56..ad1a1e38d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsTest.java @@ -30,10 +30,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -56,6 +58,7 @@ import org.junit.jupiter.params.provider.ValueSource; import org.signal.libsignal.zkgroup.backups.BackupCredentialType; import org.whispersystems.textsecuregcm.auth.UnidentifiedAccessUtil; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.AccountsHelper; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; @@ -731,6 +734,22 @@ class AccountsTest { retrievedAccounts.stream().map(Account::getUuid).collect(Collectors.toSet())); } + @Test + void testGetAllAccountIdentifiers() { + final Set expectedAccountIdentifiers = new HashSet<>(); + + for (int i = 1; i <= 100; i++) { + final Account account = generateAccount("+1" + String.format("%03d", i), UUID.randomUUID(), UUID.randomUUID()); + expectedAccountIdentifiers.add(account.getIdentifier(IdentityType.ACI)); + createAccount(account); + } + + @SuppressWarnings("DataFlowIssue") final Set retrievedAccountIdentifiers = + new HashSet<>(accounts.getAllAccountIdentifiers(2, Schedulers.parallel()).collectList().block()); + + assertEquals(expectedAccountIdentifiers, retrievedAccountIdentifiers); + } + @Test void testDelete() { final Device deletedDevice = generateDevice(DEVICE_ID_1);