Add methods for iterating over all account identifiers
This commit is contained in:
parent
b203344ed4
commit
b8b17ae473
|
@ -1242,6 +1242,26 @@ public class Accounts {
|
|||
.sequential();
|
||||
}
|
||||
|
||||
Flux<UUID> getAllAccountIdentifiers(final int segments, final Scheduler scheduler) {
|
||||
if (segments < 1) {
|
||||
throw new IllegalArgumentException("Total number of segments must be positive");
|
||||
}
|
||||
|
||||
return Flux.range(0, segments)
|
||||
.parallel()
|
||||
.runOn(scheduler)
|
||||
.flatMap(segment -> dynamoDbAsyncClient.scanPaginator(ScanRequest.builder()
|
||||
.tableName(accountsTableName)
|
||||
.consistentRead(false)
|
||||
.segment(segment)
|
||||
.totalSegments(segments)
|
||||
.projectionExpression(KEY_ACCOUNT_UUID)
|
||||
.build())
|
||||
.items()
|
||||
.map(item -> AttributeValues.getUUID(item, KEY_ACCOUNT_UUID, null)))
|
||||
.sequential();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private Optional<Account> getByIndirectLookup(
|
||||
final Timer timer,
|
||||
|
|
|
@ -1217,6 +1217,10 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
|||
return accounts.getAll(segments, scheduler);
|
||||
}
|
||||
|
||||
public Flux<UUID> streamAccountIdentifiersFromDynamo(final int segments, final Scheduler scheduler) {
|
||||
return accounts.getAllAccountIdentifiers(segments, scheduler);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
|
|
|
@ -30,10 +30,12 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
|
@ -56,6 +58,7 @@ import org.junit.jupiter.params.provider.ValueSource;
|
|||
import org.signal.libsignal.zkgroup.backups.BackupCredentialType;
|
||||
import org.whispersystems.textsecuregcm.auth.UnidentifiedAccessUtil;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
|
||||
import org.whispersystems.textsecuregcm.tests.util.AccountsHelper;
|
||||
import org.whispersystems.textsecuregcm.tests.util.DevicesHelper;
|
||||
|
@ -731,6 +734,22 @@ class AccountsTest {
|
|||
retrievedAccounts.stream().map(Account::getUuid).collect(Collectors.toSet()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetAllAccountIdentifiers() {
|
||||
final Set<UUID> expectedAccountIdentifiers = new HashSet<>();
|
||||
|
||||
for (int i = 1; i <= 100; i++) {
|
||||
final Account account = generateAccount("+1" + String.format("%03d", i), UUID.randomUUID(), UUID.randomUUID());
|
||||
expectedAccountIdentifiers.add(account.getIdentifier(IdentityType.ACI));
|
||||
createAccount(account);
|
||||
}
|
||||
|
||||
@SuppressWarnings("DataFlowIssue") final Set<UUID> retrievedAccountIdentifiers =
|
||||
new HashSet<>(accounts.getAllAccountIdentifiers(2, Schedulers.parallel()).collectList().block());
|
||||
|
||||
assertEquals(expectedAccountIdentifiers, retrievedAccountIdentifiers);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDelete() {
|
||||
final Device deletedDevice = generateDevice(DEVICE_ID_1);
|
||||
|
|
Loading…
Reference in New Issue