diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 3391d3a3b..93292ce03 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -482,7 +482,7 @@ public class WhisperServerService extends Application accounts; + @Nullable + private final UUID lastUuid; + + public AccountCrawlChunk(final List accounts, @Nullable final UUID lastUuid) { + this.accounts = accounts; + this.lastUuid = lastUuid; + } + + public List getAccounts() { + return accounts; + } + + public Optional getLastUuid() { + return Optional.ofNullable(lastUuid); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java index f643c3648..96eead71e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java @@ -4,22 +4,21 @@ */ package org.whispersystems.textsecuregcm.storage; +import static com.codahale.metrics.MetricRegistry.name; + import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.Util; - +import io.dropwizard.lifecycle.Managed; import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; - -import static com.codahale.metrics.MetricRegistry.name; -import io.dropwizard.lifecycle.Managed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.Constants; +import org.whispersystems.textsecuregcm.util.Util; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public class AccountDatabaseCrawler implements Managed, Runnable { @@ -38,6 +37,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable { private final AccountDatabaseCrawlerCache cache; private final List listeners; + private final DynamicConfigurationManager dynamicConfigurationManager; + private AtomicBoolean running = new AtomicBoolean(false); private boolean finished; @@ -45,7 +46,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable { AccountDatabaseCrawlerCache cache, List listeners, int chunkSize, - long chunkIntervalMs) + long chunkIntervalMs, + DynamicConfigurationManager dynamicConfigurationManager) { this.accounts = accounts; this.chunkSize = chunkSize; @@ -53,6 +55,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable { this.workerId = UUID.randomUUID().toString(); this.cache = cache; this.listeners = listeners; + + this.dynamicConfigurationManager = dynamicConfigurationManager; } @Override @@ -93,14 +97,15 @@ public class AccountDatabaseCrawler implements Managed, Runnable { @VisibleForTesting public boolean doPeriodicWork() { if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) { + try { - long startTimeMs = System.currentTimeMillis(); + final long startTimeMs = System.currentTimeMillis(); processChunk(); if (cache.isAccelerated()) { return true; } - long endTimeMs = System.currentTimeMillis(); - long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs); + final long endTimeMs = System.currentTimeMillis(); + final long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs); if (sleepIntervalMs > 0) sleepWhileRunning(sleepIntervalMs); } finally { cache.releaseActiveWork(workerId); @@ -110,42 +115,67 @@ public class AccountDatabaseCrawler implements Managed, Runnable { } private void processChunk() { - Optional fromUuid = cache.getLastUuid(); + final boolean useDynamo = dynamicConfigurationManager.getConfiguration() + .getAccountsDynamoDbMigrationConfiguration() + .isDynamoCrawlerEnabled(); - if (!fromUuid.isPresent()) { + listeners.stream().filter(listener -> listener instanceof DirectoryReconciler) + .forEach(reconciler -> ((DirectoryReconciler) reconciler).setUseV3Endpoints(useDynamo)); + + final Optional fromUuid = getLastUuid(useDynamo); + + if (fromUuid.isEmpty()) { listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart); } - List chunkAccounts = readChunk(fromUuid, chunkSize); + final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize, useDynamo); - if (chunkAccounts.isEmpty()) { + if (chunkAccounts.getAccounts().isEmpty()) { logger.info("Finished crawl"); listeners.forEach(listener -> listener.onCrawlEnd(fromUuid)); - cache.setLastUuid(Optional.empty()); + cacheLastUuid(Optional.empty(), useDynamo); cache.setAccelerated(false); } else { try { for (AccountDatabaseCrawlerListener listener : listeners) { - listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts); + listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts()); } - cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid())); + cacheLastUuid(chunkAccounts.getLastUuid(), useDynamo); } catch (AccountDatabaseCrawlerRestartException e) { - cache.setLastUuid(Optional.empty()); + cacheLastUuid(Optional.empty(), useDynamo); cache.setAccelerated(false); } - } - } - private List readChunk(Optional fromUuid, int chunkSize) { + private AccountCrawlChunk readChunk(Optional fromUuid, int chunkSize, boolean useDynamo) { try (Timer.Context timer = readChunkTimer.time()) { if (fromUuid.isPresent()) { - return accounts.getAllFrom(fromUuid.get(), chunkSize); + return useDynamo + ? accounts.getAllFromDynamo(fromUuid.get(), chunkSize) + : accounts.getAllFrom(fromUuid.get(), chunkSize); } - return accounts.getAllFrom(chunkSize); + return useDynamo + ? accounts.getAllFromDynamo(chunkSize) + : accounts.getAllFrom(chunkSize); + } + } + + private Optional getLastUuid(final boolean useDynamo) { + if (useDynamo) { + return cache.getLastUuidDynamo(); + } else { + return cache.getLastUuid(); + } + } + + private void cacheLastUuid(final Optional lastUuid, final boolean useDynamo) { + if (useDynamo) { + cache.setLastUuidDynamo(lastUuid); + } else { + cache.setLastUuid(lastUuid); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java index 18740a1e9..c81fc449e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java @@ -21,6 +21,8 @@ public class AccountDatabaseCrawlerCache { private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid"; private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate"; + private static final String LAST_UUID_DYNAMO_KEY = "account_database_crawler_cache_last_uuid_dynamo"; + private static final long LAST_NUMBER_TTL_MS = 86400_000L; private final FaultTolerantRedisCluster cacheCluster; @@ -66,4 +68,19 @@ public class AccountDatabaseCrawlerCache { } } + public Optional getLastUuidDynamo() { + final String lastUuidString = cacheCluster.withCluster(connection -> connection.sync().get(LAST_UUID_DYNAMO_KEY)); + + if (lastUuidString == null) return Optional.empty(); + else return Optional.of(UUID.fromString(lastUuidString)); + } + + public void setLastUuidDynamo(Optional lastUuid) { + if (lastUuid.isPresent()) { + cacheCluster.useCluster(connection -> connection.sync().psetex(LAST_UUID_DYNAMO_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); + } else { + cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_DYNAMO_KEY)); + } + } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index 5ff498a2f..a8a8a1fa8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -9,6 +9,7 @@ import static com.codahale.metrics.MetricRegistry.name; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; +import com.codahale.metrics.Timer.Context; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; @@ -103,20 +104,21 @@ public class Accounts implements AccountStore { })); } - public List getAllFrom(UUID from, int length) { - return database.with(jdbi -> jdbi.withHandle(handle -> { - try (Timer.Context ignored = getAllFromOffsetTimer.time()) { + public AccountCrawlChunk getAllFrom(UUID from, int length) { + final List accounts = database.with(jdbi -> jdbi.withHandle(handle -> { + try (Context ignored = getAllFromOffsetTimer.time()) { return handle.createQuery("SELECT * FROM accounts WHERE " + UID + " > :from ORDER BY " + UID + " LIMIT :limit") - .bind("from", from) - .bind("limit", length) - .mapTo(Account.class) - .list(); + .bind("from", from) + .bind("limit", length) + .mapTo(Account.class) + .list(); } })); + return buildChunkForAccounts(accounts); } - public List getAllFrom(int length) { - return database.with(jdbi -> jdbi.withHandle(handle -> { + public AccountCrawlChunk getAllFrom(int length) { + final List accounts = database.with(jdbi -> jdbi.withHandle(handle -> { try (Timer.Context ignored = getAllFromTimer.time()) { return handle.createQuery("SELECT * FROM accounts ORDER BY " + UID + " LIMIT :limit") .bind("limit", length) @@ -124,6 +126,12 @@ public class Accounts implements AccountStore { .list(); } })); + + return buildChunkForAccounts(accounts); + } + + private AccountCrawlChunk buildChunkForAccounts(final List accounts) { + return new AccountCrawlChunk(accounts, accounts.isEmpty() ? null : accounts.get(accounts.size() - 1).getUuid()); } @Override diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java index 1d5faebd9..b43fddfd5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java @@ -1,3 +1,7 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; @@ -31,6 +35,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.Put; import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest; import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; @@ -62,6 +67,8 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt private static final Timer UPDATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "update")); private static final Timer GET_BY_NUMBER_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByNumber")); private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByUuid")); + private static final Timer GET_ALL_FROM_START_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getAllFrom")); + private static final Timer GET_ALL_FROM_OFFSET_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getAllFromOffset")); private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete")); private final Logger logger = LoggerFactory.getLogger(AccountsDynamoDb.class); @@ -230,6 +237,33 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt }); } + public AccountCrawlChunk getAllFrom(final UUID from, final int maxCount, final int pageSize) { + final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder() + .limit(pageSize) + .exclusiveStartKey(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(from))); + + return scanForChunk(scanRequestBuilder, maxCount, GET_ALL_FROM_OFFSET_TIMER); + } + + public AccountCrawlChunk getAllFromStart(final int maxCount, final int pageSize) { + final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder() + .limit(pageSize); + + return scanForChunk(scanRequestBuilder, maxCount, GET_ALL_FROM_START_TIMER); + } + + private AccountCrawlChunk scanForChunk(final ScanRequest.Builder scanRequestBuilder, final int maxCount, final Timer timer) { + + scanRequestBuilder.tableName(accountsTableName); + + final List accounts = timer.record(() -> scan(scanRequestBuilder.build(), maxCount) + .stream() + .map(AccountsDynamoDb::fromItem) + .collect(Collectors.toList())); + + return new AccountCrawlChunk(accounts, accounts.size() > 0 ? accounts.get(accounts.size() - 1).getUuid() : null); + } + private void delete(UUID uuid, boolean saveInDeletedAccountsTable) { if (saveInDeletedAccountsTable) { @@ -332,7 +366,7 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt return; } try { - migrationRetryAccounts.put(account.getUuid()); + migrationRetryAccounts.put(account.getUuid()); } catch (final Exception e) { logger.error("Could not store account {}", account.getUuid()); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 92e0abc79..0d1959037 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -19,7 +19,6 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import java.io.IOException; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -234,14 +233,26 @@ public class AccountsManager { } - public List getAllFrom(int length) { + public AccountCrawlChunk getAllFrom(int length) { return accounts.getAllFrom(length); } - public List getAllFrom(UUID uuid, int length) { + public AccountCrawlChunk getAllFrom(UUID uuid, int length) { return accounts.getAllFrom(uuid, length); } + public AccountCrawlChunk getAllFromDynamo(int length) { + final int maxPageSize = dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration() + .getDynamoCrawlerScanPageSize(); + return accountsDynamoDb.getAllFromStart(length, maxPageSize); + } + + public AccountCrawlChunk getAllFromDynamo(UUID uuid, int length) { + final int maxPageSize = dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration() + .getDynamoCrawlerScanPageSize(); + return accountsDynamoDb.getAllFrom(uuid, length, maxPageSize); + } + public void delete(final Account account, final DeletionReason deletionReason) { try (final Timer.Context ignored = deleteTimer.time()) { final CompletableFuture deleteStorageServiceDataFuture = secureStorageClient.deleteStoredData(account.getUuid()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java index cc99f6f28..e07da0714 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -4,24 +4,23 @@ */ package org.whispersystems.textsecuregcm.storage; +import static com.codahale.metrics.MetricRegistry.name; + import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; -import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; -import org.whispersystems.textsecuregcm.util.Constants; - -import javax.ws.rs.ProcessingException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; - -import static com.codahale.metrics.MetricRegistry.name; +import javax.ws.rs.ProcessingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; +import org.whispersystems.textsecuregcm.util.Constants; public class DirectoryReconciler extends AccountDatabaseCrawlerListener { @@ -32,6 +31,8 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener { private final Timer sendChunkTimer; private final Meter sendChunkErrorMeter; + private boolean useV3Endpoints; + public DirectoryReconciler(String name, DirectoryReconciliationClient reconciliationClient) { this.reconciliationClient = reconciliationClient; sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, name, "sendChunk")); @@ -45,6 +46,10 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener { public void onCrawlEnd(Optional fromUuid) { DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList()); sendChunk(request); + + if (useV3Endpoints) { + reconciliationClient.complete(); + } } @Override @@ -76,7 +81,12 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener { private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) { try (Timer.Context timer = sendChunkTimer.time()) { - DirectoryReconciliationResponse response = reconciliationClient.sendChunk(request); + DirectoryReconciliationResponse response; + if (useV3Endpoints) { + response = reconciliationClient.sendChunkV3(request); + } else { + response = reconciliationClient.sendChunk(request); + } if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) { sendChunkErrorMeter.mark(); logger.warn("reconciliation error: " + response.getStatus()); @@ -89,4 +99,7 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener { } } + public void setUseV3Endpoints(final boolean useV3Endpoints) { + this.useV3Endpoints = useV3Endpoints; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java index 35bad4921..e74e5177f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java @@ -46,6 +46,13 @@ public class DirectoryReconciliationClient { .put(Entity.json(request), DirectoryReconciliationResponse.class); } + public DirectoryReconciliationResponse sendChunkV3(DirectoryReconciliationRequest request) { + return client.target(replicationUrl) + .path("/v3/directory/exists") + .request(MediaType.APPLICATION_JSON_TYPE) + .put(Entity.json(request), DirectoryReconciliationResponse.class); + } + public DirectoryReconciliationResponse delete(DirectoryReconciliationRequest request) { return client.target(replicationUrl) .path("/v3/directory/deletes") @@ -53,6 +60,13 @@ public class DirectoryReconciliationClient { .put(Entity.json(request), DirectoryReconciliationResponse.class); } + public DirectoryReconciliationResponse complete() { + return client.target(replicationUrl) + .path("/v3/directory/complete") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(null, DirectoryReconciliationResponse.class); + } + private static Client initializeClient(DirectoryServerConfiguration directoryServerConfiguration) throws CertificateException { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java index dca221eff..15dbf5a90 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerIntegrationTest.java @@ -5,22 +5,25 @@ package org.whispersystems.textsecuregcm.storage; -import org.junit.Before; -import org.junit.Test; -import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; - -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.UUID; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.junit.Before; +import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; + public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterTest { private static final UUID FIRST_UUID = UUID.fromString("82339e80-81cd-48e2-9ed2-ccd5dd262ad9"); @@ -32,6 +35,8 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT private AccountsManager accountsManager; private AccountDatabaseCrawlerListener listener; + private DynamicConfigurationManager dynamicConfigurationManager; + private AccountDatabaseCrawler accountDatabaseCrawler; private static final int CHUNK_SIZE = 1; @@ -47,16 +52,22 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT accountsManager = mock(AccountsManager.class); listener = mock(AccountDatabaseCrawlerListener.class); + dynamicConfigurationManager = mock(DynamicConfigurationManager.class); + when(firstAccount.getUuid()).thenReturn(FIRST_UUID); when(secondAccount.getUuid()).thenReturn(SECOND_UUID); - when(accountsManager.getAllFrom(CHUNK_SIZE)).thenReturn(List.of(firstAccount)); - when(accountsManager.getAllFrom(FIRST_UUID, CHUNK_SIZE)) - .thenReturn(List.of(secondAccount)) - .thenReturn(Collections.emptyList()); + when(accountsManager.getAllFrom(CHUNK_SIZE)).thenReturn(new AccountCrawlChunk(List.of(firstAccount), FIRST_UUID)); + when(accountsManager.getAllFrom(any(UUID.class), eq(CHUNK_SIZE))) + .thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID)) + .thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(mock(DynamicAccountsDynamoDbMigrationConfiguration.class)); final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster()); - accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS); + accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS, dynamicConfigurationManager); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java index c34f54904..02d3b740c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbTest.java @@ -1,21 +1,24 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ package org.whispersystems.textsecuregcm.storage; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.fasterxml.uuid.UUIDComparator; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -256,6 +259,55 @@ class AccountsDynamoDbTest { verifyStoredState("+14151112222", account.getUuid(), account); } + @Test + void testRetrieveFrom() { + List users = new ArrayList<>(); + + for (int i = 1; i <= 100; i++) { + Account account = generateAccount("+1" + String.format("%03d", i), UUID.randomUUID()); + users.add(account); + accountsDynamoDb.create(account); + } + + users.sort((account, t1) -> UUIDComparator.staticCompare(account.getUuid(), t1.getUuid())); + + AccountCrawlChunk retrieved = accountsDynamoDb.getAllFromStart(10, 1); + assertThat(retrieved.getAccounts().size()).isEqualTo(10); + + for (int i = 0; i < retrieved.getAccounts().size(); i++) { + final Account retrievedAccount = retrieved.getAccounts().get(i); + + final Account expectedAccount = users.stream() + .filter(account -> account.getUuid().equals(retrievedAccount.getUuid())) + .findAny() + .orElseThrow(); + + verifyStoredState(expectedAccount.getNumber(), expectedAccount.getUuid(), retrievedAccount, expectedAccount); + + users.remove(expectedAccount); + } + + for (int j = 0; j < 9; j++) { + retrieved = accountsDynamoDb.getAllFrom(retrieved.getLastUuid().orElseThrow(), 10, 1); + assertThat(retrieved.getAccounts().size()).isEqualTo(10); + + for (int i = 0; i < retrieved.getAccounts().size(); i++) { + final Account retrievedAccount = retrieved.getAccounts().get(i); + + final Account expectedAccount = users.stream() + .filter(account -> account.getUuid().equals(retrievedAccount.getUuid())) + .findAny() + .orElseThrow(); + + verifyStoredState(expectedAccount.getNumber(), expectedAccount.getUuid(), retrievedAccount, expectedAccount); + + users.remove(expectedAccount); + } + } + + assertThat(users).isEmpty(); + } + @Test void testDelete() { final Device deletedDevice = generateDevice (1); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java index f9dfec1c0..e2dc9a526 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -5,28 +5,38 @@ package org.whispersystems.textsecuregcm.tests.storage; -import org.junit.Before; -import org.junit.Test; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; -import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; -import org.whispersystems.textsecuregcm.storage.AccountsManager; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Optional; -import java.util.UUID; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; -public class AccountDatabaseCrawlerTest { +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; + +class AccountDatabaseCrawlerTest { private static final UUID ACCOUNT1 = UUID.randomUUID(); private static final UUID ACCOUNT2 = UUID.randomUUID(); @@ -41,37 +51,58 @@ public class AccountDatabaseCrawlerTest { private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class); private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class); - private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS); + private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); - @Before - public void setup() { + private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS, dynamicConfigurationManager); + private DynamicAccountsDynamoDbMigrationConfiguration dynamicAccountsDynamoDbMigrationConfiguration; + + @BeforeEach + void setup() { when(account1.getUuid()).thenReturn(ACCOUNT1); when(account2.getUuid()).thenReturn(ACCOUNT2); - when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account1, account2)); - when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(Arrays.asList(account2)); - when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(Collections.emptyList()); + when(accounts.getAllFrom(anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2)); + when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2)); + when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); + + when(accounts.getAllFromDynamo(anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2)); + when(accounts.getAllFromDynamo(eq(ACCOUNT1), anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2)); + when(accounts.getAllFromDynamo(eq(ACCOUNT2), anyInt())).thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); when(cache.claimActiveWork(any(), anyLong())).thenReturn(true); when(cache.isAccelerated()).thenReturn(false); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + dynamicAccountsDynamoDbMigrationConfiguration = mock(DynamicAccountsDynamoDbMigrationConfiguration.class); + when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(dynamicAccountsDynamoDbMigrationConfiguration); } - @Test - public void testCrawlStart() throws AccountDatabaseCrawlerRestartException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCrawlStart(final boolean useDynamo) throws AccountDatabaseCrawlerRestartException { + when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo); when(cache.getLastUuid()).thenReturn(Optional.empty()); + when(cache.getLastUuidDynamo()).thenReturn(Optional.empty()); boolean accelerated = crawler.doPeriodicWork(); assertThat(accelerated).isFalse(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); - verify(cache, times(1)).getLastUuid(); + verify(cache, times(useDynamo ? 0 : 1)).getLastUuid(); + verify(cache, times(useDynamo ? 1 : 0)).getLastUuidDynamo(); verify(listener, times(1)).onCrawlStart(); - verify(accounts, times(1)).getAllFrom(eq(CHUNK_SIZE)); - verify(accounts, times(0)).getAllFrom(any(UUID.class), eq(CHUNK_SIZE)); + if (useDynamo) { + verify(accounts, times(1)).getAllFromDynamo(eq(CHUNK_SIZE)); + verify(accounts, times(0)).getAllFromDynamo(any(UUID.class), eq(CHUNK_SIZE)); + } else { + verify(accounts, times(1)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(0)).getAllFrom(any(UUID.class), eq(CHUNK_SIZE)); + } verify(account1, times(0)).getUuid(); - verify(account2, times(1)).getUuid(); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2))); - verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); + verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.of(ACCOUNT2))); + verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); @@ -82,20 +113,29 @@ public class AccountDatabaseCrawlerTest { verifyNoMoreInteractions(cache); } - @Test - public void testCrawlChunk() throws AccountDatabaseCrawlerRestartException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCrawlChunk(final boolean useDynamo) throws AccountDatabaseCrawlerRestartException { + when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo); when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1)); + when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1)); boolean accelerated = crawler.doPeriodicWork(); assertThat(accelerated).isFalse(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); - verify(cache, times(1)).getLastUuid(); - verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); - verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); - verify(account2, times(1)).getUuid(); + verify(cache, times(useDynamo ? 0: 1)).getLastUuid(); + verify(cache, times(useDynamo ? 1: 0)).getLastUuidDynamo(); + if (useDynamo) { + verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); + } else { + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + } verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); - verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); + verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.of(ACCOUNT2))); + verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); @@ -107,21 +147,30 @@ public class AccountDatabaseCrawlerTest { verifyNoMoreInteractions(cache); } - @Test - public void testCrawlChunkAccelerated() throws AccountDatabaseCrawlerRestartException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCrawlChunkAccelerated(final boolean useDynamo) throws AccountDatabaseCrawlerRestartException { + when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo); when(cache.isAccelerated()).thenReturn(true); when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1)); + when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1)); boolean accelerated = crawler.doPeriodicWork(); assertThat(accelerated).isTrue(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); - verify(cache, times(1)).getLastUuid(); - verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); - verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); - verify(account2, times(1)).getUuid(); + verify(cache, times(useDynamo ? 0 : 1)).getLastUuid(); + verify(cache, times(useDynamo ? 1 : 0)).getLastUuidDynamo(); + if (useDynamo) { + verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); + } else { + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + } verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); - verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); + verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.of(ACCOUNT2))); + verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); @@ -133,21 +182,31 @@ public class AccountDatabaseCrawlerTest { verifyNoMoreInteractions(cache); } - @Test - public void testCrawlChunkRestart() throws AccountDatabaseCrawlerRestartException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCrawlChunkRestart(final boolean useDynamo) throws AccountDatabaseCrawlerRestartException { + when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo); when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1)); + when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1)); doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); boolean accelerated = crawler.doPeriodicWork(); assertThat(accelerated).isFalse(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); - verify(cache, times(1)).getLastUuid(); - verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); - verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + verify(cache, times(useDynamo ? 0 : 1)).getLastUuid(); + verify(cache, times(useDynamo ? 1 : 0)).getLastUuidDynamo(); + if (useDynamo) { + verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); + } else { + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + } verify(account2, times(0)).getNumber(); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); - verify(cache, times(1)).setLastUuid(eq(Optional.empty())); + verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.empty())); + verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.empty())); verify(cache, times(1)).setAccelerated(false); verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); @@ -160,21 +219,31 @@ public class AccountDatabaseCrawlerTest { verifyNoMoreInteractions(cache); } - @Test - public void testCrawlEnd() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCrawlEnd(final boolean useDynamo) { + when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo); when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT2)); + when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT2)); boolean accelerated = crawler.doPeriodicWork(); assertThat(accelerated).isFalse(); verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); - verify(cache, times(1)).getLastUuid(); - verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); - verify(accounts, times(1)).getAllFrom(eq(ACCOUNT2), eq(CHUNK_SIZE)); + verify(cache, times(useDynamo ? 0 : 1)).getLastUuid(); + verify(cache, times(useDynamo ? 1 : 0)).getLastUuidDynamo(); + if (useDynamo) { + verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT2), eq(CHUNK_SIZE)); + } else { + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT2), eq(CHUNK_SIZE)); + } verify(account1, times(0)).getNumber(); verify(account2, times(0)).getNumber(); verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2))); - verify(cache, times(1)).setLastUuid(eq(Optional.empty())); + verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.empty())); + verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.empty())); verify(cache, times(1)).setAccelerated(false); verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java index 8d286f62a..5957fb8eb 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java @@ -38,6 +38,7 @@ import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.entities.SignedPreKey; import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk; import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; @@ -189,19 +190,19 @@ public class AccountsTest { users.sort((account, t1) -> UUIDComparator.staticCompare(account.getUuid(), t1.getUuid())); - List retrieved = accounts.getAllFrom(10); - assertThat(retrieved.size()).isEqualTo(10); + AccountCrawlChunk retrieved = accounts.getAllFrom(10); + assertThat(retrieved.getAccounts().size()).isEqualTo(10); - for (int i=0;i