Add dynamic configuration for using DynamoDB in AccountsDatabaseCrawler

This commit is contained in:
Chris Eager 2021-07-06 13:01:24 -05:00 committed by GitHub
parent 78819d5382
commit a824b5575d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 434 additions and 130 deletions

View File

@ -482,7 +482,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, ftxClient, config.getPaymentsServiceConfiguration().getPaymentCurrencies());
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs(), dynamicConfigurationManager);
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccounts, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(migrationRetryAccounts, accountsManager, accountsDynamoDb, cacheCluster, recurringJobExecutor);

View File

@ -23,6 +23,12 @@ public class DynamicAccountsDynamoDbMigrationConfiguration {
@JsonProperty
boolean logMismatches;
@JsonProperty
boolean dynamoCrawlerEnabled;
@JsonProperty
int dynamoCrawlerScanPageSize = 10;
public boolean isBackgroundMigrationEnabled() {
return backgroundMigrationEnabled;
}
@ -59,4 +65,12 @@ public class DynamicAccountsDynamoDbMigrationConfiguration {
public boolean isLogMismatches() {
return logMismatches;
}
public boolean isDynamoCrawlerEnabled() {
return dynamoCrawlerEnabled;
}
public int getDynamoCrawlerScanPageSize() {
return dynamoCrawlerScanPageSize;
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
public class AccountCrawlChunk {
private final List<Account> accounts;
@Nullable
private final UUID lastUuid;
public AccountCrawlChunk(final List<Account> accounts, @Nullable final UUID lastUuid) {
this.accounts = accounts;
this.lastUuid = lastUuid;
}
public List<Account> getAccounts() {
return accounts;
}
public Optional<UUID> getLastUuid() {
return Optional.ofNullable(lastUuid);
}
}

View File

@ -4,22 +4,21 @@
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import io.dropwizard.lifecycle.Managed;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class AccountDatabaseCrawler implements Managed, Runnable {
@ -38,6 +37,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private final AccountDatabaseCrawlerCache cache;
private final List<AccountDatabaseCrawlerListener> listeners;
private final DynamicConfigurationManager dynamicConfigurationManager;
private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
@ -45,7 +46,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
AccountDatabaseCrawlerCache cache,
List<AccountDatabaseCrawlerListener> listeners,
int chunkSize,
long chunkIntervalMs)
long chunkIntervalMs,
DynamicConfigurationManager dynamicConfigurationManager)
{
this.accounts = accounts;
this.chunkSize = chunkSize;
@ -53,6 +55,8 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
this.workerId = UUID.randomUUID().toString();
this.cache = cache;
this.listeners = listeners;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Override
@ -93,14 +97,15 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
@VisibleForTesting
public boolean doPeriodicWork() {
if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) {
try {
long startTimeMs = System.currentTimeMillis();
final long startTimeMs = System.currentTimeMillis();
processChunk();
if (cache.isAccelerated()) {
return true;
}
long endTimeMs = System.currentTimeMillis();
long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs);
final long endTimeMs = System.currentTimeMillis();
final long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs);
if (sleepIntervalMs > 0) sleepWhileRunning(sleepIntervalMs);
} finally {
cache.releaseActiveWork(workerId);
@ -110,42 +115,67 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
}
private void processChunk() {
Optional<UUID> fromUuid = cache.getLastUuid();
final boolean useDynamo = dynamicConfigurationManager.getConfiguration()
.getAccountsDynamoDbMigrationConfiguration()
.isDynamoCrawlerEnabled();
if (!fromUuid.isPresent()) {
listeners.stream().filter(listener -> listener instanceof DirectoryReconciler)
.forEach(reconciler -> ((DirectoryReconciler) reconciler).setUseV3Endpoints(useDynamo));
final Optional<UUID> fromUuid = getLastUuid(useDynamo);
if (fromUuid.isEmpty()) {
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
}
List<Account> chunkAccounts = readChunk(fromUuid, chunkSize);
final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize, useDynamo);
if (chunkAccounts.isEmpty()) {
if (chunkAccounts.getAccounts().isEmpty()) {
logger.info("Finished crawl");
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
cache.setLastUuid(Optional.empty());
cacheLastUuid(Optional.empty(), useDynamo);
cache.setAccelerated(false);
} else {
try {
for (AccountDatabaseCrawlerListener listener : listeners) {
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts);
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());
}
cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid()));
cacheLastUuid(chunkAccounts.getLastUuid(), useDynamo);
} catch (AccountDatabaseCrawlerRestartException e) {
cache.setLastUuid(Optional.empty());
cacheLastUuid(Optional.empty(), useDynamo);
cache.setAccelerated(false);
}
}
}
private List<Account> readChunk(Optional<UUID> fromUuid, int chunkSize) {
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize, boolean useDynamo) {
try (Timer.Context timer = readChunkTimer.time()) {
if (fromUuid.isPresent()) {
return accounts.getAllFrom(fromUuid.get(), chunkSize);
return useDynamo
? accounts.getAllFromDynamo(fromUuid.get(), chunkSize)
: accounts.getAllFrom(fromUuid.get(), chunkSize);
}
return accounts.getAllFrom(chunkSize);
return useDynamo
? accounts.getAllFromDynamo(chunkSize)
: accounts.getAllFrom(chunkSize);
}
}
private Optional<UUID> getLastUuid(final boolean useDynamo) {
if (useDynamo) {
return cache.getLastUuidDynamo();
} else {
return cache.getLastUuid();
}
}
private void cacheLastUuid(final Optional<UUID> lastUuid, final boolean useDynamo) {
if (useDynamo) {
cache.setLastUuidDynamo(lastUuid);
} else {
cache.setLastUuid(lastUuid);
}
}

View File

@ -21,6 +21,8 @@ public class AccountDatabaseCrawlerCache {
private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid";
private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
private static final String LAST_UUID_DYNAMO_KEY = "account_database_crawler_cache_last_uuid_dynamo";
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
private final FaultTolerantRedisCluster cacheCluster;
@ -66,4 +68,19 @@ public class AccountDatabaseCrawlerCache {
}
}
public Optional<UUID> getLastUuidDynamo() {
final String lastUuidString = cacheCluster.withCluster(connection -> connection.sync().get(LAST_UUID_DYNAMO_KEY));
if (lastUuidString == null) return Optional.empty();
else return Optional.of(UUID.fromString(lastUuidString));
}
public void setLastUuidDynamo(Optional<UUID> lastUuid) {
if (lastUuid.isPresent()) {
cacheCluster.useCluster(connection -> connection.sync().psetex(LAST_UUID_DYNAMO_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
} else {
cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_DYNAMO_KEY));
}
}
}

View File

@ -9,6 +9,7 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
@ -103,20 +104,21 @@ public class Accounts implements AccountStore {
}));
}
public List<Account> getAllFrom(UUID from, int length) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getAllFromOffsetTimer.time()) {
public AccountCrawlChunk getAllFrom(UUID from, int length) {
final List<Account> accounts = database.with(jdbi -> jdbi.withHandle(handle -> {
try (Context ignored = getAllFromOffsetTimer.time()) {
return handle.createQuery("SELECT * FROM accounts WHERE " + UID + " > :from ORDER BY " + UID + " LIMIT :limit")
.bind("from", from)
.bind("limit", length)
.mapTo(Account.class)
.list();
.bind("from", from)
.bind("limit", length)
.mapTo(Account.class)
.list();
}
}));
return buildChunkForAccounts(accounts);
}
public List<Account> getAllFrom(int length) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
public AccountCrawlChunk getAllFrom(int length) {
final List<Account> accounts = database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getAllFromTimer.time()) {
return handle.createQuery("SELECT * FROM accounts ORDER BY " + UID + " LIMIT :limit")
.bind("limit", length)
@ -124,6 +126,12 @@ public class Accounts implements AccountStore {
.list();
}
}));
return buildChunkForAccounts(accounts);
}
private AccountCrawlChunk buildChunkForAccounts(final List<Account> accounts) {
return new AccountCrawlChunk(accounts, accounts.isEmpty() ? null : accounts.get(accounts.size() - 1).getUuid());
}
@Override

View File

@ -1,3 +1,7 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
@ -31,6 +35,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
@ -62,6 +67,8 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private static final Timer UPDATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "update"));
private static final Timer GET_BY_NUMBER_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByNumber"));
private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByUuid"));
private static final Timer GET_ALL_FROM_START_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getAllFrom"));
private static final Timer GET_ALL_FROM_OFFSET_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getAllFromOffset"));
private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete"));
private final Logger logger = LoggerFactory.getLogger(AccountsDynamoDb.class);
@ -230,6 +237,33 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
});
}
public AccountCrawlChunk getAllFrom(final UUID from, final int maxCount, final int pageSize) {
final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
.limit(pageSize)
.exclusiveStartKey(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(from)));
return scanForChunk(scanRequestBuilder, maxCount, GET_ALL_FROM_OFFSET_TIMER);
}
public AccountCrawlChunk getAllFromStart(final int maxCount, final int pageSize) {
final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
.limit(pageSize);
return scanForChunk(scanRequestBuilder, maxCount, GET_ALL_FROM_START_TIMER);
}
private AccountCrawlChunk scanForChunk(final ScanRequest.Builder scanRequestBuilder, final int maxCount, final Timer timer) {
scanRequestBuilder.tableName(accountsTableName);
final List<Account> accounts = timer.record(() -> scan(scanRequestBuilder.build(), maxCount)
.stream()
.map(AccountsDynamoDb::fromItem)
.collect(Collectors.toList()));
return new AccountCrawlChunk(accounts, accounts.size() > 0 ? accounts.get(accounts.size() - 1).getUuid() : null);
}
private void delete(UUID uuid, boolean saveInDeletedAccountsTable) {
if (saveInDeletedAccountsTable) {
@ -332,7 +366,7 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
return;
}
try {
migrationRetryAccounts.put(account.getUuid());
migrationRetryAccounts.put(account.getUuid());
} catch (final Exception e) {
logger.error("Could not store account {}", account.getUuid());
}

View File

@ -19,7 +19,6 @@ import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -234,14 +233,26 @@ public class AccountsManager {
}
public List<Account> getAllFrom(int length) {
public AccountCrawlChunk getAllFrom(int length) {
return accounts.getAllFrom(length);
}
public List<Account> getAllFrom(UUID uuid, int length) {
public AccountCrawlChunk getAllFrom(UUID uuid, int length) {
return accounts.getAllFrom(uuid, length);
}
public AccountCrawlChunk getAllFromDynamo(int length) {
final int maxPageSize = dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.getDynamoCrawlerScanPageSize();
return accountsDynamoDb.getAllFromStart(length, maxPageSize);
}
public AccountCrawlChunk getAllFromDynamo(UUID uuid, int length) {
final int maxPageSize = dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.getDynamoCrawlerScanPageSize();
return accountsDynamoDb.getAllFrom(uuid, length, maxPageSize);
}
public void delete(final Account account, final DeletionReason deletionReason) {
try (final Timer.Context ignored = deleteTimer.time()) {
final CompletableFuture<Void> deleteStorageServiceDataFuture = secureStorageClient.deleteStoredData(account.getUuid());

View File

@ -4,24 +4,23 @@
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
import org.whispersystems.textsecuregcm.util.Constants;
import javax.ws.rs.ProcessingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
import javax.ws.rs.ProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
import org.whispersystems.textsecuregcm.util.Constants;
public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
@ -32,6 +31,8 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
private final Timer sendChunkTimer;
private final Meter sendChunkErrorMeter;
private boolean useV3Endpoints;
public DirectoryReconciler(String name, DirectoryReconciliationClient reconciliationClient) {
this.reconciliationClient = reconciliationClient;
sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, name, "sendChunk"));
@ -45,6 +46,10 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
public void onCrawlEnd(Optional<UUID> fromUuid) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList());
sendChunk(request);
if (useV3Endpoints) {
reconciliationClient.complete();
}
}
@Override
@ -76,7 +81,12 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {
try (Timer.Context timer = sendChunkTimer.time()) {
DirectoryReconciliationResponse response = reconciliationClient.sendChunk(request);
DirectoryReconciliationResponse response;
if (useV3Endpoints) {
response = reconciliationClient.sendChunkV3(request);
} else {
response = reconciliationClient.sendChunk(request);
}
if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) {
sendChunkErrorMeter.mark();
logger.warn("reconciliation error: " + response.getStatus());
@ -89,4 +99,7 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
}
}
public void setUseV3Endpoints(final boolean useV3Endpoints) {
this.useV3Endpoints = useV3Endpoints;
}
}

View File

@ -46,6 +46,13 @@ public class DirectoryReconciliationClient {
.put(Entity.json(request), DirectoryReconciliationResponse.class);
}
public DirectoryReconciliationResponse sendChunkV3(DirectoryReconciliationRequest request) {
return client.target(replicationUrl)
.path("/v3/directory/exists")
.request(MediaType.APPLICATION_JSON_TYPE)
.put(Entity.json(request), DirectoryReconciliationResponse.class);
}
public DirectoryReconciliationResponse delete(DirectoryReconciliationRequest request) {
return client.target(replicationUrl)
.path("/v3/directory/deletes")
@ -53,6 +60,13 @@ public class DirectoryReconciliationClient {
.put(Entity.json(request), DirectoryReconciliationResponse.class);
}
public DirectoryReconciliationResponse complete() {
return client.target(replicationUrl)
.path("/v3/directory/complete")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(null, DirectoryReconciliationResponse.class);
}
private static Client initializeClient(DirectoryServerConfiguration directoryServerConfiguration)
throws CertificateException
{

View File

@ -5,22 +5,25 @@
package org.whispersystems.textsecuregcm.storage;
import org.junit.Before;
import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static org.junit.Assert.*;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterTest {
private static final UUID FIRST_UUID = UUID.fromString("82339e80-81cd-48e2-9ed2-ccd5dd262ad9");
@ -32,6 +35,8 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT
private AccountsManager accountsManager;
private AccountDatabaseCrawlerListener listener;
private DynamicConfigurationManager dynamicConfigurationManager;
private AccountDatabaseCrawler accountDatabaseCrawler;
private static final int CHUNK_SIZE = 1;
@ -47,16 +52,22 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT
accountsManager = mock(AccountsManager.class);
listener = mock(AccountDatabaseCrawlerListener.class);
dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
when(firstAccount.getUuid()).thenReturn(FIRST_UUID);
when(secondAccount.getUuid()).thenReturn(SECOND_UUID);
when(accountsManager.getAllFrom(CHUNK_SIZE)).thenReturn(List.of(firstAccount));
when(accountsManager.getAllFrom(FIRST_UUID, CHUNK_SIZE))
.thenReturn(List.of(secondAccount))
.thenReturn(Collections.emptyList());
when(accountsManager.getAllFrom(CHUNK_SIZE)).thenReturn(new AccountCrawlChunk(List.of(firstAccount), FIRST_UUID));
when(accountsManager.getAllFrom(any(UUID.class), eq(CHUNK_SIZE)))
.thenReturn(new AccountCrawlChunk(List.of(secondAccount), SECOND_UUID))
.thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(mock(DynamicAccountsDynamoDbMigrationConfiguration.class));
final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster());
accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS);
accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS, dynamicConfigurationManager);
}
@Test

View File

@ -1,21 +1,24 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.fasterxml.uuid.UUIDComparator;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
@ -256,6 +259,55 @@ class AccountsDynamoDbTest {
verifyStoredState("+14151112222", account.getUuid(), account);
}
@Test
void testRetrieveFrom() {
List<Account> users = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
Account account = generateAccount("+1" + String.format("%03d", i), UUID.randomUUID());
users.add(account);
accountsDynamoDb.create(account);
}
users.sort((account, t1) -> UUIDComparator.staticCompare(account.getUuid(), t1.getUuid()));
AccountCrawlChunk retrieved = accountsDynamoDb.getAllFromStart(10, 1);
assertThat(retrieved.getAccounts().size()).isEqualTo(10);
for (int i = 0; i < retrieved.getAccounts().size(); i++) {
final Account retrievedAccount = retrieved.getAccounts().get(i);
final Account expectedAccount = users.stream()
.filter(account -> account.getUuid().equals(retrievedAccount.getUuid()))
.findAny()
.orElseThrow();
verifyStoredState(expectedAccount.getNumber(), expectedAccount.getUuid(), retrievedAccount, expectedAccount);
users.remove(expectedAccount);
}
for (int j = 0; j < 9; j++) {
retrieved = accountsDynamoDb.getAllFrom(retrieved.getLastUuid().orElseThrow(), 10, 1);
assertThat(retrieved.getAccounts().size()).isEqualTo(10);
for (int i = 0; i < retrieved.getAccounts().size(); i++) {
final Account retrievedAccount = retrieved.getAccounts().get(i);
final Account expectedAccount = users.stream()
.filter(account -> account.getUuid().equals(retrievedAccount.getUuid()))
.findAny()
.orElseThrow();
verifyStoredState(expectedAccount.getNumber(), expectedAccount.getUuid(), retrievedAccount, expectedAccount);
users.remove(expectedAccount);
}
}
assertThat(users).isEmpty();
}
@Test
void testDelete() {
final Device deletedDevice = generateDevice (1);

View File

@ -5,28 +5,38 @@
package org.whispersystems.textsecuregcm.tests.storage;
import org.junit.Before;
import org.junit.Test;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class AccountDatabaseCrawlerTest {
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicAccountsDynamoDbMigrationConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
class AccountDatabaseCrawlerTest {
private static final UUID ACCOUNT1 = UUID.randomUUID();
private static final UUID ACCOUNT2 = UUID.randomUUID();
@ -41,37 +51,58 @@ public class AccountDatabaseCrawlerTest {
private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class);
private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);
private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS);
private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
@Before
public void setup() {
private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS, dynamicConfigurationManager);
private DynamicAccountsDynamoDbMigrationConfiguration dynamicAccountsDynamoDbMigrationConfiguration;
@BeforeEach
void setup() {
when(account1.getUuid()).thenReturn(ACCOUNT1);
when(account2.getUuid()).thenReturn(ACCOUNT2);
when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account1, account2));
when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(Arrays.asList(account2));
when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(Collections.emptyList());
when(accounts.getAllFrom(anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2));
when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2));
when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));
when(accounts.getAllFromDynamo(anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2));
when(accounts.getAllFromDynamo(eq(ACCOUNT1), anyInt())).thenReturn(new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2));
when(accounts.getAllFromDynamo(eq(ACCOUNT2), anyInt())).thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));
when(cache.claimActiveWork(any(), anyLong())).thenReturn(true);
when(cache.isAccelerated()).thenReturn(false);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
dynamicAccountsDynamoDbMigrationConfiguration = mock(DynamicAccountsDynamoDbMigrationConfiguration.class);
when(dynamicConfiguration.getAccountsDynamoDbMigrationConfiguration()).thenReturn(dynamicAccountsDynamoDbMigrationConfiguration);
}
@Test
public void testCrawlStart() throws AccountDatabaseCrawlerRestartException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCrawlStart(final boolean useDynamo) throws AccountDatabaseCrawlerRestartException {
when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo);
when(cache.getLastUuid()).thenReturn(Optional.empty());
when(cache.getLastUuidDynamo()).thenReturn(Optional.empty());
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastUuid();
verify(cache, times(useDynamo ? 0 : 1)).getLastUuid();
verify(cache, times(useDynamo ? 1 : 0)).getLastUuidDynamo();
verify(listener, times(1)).onCrawlStart();
verify(accounts, times(1)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(0)).getAllFrom(any(UUID.class), eq(CHUNK_SIZE));
if (useDynamo) {
verify(accounts, times(1)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(0)).getAllFromDynamo(any(UUID.class), eq(CHUNK_SIZE));
} else {
verify(accounts, times(1)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(0)).getAllFrom(any(UUID.class), eq(CHUNK_SIZE));
}
verify(account1, times(0)).getUuid();
verify(account2, times(1)).getUuid();
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -82,20 +113,29 @@ public class AccountDatabaseCrawlerTest {
verifyNoMoreInteractions(cache);
}
@Test
public void testCrawlChunk() throws AccountDatabaseCrawlerRestartException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCrawlChunk(final boolean useDynamo) throws AccountDatabaseCrawlerRestartException {
when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo);
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastUuid();
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(account2, times(1)).getUuid();
verify(cache, times(useDynamo ? 0: 1)).getLastUuid();
verify(cache, times(useDynamo ? 1: 0)).getLastUuidDynamo();
if (useDynamo) {
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
} else {
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
}
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -107,21 +147,30 @@ public class AccountDatabaseCrawlerTest {
verifyNoMoreInteractions(cache);
}
@Test
public void testCrawlChunkAccelerated() throws AccountDatabaseCrawlerRestartException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCrawlChunkAccelerated(final boolean useDynamo) throws AccountDatabaseCrawlerRestartException {
when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo);
when(cache.isAccelerated()).thenReturn(true);
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isTrue();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastUuid();
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(account2, times(1)).getUuid();
verify(cache, times(useDynamo ? 0 : 1)).getLastUuid();
verify(cache, times(useDynamo ? 1 : 0)).getLastUuidDynamo();
if (useDynamo) {
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
} else {
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
}
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -133,21 +182,31 @@ public class AccountDatabaseCrawlerTest {
verifyNoMoreInteractions(cache);
}
@Test
public void testCrawlChunkRestart() throws AccountDatabaseCrawlerRestartException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCrawlChunkRestart(final boolean useDynamo) throws AccountDatabaseCrawlerRestartException {
when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo);
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1));
doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastUuid();
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(cache, times(useDynamo ? 0 : 1)).getLastUuid();
verify(cache, times(useDynamo ? 1 : 0)).getLastUuidDynamo();
if (useDynamo) {
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
} else {
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
}
verify(account2, times(0)).getNumber();
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.empty()));
verify(cache, times(1)).setAccelerated(false);
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -160,21 +219,31 @@ public class AccountDatabaseCrawlerTest {
verifyNoMoreInteractions(cache);
}
@Test
public void testCrawlEnd() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCrawlEnd(final boolean useDynamo) {
when(dynamicAccountsDynamoDbMigrationConfiguration.isDynamoCrawlerEnabled()).thenReturn(useDynamo);
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT2));
when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT2));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastUuid();
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT2), eq(CHUNK_SIZE));
verify(cache, times(useDynamo ? 0 : 1)).getLastUuid();
verify(cache, times(useDynamo ? 1 : 0)).getLastUuidDynamo();
if (useDynamo) {
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT2), eq(CHUNK_SIZE));
} else {
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT2), eq(CHUNK_SIZE));
}
verify(account1, times(0)).getNumber();
verify(account2, times(0)).getNumber();
verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(useDynamo ? 0 : 1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(useDynamo ? 1 : 0)).setLastUuidDynamo(eq(Optional.empty()));
verify(cache, times(1)).setAccelerated(false);
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));

View File

@ -38,6 +38,7 @@ import org.junit.Test;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.entities.SignedPreKey;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountCrawlChunk;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
@ -189,19 +190,19 @@ public class AccountsTest {
users.sort((account, t1) -> UUIDComparator.staticCompare(account.getUuid(), t1.getUuid()));
List<Account> retrieved = accounts.getAllFrom(10);
assertThat(retrieved.size()).isEqualTo(10);
AccountCrawlChunk retrieved = accounts.getAllFrom(10);
assertThat(retrieved.getAccounts().size()).isEqualTo(10);
for (int i=0;i<retrieved.size();i++) {
verifyStoredState(users.get(i).getNumber(), users.get(i).getUuid(), retrieved.get(i), users.get(i));
for (int i=0;i<retrieved.getAccounts().size();i++) {
verifyStoredState(users.get(i).getNumber(), users.get(i).getUuid(), retrieved.getAccounts().get(i), users.get(i));
}
for (int j=0;j<9;j++) {
retrieved = accounts.getAllFrom(retrieved.get(9).getUuid(), 10);
assertThat(retrieved.size()).isEqualTo(10);
retrieved = accounts.getAllFrom(retrieved.getLastUuid().orElseThrow(), 10);
assertThat(retrieved.getAccounts().size()).isEqualTo(10);
for (int i=0;i<retrieved.size();i++) {
verifyStoredState(users.get(10 + (j * 10) + i).getNumber(), users.get(10 + (j * 10) + i).getUuid(), retrieved.get(i), users.get(10 + (j * 10) + i));
for (int i=0;i<retrieved.getAccounts().size();i++) {
verifyStoredState(users.get(10 + (j * 10) + i).getNumber(), users.get(10 + (j * 10) + i).getUuid(), retrieved.getAccounts().get(i), users.get(10 + (j * 10) + i));
}
}
}