diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 9925830e6..7215855ee 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -13,6 +13,8 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.s3.AmazonS3; @@ -52,7 +54,9 @@ import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.servlet.DispatcherType; import javax.servlet.FilterRegistration; @@ -286,13 +290,25 @@ public class WhisperServerService extends Application()); + + AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder + .standard() + .withRegion(accountsDynamoDbClientBuilder.getRegion()) + .withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration()) + .withCredentials(accountsDynamoDbClientBuilder.getCredentials()) + .withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool); + DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build()); DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build()); AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build(); + AmazonDynamoDBAsync accountsDynamodbAsyncClient = accountsDynamoDbAsyncClientBuilder.build(); Accounts accounts = new Accounts(accountDatabase); - AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName()); + AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamodbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName()); PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase); PendingDevices pendingDevices = new PendingDevices (accountDatabase); Usernames usernames = new Usernames(accountDatabase); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountsDynamoDbMigrationConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountsDynamoDbMigrationConfiguration.java index 58c74cfcd..05e72227f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountsDynamoDbMigrationConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicAccountsDynamoDbMigrationConfiguration.java @@ -8,6 +8,9 @@ public class DynamicAccountsDynamoDbMigrationConfiguration { @JsonProperty boolean backgroundMigrationEnabled; + @JsonProperty + int backgroundMigrationExecutorThreads = 1; + @JsonProperty boolean deleteEnabled; @@ -21,6 +24,10 @@ public class DynamicAccountsDynamoDbMigrationConfiguration { return backgroundMigrationEnabled; } + public int getBackgroundMigrationExecutorThreads() { + return backgroundMigrationExecutorThreads; + } + public void setDeleteEnabled(boolean deleteEnabled) { this.deleteEnabled = deleteEnabled; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java index 6a55de8cf..fbc0c7341 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDb.java @@ -2,7 +2,9 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; +import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.PrimaryKey; @@ -16,17 +18,23 @@ import com.amazonaws.services.dynamodbv2.model.Put; import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailure; import com.amazonaws.services.dynamodbv2.model.TransactWriteItem; import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest; +import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsResult; import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException; import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.UUIDUtil; @@ -43,6 +51,9 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt private final AmazonDynamoDB client; private final Table accountsTable; + private final AmazonDynamoDBAsync asyncClient; + + private final ThreadPoolExecutor migrationThreadPool; private final String phoneNumbersTableName; @@ -52,12 +63,15 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByUuid")); private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete")); - public AccountsDynamoDb(AmazonDynamoDB client, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName) { + public AccountsDynamoDb(AmazonDynamoDB client, AmazonDynamoDBAsync asyncClient, ThreadPoolExecutor migrationThreadPool, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName) { super(dynamoDb); this.client = client; this.accountsTable = dynamoDb.getTable(accountsTableName); this.phoneNumbersTableName = phoneNumbersTableName; + + this.asyncClient = asyncClient; + this.migrationThreadPool = migrationThreadPool; } @Override @@ -216,7 +230,29 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt }); } - public boolean migrate(Account account) { + public CompletableFuture migrate(List accounts, int threads) { + + migrationThreadPool.setCorePoolSize(threads); + migrationThreadPool.setMaximumPoolSize(threads); + + final List> futures = accounts.stream() + .map(this::migrate) + .map(f -> f.whenComplete((migrated, e) -> { + if (e == null) { + MIGRATED_COUNTER.increment(migrated ? 1 : 0); + } else { + ERROR_COUNTER.increment(); + } + })) + .collect(Collectors.toList()); + + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})); + } + + private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "count")); + private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "error")); + + public CompletableFuture migrate(Account account) { try { TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid()); @@ -233,17 +269,32 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt final TransactWriteItemsRequest request = new TransactWriteItemsRequest() .withTransactItems(phoneNumberConstraintPut, accountPut); - client.transactWriteItems(request); + final CompletableFuture resultFuture = new CompletableFuture<>(); - return true; + asyncClient.transactWriteItemsAsync(request, + new AsyncHandler<>() { + @Override + public void onError(Exception exception) { + if (exception instanceof TransactionCanceledException) { + // account is already migrated + resultFuture.complete(false); + } else { + ERROR_COUNTER.increment(); + resultFuture.completeExceptionally(exception); + } + } - } catch (JsonProcessingException e) { - throw new IllegalArgumentException(e); - } catch (TransactionCanceledException ignored) { - // account is already migrated + @Override + public void onSuccess(TransactWriteItemsRequest request, TransactWriteItemsResult transactWriteItemsResult) { + resultFuture.complete(true); + } + }); + + return resultFuture; + + } catch (Exception e) { + return CompletableFuture.failedFuture(e); } - - return false; } @VisibleForTesting diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrator.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrator.java index 57bd951a5..7ae8be997 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrator.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsDynamoDbMigrator.java @@ -1,18 +1,12 @@ package org.whispersystems.textsecuregcm.storage; -import static com.codahale.metrics.MetricRegistry.name; - -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.Metrics; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; public class AccountsDynamoDbMigrator extends AccountDatabaseCrawlerListener { - private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDbMigrator.class, "migrated")); - private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDbMigrator.class, "error")); - private final AccountsDynamoDb accountsDynamoDb; private final DynamicConfigurationManager dynamicConfigurationManager; @@ -34,20 +28,14 @@ public class AccountsDynamoDbMigrator extends AccountDatabaseCrawlerListener { @Override protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) { - if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isBackgroundMigrationEnabled()) { + if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration() + .isBackgroundMigrationEnabled()) { return; } - for (Account account : chunkAccounts) { - try { - final boolean migrated = accountsDynamoDb.migrate(account); - if (migrated) { - MIGRATED_COUNTER.increment(); - } - } catch (final Exception e) { + final CompletableFuture migrationBatch = accountsDynamoDb.migrate(chunkAccounts, + dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().getBackgroundMigrationExecutorThreads()); - ERROR_COUNTER.increment(); - } - } + migrationBatch.join(); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index 494a12dde..1a9483011 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -10,6 +10,8 @@ import static com.codahale.metrics.MetricRegistry.name; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -20,6 +22,9 @@ import io.dropwizard.setup.Environment; import io.lettuce.core.resource.ClientResources; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; import org.jdbi.v3.core.Jdbi; @@ -111,11 +116,21 @@ public class DeleteUserCommand extends EnvironmentCommand()); + + AmazonDynamoDBAsyncClientBuilder accountsDynamoDbAsyncClientBuilder = AmazonDynamoDBAsyncClientBuilder + .standard() + .withRegion(accountsDynamoDbClientBuilder.getRegion()) + .withClientConfiguration(accountsDynamoDbClientBuilder.getClientConfiguration()) + .withCredentials(accountsDynamoDbClientBuilder.getCredentials()) + .withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool); DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build()); DynamoDB preKeysDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build()); AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build(); + AmazonDynamoDBAsync accountsDynamoDbAsyncClient = accountsDynamoDbAsyncClientBuilder.build(); FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources); @@ -132,7 +147,7 @@ public class DeleteUserCommand extends EnvironmentCommand()), dynamoDbExtension.getDynamoDB(), dynamoDbExtension.getTableName(), numbersTable.getTableName()); } @Test @@ -244,7 +249,7 @@ class AccountsDynamoDbTest { when(client.updateItem(any())) .thenThrow(RuntimeException.class); - AccountsDynamoDb accounts = new AccountsDynamoDb(client, dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME); + AccountsDynamoDb accounts = new AccountsDynamoDb(client, mock(AmazonDynamoDBAsync.class), mock(ThreadPoolExecutor.class), dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME); Account account = generateAccount("+14151112222", UUID.randomUUID()); try { @@ -279,19 +284,19 @@ class AccountsDynamoDbTest { } @Test - void testMigrate() { + void testMigrate() throws ExecutionException, InterruptedException { Device device = generateDevice (1 ); UUID firstUuid = UUID.randomUUID(); Account account = generateAccount("+14151112222", firstUuid, Collections.singleton(device)); - boolean migrated = accountsDynamoDb.migrate(account); + boolean migrated = accountsDynamoDb.migrate(account).get(); assertThat(migrated).isTrue(); verifyStoredState("+14151112222", account.getUuid(), account); - migrated = accountsDynamoDb.migrate(account); + migrated = accountsDynamoDb.migrate(account).get(); assertThat(migrated).isFalse(); @@ -302,14 +307,14 @@ class AccountsDynamoDbTest { device = generateDevice(1); Account accountRemigrationWithDifferentUuid = generateAccount("+14151112222", secondUuid, Collections.singleton(device)); - migrated = accountsDynamoDb.migrate(account); + migrated = accountsDynamoDb.migrate(account).get(); assertThat(migrated).isFalse(); verifyStoredState("+14151112222", firstUuid, account); account.setDynamoDbMigrationVersion(account.getDynamoDbMigrationVersion() + 1); - migrated = accountsDynamoDb.migrate(account); + migrated = accountsDynamoDb.migrate(account).get(); assertThat(migrated).isTrue(); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java index 97b2b8fa2..141d72ffa 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtension.java @@ -5,6 +5,8 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.local.main.ServerRunner; @@ -41,6 +43,7 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback private final long writeCapacityUnits; private AmazonDynamoDB client; + private AmazonDynamoDBAsync asyncClient; private DynamoDB dynamoDB; private DynamoDbExtension(String tableName, String hashKey, String rangeKey, List attributeDefinitions, List globalSecondaryIndexes, long readCapacityUnits, @@ -114,11 +117,16 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback } private void initializeClient() { - client = AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration( - new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region")) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))) - .build(); + AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration("http://localhost:" + port, "local-test-region")) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("accessKey", "secretKey"))); + client = clientBuilder.build(); + + asyncClient = AmazonDynamoDBAsyncClientBuilder.standard() + .withEndpointConfiguration(clientBuilder.getEndpoint()) + .withCredentials(clientBuilder.getCredentials()) + .build(); dynamoDB = new DynamoDB(client); } @@ -174,6 +182,10 @@ public class DynamoDbExtension implements BeforeEachCallback, AfterEachCallback return client; } + public AmazonDynamoDBAsync getAsyncClient() { + return asyncClient; + } + public DynamoDB getDynamoDB() { return dynamoDB; }