Add threadpoool to increase Accounts → Dynamo migration throughput

This commit is contained in:
Chris Eager 2021-04-19 14:03:33 -05:00 committed by Chris Eager
parent 166d203e8e
commit a472774734
8 changed files with 138 additions and 42 deletions

View File

@ -13,6 +13,8 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.s3.AmazonS3;
@ -52,7 +54,9 @@ import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
@ -286,13 +290,25 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.withRequestTimeout((int) config.getAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
// The thread pool core & max sizes are set via dynamic configuration within AccountsDynamoDb
ThreadPoolExecutor accountsDynamoDbMigrationThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder
.standard()
.withRegion(accountsDynamoDbClientBuilder.getRegion())
.withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration())
.withCredentials(accountsDynamoDbClientBuilder.getCredentials())
.withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
AmazonDynamoDBAsync accountsDynamodbAsyncClient = accountsDynamoDbAsyncClientBuilder.build();
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamodbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
PendingDevices pendingDevices = new PendingDevices (accountDatabase);
Usernames usernames = new Usernames(accountDatabase);

View File

@ -8,6 +8,9 @@ public class DynamicAccountsDynamoDbMigrationConfiguration {
@JsonProperty
boolean backgroundMigrationEnabled;
@JsonProperty
int backgroundMigrationExecutorThreads = 1;
@JsonProperty
boolean deleteEnabled;
@ -21,6 +24,10 @@ public class DynamicAccountsDynamoDbMigrationConfiguration {
return backgroundMigrationEnabled;
}
public int getBackgroundMigrationExecutorThreads() {
return backgroundMigrationExecutorThreads;
}
public void setDeleteEnabled(boolean deleteEnabled) {
this.deleteEnabled = deleteEnabled;
}

View File

@ -2,7 +2,9 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
@ -16,17 +18,23 @@ import com.amazonaws.services.dynamodbv2.model.Put;
import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailure;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItem;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsResult;
import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
@ -43,6 +51,9 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private final AmazonDynamoDB client;
private final Table accountsTable;
private final AmazonDynamoDBAsync asyncClient;
private final ThreadPoolExecutor migrationThreadPool;
private final String phoneNumbersTableName;
@ -52,12 +63,15 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByUuid"));
private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete"));
public AccountsDynamoDb(AmazonDynamoDB client, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName) {
public AccountsDynamoDb(AmazonDynamoDB client, AmazonDynamoDBAsync asyncClient, ThreadPoolExecutor migrationThreadPool, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName) {
super(dynamoDb);
this.client = client;
this.accountsTable = dynamoDb.getTable(accountsTableName);
this.phoneNumbersTableName = phoneNumbersTableName;
this.asyncClient = asyncClient;
this.migrationThreadPool = migrationThreadPool;
}
@Override
@ -216,7 +230,29 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
});
}
public boolean migrate(Account account) {
public CompletableFuture<Void> migrate(List<Account> accounts, int threads) {
migrationThreadPool.setCorePoolSize(threads);
migrationThreadPool.setMaximumPoolSize(threads);
final List<CompletableFuture<?>> futures = accounts.stream()
.map(this::migrate)
.map(f -> f.whenComplete((migrated, e) -> {
if (e == null) {
MIGRATED_COUNTER.increment(migrated ? 1 : 0);
} else {
ERROR_COUNTER.increment();
}
}))
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
}
private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "count"));
private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "error"));
public CompletableFuture<Boolean> migrate(Account account) {
try {
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
@ -233,17 +269,32 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
final TransactWriteItemsRequest request = new TransactWriteItemsRequest()
.withTransactItems(phoneNumberConstraintPut, accountPut);
client.transactWriteItems(request);
final CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
return true;
asyncClient.transactWriteItemsAsync(request,
new AsyncHandler<>() {
@Override
public void onError(Exception exception) {
if (exception instanceof TransactionCanceledException) {
// account is already migrated
resultFuture.complete(false);
} else {
ERROR_COUNTER.increment();
resultFuture.completeExceptionally(exception);
}
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
} catch (TransactionCanceledException ignored) {
// account is already migrated
@Override
public void onSuccess(TransactWriteItemsRequest request, TransactWriteItemsResult transactWriteItemsResult) {
resultFuture.complete(true);
}
});
return resultFuture;
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
return false;
}
@VisibleForTesting

View File

@ -1,18 +1,12 @@
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
public class AccountsDynamoDbMigrator extends AccountDatabaseCrawlerListener {
private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDbMigrator.class, "migrated"));
private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDbMigrator.class, "error"));
private final AccountsDynamoDb accountsDynamoDb;
private final DynamicConfigurationManager dynamicConfigurationManager;
@ -34,20 +28,14 @@ public class AccountsDynamoDbMigrator extends AccountDatabaseCrawlerListener {
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isBackgroundMigrationEnabled()) {
if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isBackgroundMigrationEnabled()) {
return;
}
for (Account account : chunkAccounts) {
try {
final boolean migrated = accountsDynamoDb.migrate(account);
if (migrated) {
MIGRATED_COUNTER.increment();
}
} catch (final Exception e) {
final CompletableFuture<Void> migrationBatch = accountsDynamoDb.migrate(chunkAccounts,
dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().getBackgroundMigrationExecutorThreads());
ERROR_COUNTER.increment();
}
}
migrationBatch.join();
}
}

View File

@ -10,6 +10,8 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.fasterxml.jackson.databind.DeserializationFeature;
@ -20,6 +22,9 @@ import io.dropwizard.setup.Environment;
import io.lettuce.core.resource.ClientResources;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
@ -111,11 +116,21 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
.withRequestTimeout((int) configuration.getAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
ThreadPoolExecutor accountsDynamoDbMigrationThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder
.standard()
.withRegion(accountsDynamoDbClientBuilder.getRegion())
.withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration())
.withCredentials(accountsDynamoDbClientBuilder.getCredentials())
.withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build());
DynamoDB preKeysDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
AmazonDynamoDBAsync accountsDynamoDbAsyncClient = accountsDynamoDbAsyncClientBuilder.build();
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources);
@ -132,7 +147,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, new DynamoDB(accountsDynamoDbClient), configuration.getAccountsDynamoDbConfiguration().getTableName(), configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), configuration.getAccountsDynamoDbConfiguration().getTableName(), configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);

View File

@ -318,6 +318,7 @@ class DynamicConfigurationTest {
final String accountsDynamoDbMigrationConfig =
"accountsDynamoDbMigration:\n"
+ " backgroundMigrationEnabled: true\n"
+ " backgroundMigrationExecutorThreads: 100\n"
+ " deleteEnabled: true\n"
+ " readEnabled: true\n"
+ " writeEnabled: true";
@ -327,6 +328,7 @@ class DynamicConfigurationTest {
.getAccountsDynamoDbMigrationConfiguration();
assertTrue(config.isBackgroundMigrationEnabled());
assertEquals(100, config.getBackgroundMigrationExecutorThreads());
assertTrue(config.isDeleteEnabled());
assertTrue(config.isWriteEnabled());
assertTrue(config.isReadEnabled());

View File

@ -12,6 +12,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
@ -31,6 +32,10 @@ import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jdbi.v3.core.transaction.TransactionException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@ -65,7 +70,7 @@ class AccountsDynamoDbTest {
final Table numbersTable = dynamoDbExtension.getDynamoDB().createTable(createNumbersTableRequest);
this.accountsDynamoDb = new AccountsDynamoDb(dynamoDbExtension.getClient(), dynamoDbExtension.getDynamoDB(), dynamoDbExtension.getTableName(), numbersTable.getTableName());
this.accountsDynamoDb = new AccountsDynamoDb(dynamoDbExtension.getClient(), dynamoDbExtension.getAsyncClient(), new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()), dynamoDbExtension.getDynamoDB(), dynamoDbExtension.getTableName(), numbersTable.getTableName());
}
@Test
@ -244,7 +249,7 @@ class AccountsDynamoDbTest {
when(client.updateItem(any()))
.thenThrow(RuntimeException.class);
AccountsDynamoDb accounts = new AccountsDynamoDb(client, dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME);
AccountsDynamoDb accounts = new AccountsDynamoDb(client, mock(AmazonDynamoDBAsync.class), mock(ThreadPoolExecutor.class), dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME);
Account account = generateAccount("+14151112222", UUID.randomUUID());
try {
@ -279,19 +284,19 @@ class AccountsDynamoDbTest {
}
@Test
void testMigrate() {
void testMigrate() throws ExecutionException, InterruptedException {
Device device = generateDevice (1 );
UUID firstUuid = UUID.randomUUID();
Account account = generateAccount("+14151112222", firstUuid, Collections.singleton(device));
boolean migrated = accountsDynamoDb.migrate(account);
boolean migrated = accountsDynamoDb.migrate(account).get();
assertThat(migrated).isTrue();
verifyStoredState("+14151112222", account.getUuid(), account);
migrated = accountsDynamoDb.migrate(account);
migrated = accountsDynamoDb.migrate(account).get();
assertThat(migrated).isFalse();
@ -302,14 +307,14 @@ class AccountsDynamoDbTest {
device = generateDevice(1);
Account accountRemigrationWithDifferentUuid = generateAccount("+14151112222", secondUuid, Collections.singleton(device));
migrated = accountsDynamoDb.migrate(account);
migrated = accountsDynamoDb.migrate(account).get();
assertThat(migrated).isFalse();
verifyStoredState("+14151112222", firstUuid, account);
account.setDynamoDbMigrationVersion(account.getDynamoDbMigrationVersion() + 1);
migrated = accountsDynamoDb.migrate(account);
migrated = accountsDynamoDb.migrate(account).get();
assertThat(migrated).isTrue();
}

View File

@ -5,6 +5,8 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
@ -41,6 +43,7 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
private final long writeCapacityUnits;
private AmazonDynamoDB client;
private AmazonDynamoDBAsync asyncClient;
private DynamoDB dynamoDB;
private DynamoDbExtension(String tableName, String hashKey, String rangeKey, List<AttributeDefinition> attributeDefinitions, List<GlobalSecondaryIndex> globalSecondaryIndexes, long readCapacityUnits,
@ -114,11 +117,16 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
}
private void initializeClient() {
client = AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")))
.build();
AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region"))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey")));
client = clientBuilder.build();
asyncClient = AmazonDynamoDBAsyncClientBuilder.standard()
.withEndpointConfiguration(clientBuilder.getEndpoint())
.withCredentials(clientBuilder.getCredentials())
.build();
dynamoDB = new DynamoDB(client);
}
@ -174,6 +182,10 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback
return client;
}
public AmazonDynamoDBAsync getAsyncClient() {
return asyncClient;
}
public DynamoDB getDynamoDB() {
return dynamoDB;
}