Ensure accounts are deleted after batch migration; store migration failures for later processing
This commit is contained in:
parent
a472774734
commit
5974328d9c
|
@ -134,6 +134,16 @@ public class WhisperServerConfiguration extends Configuration {
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private AccountsDynamoDbConfiguration accountsDynamoDb;
|
private AccountsDynamoDbConfiguration accountsDynamoDb;
|
||||||
|
|
||||||
|
@Valid
|
||||||
|
@NotNull
|
||||||
|
@JsonProperty
|
||||||
|
private DynamoDbConfiguration migrationDeletedAccountsDynamoDb;
|
||||||
|
|
||||||
|
@Valid
|
||||||
|
@NotNull
|
||||||
|
@JsonProperty
|
||||||
|
private DynamoDbConfiguration migrationRetryAccountsDynamoDb;
|
||||||
|
|
||||||
@Valid
|
@Valid
|
||||||
@NotNull
|
@NotNull
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
@ -311,6 +321,14 @@ public class WhisperServerConfiguration extends Configuration {
|
||||||
return accountsDynamoDb;
|
return accountsDynamoDb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DynamoDbConfiguration getMigrationDeletedAccountsDynamoDbConfiguration() {
|
||||||
|
return migrationDeletedAccountsDynamoDb;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DynamoDbConfiguration getMigrationRetryAccountsDynamoDbConfiguration() {
|
||||||
|
return migrationRetryAccountsDynamoDb;
|
||||||
|
}
|
||||||
|
|
||||||
public DatabaseConfiguration getAbuseDatabaseConfiguration() {
|
public DatabaseConfiguration getAbuseDatabaseConfiguration() {
|
||||||
return abuseDatabase;
|
return abuseDatabase;
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,6 +159,8 @@ import org.whispersystems.textsecuregcm.storage.MessagePersister;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesCache;
|
import org.whispersystems.textsecuregcm.storage.MessagesCache;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
|
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
|
||||||
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
|
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
|
||||||
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
|
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PendingDevices;
|
import org.whispersystems.textsecuregcm.storage.PendingDevices;
|
||||||
|
@ -301,14 +303,34 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
.withCredentials(accountsDynamoDbClientBuilder.getCredentials())
|
.withCredentials(accountsDynamoDbClientBuilder.getCredentials())
|
||||||
.withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
|
.withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
|
||||||
|
|
||||||
|
AmazonDynamoDBClientBuilder migrationDeletedAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
|
||||||
|
.standard()
|
||||||
|
.withRegion(config.getMigrationDeletedAccountsDynamoDbConfiguration().getRegion())
|
||||||
|
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMigrationDeletedAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
|
||||||
|
.withRequestTimeout((int) config.getMigrationDeletedAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||||
|
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
|
||||||
|
|
||||||
|
AmazonDynamoDBClientBuilder migrationRetryAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
|
||||||
|
.standard()
|
||||||
|
.withRegion(config.getMigrationRetryAccountsDynamoDbConfiguration().getRegion())
|
||||||
|
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMigrationRetryAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
|
||||||
|
.withRequestTimeout((int) config.getMigrationRetryAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||||
|
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
|
||||||
|
|
||||||
DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
|
DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
|
||||||
DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
|
DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
|
||||||
|
|
||||||
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
|
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
|
||||||
AmazonDynamoDBAsync accountsDynamodbAsyncClient = accountsDynamoDbAsyncClientBuilder.build();
|
AmazonDynamoDBAsync accountsDynamodbAsyncClient = accountsDynamoDbAsyncClientBuilder.build();
|
||||||
|
|
||||||
|
DynamoDB recentlyDeletedAccountsDynamoDb = new DynamoDB(migrationDeletedAccountsDynamoDbClientBuilder.build());
|
||||||
|
DynamoDB migrationRetryAccountsDynamoDb = new DynamoDB(migrationRetryAccountsDynamoDbClientBuilder.build());
|
||||||
|
|
||||||
|
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb, config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
|
||||||
|
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
|
||||||
|
|
||||||
Accounts accounts = new Accounts(accountDatabase);
|
Accounts accounts = new Accounts(accountDatabase);
|
||||||
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamodbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
|
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamodbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
|
||||||
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
|
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
|
||||||
PendingDevices pendingDevices = new PendingDevices (accountDatabase);
|
PendingDevices pendingDevices = new PendingDevices (accountDatabase);
|
||||||
Usernames usernames = new Usernames(accountDatabase);
|
Usernames usernames = new Usernames(accountDatabase);
|
||||||
|
|
|
@ -35,6 +35,8 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||||
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||||
|
|
||||||
|
@ -55,6 +57,9 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
|
||||||
|
|
||||||
private final ThreadPoolExecutor migrationThreadPool;
|
private final ThreadPoolExecutor migrationThreadPool;
|
||||||
|
|
||||||
|
private final MigrationDeletedAccounts migrationDeletedAccounts;
|
||||||
|
private final MigrationRetryAccounts migrationRetryAccounts;
|
||||||
|
|
||||||
private final String phoneNumbersTableName;
|
private final String phoneNumbersTableName;
|
||||||
|
|
||||||
private static final Timer CREATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "create"));
|
private static final Timer CREATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "create"));
|
||||||
|
@ -63,7 +68,13 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
|
||||||
private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByUuid"));
|
private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByUuid"));
|
||||||
private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete"));
|
private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete"));
|
||||||
|
|
||||||
public AccountsDynamoDb(AmazonDynamoDB client, AmazonDynamoDBAsync asyncClient, ThreadPoolExecutor migrationThreadPool, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName) {
|
private final Logger logger = LoggerFactory.getLogger(AccountsDynamoDb.class);
|
||||||
|
|
||||||
|
public AccountsDynamoDb(AmazonDynamoDB client, AmazonDynamoDBAsync asyncClient,
|
||||||
|
ThreadPoolExecutor migrationThreadPool, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName,
|
||||||
|
MigrationDeletedAccounts migrationDeletedAccounts,
|
||||||
|
MigrationRetryAccounts accountsMigrationErrors) {
|
||||||
|
|
||||||
super(dynamoDb);
|
super(dynamoDb);
|
||||||
|
|
||||||
this.client = client;
|
this.client = client;
|
||||||
|
@ -72,6 +83,9 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
|
||||||
|
|
||||||
this.asyncClient = asyncClient;
|
this.asyncClient = asyncClient;
|
||||||
this.migrationThreadPool = migrationThreadPool;
|
this.migrationThreadPool = migrationThreadPool;
|
||||||
|
|
||||||
|
this.migrationDeletedAccounts = migrationDeletedAccounts;
|
||||||
|
this.migrationRetryAccounts = accountsMigrationErrors;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -208,50 +222,72 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
|
||||||
public void delete(UUID uuid) {
|
public void delete(UUID uuid) {
|
||||||
DELETE_TIMER.record(() -> {
|
DELETE_TIMER.record(() -> {
|
||||||
|
|
||||||
Optional<Account> maybeAccount = get(uuid);
|
delete(uuid, true);
|
||||||
|
|
||||||
maybeAccount.ifPresent(account -> {
|
|
||||||
|
|
||||||
TransactWriteItem phoneNumberDelete = new TransactWriteItem()
|
|
||||||
.withDelete(new Delete()
|
|
||||||
.withTableName(phoneNumbersTableName)
|
|
||||||
.withKey(Map.of(ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()))));
|
|
||||||
|
|
||||||
TransactWriteItem accountDelete = new TransactWriteItem().withDelete(
|
|
||||||
new Delete()
|
|
||||||
.withTableName(accountsTable.getTableName())
|
|
||||||
.withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))));
|
|
||||||
|
|
||||||
TransactWriteItemsRequest request = new TransactWriteItemsRequest()
|
|
||||||
.withTransactItems(phoneNumberDelete, accountDelete);
|
|
||||||
|
|
||||||
client.transactWriteItems(request);
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Void> migrate(List<Account> accounts, int threads) {
|
private void delete(UUID uuid, boolean saveInDeletedAccountsTable) {
|
||||||
|
|
||||||
migrationThreadPool.setCorePoolSize(threads);
|
if (saveInDeletedAccountsTable) {
|
||||||
migrationThreadPool.setMaximumPoolSize(threads);
|
migrationDeletedAccounts.put(uuid);
|
||||||
|
}
|
||||||
|
|
||||||
final List<CompletableFuture<?>> futures = accounts.stream()
|
Optional<Account> maybeAccount = get(uuid);
|
||||||
.map(this::migrate)
|
|
||||||
.map(f -> f.whenComplete((migrated, e) -> {
|
|
||||||
if (e == null) {
|
|
||||||
MIGRATED_COUNTER.increment(migrated ? 1 : 0);
|
|
||||||
} else {
|
|
||||||
ERROR_COUNTER.increment();
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
|
maybeAccount.ifPresent(account -> {
|
||||||
|
|
||||||
|
TransactWriteItem phoneNumberDelete = new TransactWriteItem()
|
||||||
|
.withDelete(new Delete()
|
||||||
|
.withTableName(phoneNumbersTableName)
|
||||||
|
.withKey(Map.of(ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()))));
|
||||||
|
|
||||||
|
TransactWriteItem accountDelete = new TransactWriteItem().withDelete(
|
||||||
|
new Delete()
|
||||||
|
.withTableName(accountsTable.getTableName())
|
||||||
|
.withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))));
|
||||||
|
|
||||||
|
TransactWriteItemsRequest request = new TransactWriteItemsRequest()
|
||||||
|
.withTransactItems(phoneNumberDelete, accountDelete);
|
||||||
|
|
||||||
|
client.transactWriteItems(request);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "count"));
|
private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "count"));
|
||||||
private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "error"));
|
private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "error"));
|
||||||
|
|
||||||
|
public CompletableFuture<Void> migrate(List<Account> accounts, int threads) {
|
||||||
|
|
||||||
|
migrationThreadPool.setCorePoolSize(threads);
|
||||||
|
migrationThreadPool.setMaximumPoolSize(threads);
|
||||||
|
|
||||||
|
final List<CompletableFuture<?>> futures = accounts.stream()
|
||||||
|
.map(this::migrate)
|
||||||
|
.map(f -> f.whenComplete((migrated, e) -> {
|
||||||
|
if (e == null) {
|
||||||
|
MIGRATED_COUNTER.increment(migrated ? 1 : 0);
|
||||||
|
} else {
|
||||||
|
ERROR_COUNTER.increment();
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
CompletableFuture<Void> migrationBatch = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
|
||||||
|
|
||||||
|
return migrationBatch.whenComplete((result, exception) -> deleteRecentlyDeletedUuids());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deleteRecentlyDeletedUuids() {
|
||||||
|
|
||||||
|
final List<UUID> recentlyDeletedUuids = migrationDeletedAccounts.getRecentlyDeletedUuids();
|
||||||
|
|
||||||
|
for (UUID recentlyDeletedUuid : recentlyDeletedUuids) {
|
||||||
|
delete(recentlyDeletedUuid, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
migrationDeletedAccounts.delete(recentlyDeletedUuids);
|
||||||
|
}
|
||||||
|
|
||||||
public CompletableFuture<Boolean> migrate(Account account) {
|
public CompletableFuture<Boolean> migrate(Account account) {
|
||||||
try {
|
try {
|
||||||
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
|
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
|
||||||
|
@ -279,7 +315,11 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
|
||||||
// account is already migrated
|
// account is already migrated
|
||||||
resultFuture.complete(false);
|
resultFuture.complete(false);
|
||||||
} else {
|
} else {
|
||||||
ERROR_COUNTER.increment();
|
try {
|
||||||
|
migrationRetryAccounts.put(account.getUuid());
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.error("Could not store account {}", account.getUuid());
|
||||||
|
}
|
||||||
resultFuture.completeExceptionally(exception);
|
resultFuture.completeExceptionally(exception);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.Item;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||||
|
|
||||||
|
public class MigrationDeletedAccounts extends AbstractDynamoDbStore {
|
||||||
|
|
||||||
|
private final Table table;
|
||||||
|
|
||||||
|
static final String KEY_UUID = "U";
|
||||||
|
|
||||||
|
public MigrationDeletedAccounts(DynamoDB dynamoDb, String tableName) {
|
||||||
|
super(dynamoDb);
|
||||||
|
|
||||||
|
table = dynamoDb.getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(UUID uuid) {
|
||||||
|
table.putItem(new Item()
|
||||||
|
.withPrimaryKey(primaryKey(uuid)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<UUID> getRecentlyDeletedUuids() {
|
||||||
|
|
||||||
|
final List<UUID> uuids = new ArrayList<>();
|
||||||
|
|
||||||
|
for (Item item : table.scan(new ScanSpec()).firstPage()) {
|
||||||
|
// only process one page each time. If we have a significant backlog at the end of the migration
|
||||||
|
// we can handle it separately
|
||||||
|
uuids.add(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_UUID)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return uuids;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void delete(List<UUID> uuids) {
|
||||||
|
|
||||||
|
writeInBatches(uuids, (batch) -> {
|
||||||
|
|
||||||
|
final TableWriteItems deleteItems = new TableWriteItems(table.getTableName());
|
||||||
|
|
||||||
|
for (UUID uuid : batch) {
|
||||||
|
deleteItems.addPrimaryKeyToDelete(primaryKey(uuid));
|
||||||
|
}
|
||||||
|
|
||||||
|
executeTableWriteItemsUntilComplete(deleteItems);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static PrimaryKey primaryKey(UUID uuid) {
|
||||||
|
return new PrimaryKey(KEY_UUID, UUIDUtil.toBytes(uuid));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.Item;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.Page;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||||
|
|
||||||
|
public class MigrationRetryAccounts extends AbstractDynamoDbStore {
|
||||||
|
|
||||||
|
private final Table table;
|
||||||
|
|
||||||
|
static final String KEY_UUID = "U";
|
||||||
|
|
||||||
|
public MigrationRetryAccounts(DynamoDB dynamoDb, String tableName) {
|
||||||
|
super(dynamoDb);
|
||||||
|
|
||||||
|
table = dynamoDb.getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(UUID uuid) {
|
||||||
|
table.putItem(new Item()
|
||||||
|
.withPrimaryKey(primaryKey(uuid)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<UUID> getUuids(int max) {
|
||||||
|
|
||||||
|
final List<UUID> uuids = new ArrayList<>();
|
||||||
|
|
||||||
|
for (Page<Item, ScanOutcome> page : table.scan(new ScanSpec()).pages()) {
|
||||||
|
|
||||||
|
for (Item item : page) {
|
||||||
|
uuids.add(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_UUID)));
|
||||||
|
|
||||||
|
if (uuids.size() >= max) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (uuids.size() >= max) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return uuids;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static PrimaryKey primaryKey(UUID uuid) {
|
||||||
|
return new PrimaryKey(KEY_UUID, UUIDUtil.toBytes(uuid));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -39,10 +39,12 @@ import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
|
||||||
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
|
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
|
||||||
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
|
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
|
||||||
import org.whispersystems.textsecuregcm.storage.Accounts;
|
import org.whispersystems.textsecuregcm.storage.Accounts;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
|
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
|
||||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||||
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
|
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
|
||||||
|
@ -126,6 +128,20 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
||||||
.withCredentials(accountsDynamoDbClientBuilder.getCredentials())
|
.withCredentials(accountsDynamoDbClientBuilder.getCredentials())
|
||||||
.withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
|
.withExecutorFactory(() -> accountsDynamoDbMigrationThreadPool);
|
||||||
|
|
||||||
|
AmazonDynamoDBClientBuilder migrationDeletedAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
|
||||||
|
.standard()
|
||||||
|
.withRegion(configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getRegion())
|
||||||
|
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
|
||||||
|
.withRequestTimeout((int) configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||||
|
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
|
||||||
|
|
||||||
|
AmazonDynamoDBClientBuilder migrationRetryAccountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
|
||||||
|
.standard()
|
||||||
|
.withRegion(configuration.getMigrationRetryAccountsDynamoDbConfiguration().getRegion())
|
||||||
|
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMigrationRetryAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
|
||||||
|
.withRequestTimeout((int) configuration.getMigrationRetryAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||||
|
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
|
||||||
|
|
||||||
DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build());
|
DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build());
|
||||||
DynamoDB preKeysDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
|
DynamoDB preKeysDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
|
||||||
|
|
||||||
|
@ -146,8 +162,14 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
||||||
|
|
||||||
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
|
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
|
||||||
|
|
||||||
|
DynamoDB migrationDeletedAccountsDynamoDb = new DynamoDB(migrationDeletedAccountsDynamoDbClientBuilder.build());
|
||||||
|
DynamoDB migrationRetryAccountsDynamoDb = new DynamoDB(migrationRetryAccountsDynamoDbClientBuilder.build());
|
||||||
|
|
||||||
|
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(migrationDeletedAccountsDynamoDb, configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
|
||||||
|
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, configuration.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
|
||||||
|
|
||||||
Accounts accounts = new Accounts(accountDatabase);
|
Accounts accounts = new Accounts(accountDatabase);
|
||||||
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), configuration.getAccountsDynamoDbConfiguration().getTableName(), configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
|
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, new DynamoDB(accountsDynamoDbClient), configuration.getAccountsDynamoDbConfiguration().getTableName(), configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
|
||||||
Usernames usernames = new Usernames(accountDatabase);
|
Usernames usernames = new Usernames(accountDatabase);
|
||||||
Profiles profiles = new Profiles(accountDatabase);
|
Profiles profiles = new Profiles(accountDatabase);
|
||||||
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
|
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
|
||||||
|
|
|
@ -15,8 +15,11 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
|
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
|
||||||
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.document.Item;
|
import com.amazonaws.services.dynamodbv2.document.Item;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.Page;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
|
||||||
import com.amazonaws.services.dynamodbv2.document.Table;
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||||
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
|
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
|
||||||
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
|
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
|
||||||
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
|
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
|
||||||
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
|
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
|
||||||
|
@ -49,6 +52,8 @@ class AccountsDynamoDbTest {
|
||||||
|
|
||||||
private static final String ACCOUNTS_TABLE_NAME = "accounts_test";
|
private static final String ACCOUNTS_TABLE_NAME = "accounts_test";
|
||||||
private static final String NUMBERS_TABLE_NAME = "numbers_test";
|
private static final String NUMBERS_TABLE_NAME = "numbers_test";
|
||||||
|
private static final String MIGRATION_DELETED_ACCOUNTS_TABLE_NAME = "migration_deleted_accounts_test";
|
||||||
|
private static final String MIGRATION_RETRY_ACCOUNTS_TABLE_NAME = "miration_retry_accounts_test";
|
||||||
|
|
||||||
@RegisterExtension
|
@RegisterExtension
|
||||||
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
|
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
|
||||||
|
@ -59,6 +64,9 @@ class AccountsDynamoDbTest {
|
||||||
|
|
||||||
private AccountsDynamoDb accountsDynamoDb;
|
private AccountsDynamoDb accountsDynamoDb;
|
||||||
|
|
||||||
|
private Table migrationDeletedAccountsTable;
|
||||||
|
private Table migrationRetryAccountsTable;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setupAccountsDao() {
|
void setupAccountsDao() {
|
||||||
|
|
||||||
|
@ -70,7 +78,30 @@ class AccountsDynamoDbTest {
|
||||||
|
|
||||||
final Table numbersTable = dynamoDbExtension.getDynamoDB().createTable(createNumbersTableRequest);
|
final Table numbersTable = dynamoDbExtension.getDynamoDB().createTable(createNumbersTableRequest);
|
||||||
|
|
||||||
this.accountsDynamoDb = new AccountsDynamoDb(dynamoDbExtension.getClient(), dynamoDbExtension.getAsyncClient(), new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()), dynamoDbExtension.getDynamoDB(), dynamoDbExtension.getTableName(), numbersTable.getTableName());
|
final CreateTableRequest createMigrationDeletedAccountsTableRequest = new CreateTableRequest()
|
||||||
|
.withTableName(MIGRATION_DELETED_ACCOUNTS_TABLE_NAME)
|
||||||
|
.withKeySchema(new KeySchemaElement(MigrationDeletedAccounts.KEY_UUID, KeyType.HASH))
|
||||||
|
.withAttributeDefinitions(new AttributeDefinition(MigrationDeletedAccounts.KEY_UUID, ScalarAttributeType.B))
|
||||||
|
.withProvisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT);
|
||||||
|
|
||||||
|
migrationDeletedAccountsTable = dynamoDbExtension.getDynamoDB().createTable(createMigrationDeletedAccountsTableRequest);
|
||||||
|
|
||||||
|
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(dynamoDbExtension.getDynamoDB(),
|
||||||
|
migrationDeletedAccountsTable.getTableName());
|
||||||
|
|
||||||
|
final CreateTableRequest createMigrationRetryAccountsTableRequest = new CreateTableRequest()
|
||||||
|
.withTableName(MIGRATION_RETRY_ACCOUNTS_TABLE_NAME)
|
||||||
|
.withKeySchema(new KeySchemaElement(MigrationRetryAccounts.KEY_UUID, KeyType.HASH))
|
||||||
|
.withAttributeDefinitions(new AttributeDefinition(MigrationRetryAccounts.KEY_UUID, ScalarAttributeType.B))
|
||||||
|
.withProvisionedThroughput(DynamoDbExtension.DEFAULT_PROVISIONED_THROUGHPUT);
|
||||||
|
|
||||||
|
migrationRetryAccountsTable = dynamoDbExtension.getDynamoDB().createTable(createMigrationRetryAccountsTableRequest);
|
||||||
|
|
||||||
|
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts((dynamoDbExtension.getDynamoDB()),
|
||||||
|
migrationRetryAccountsTable.getTableName());
|
||||||
|
|
||||||
|
this.accountsDynamoDb = new AccountsDynamoDb(dynamoDbExtension.getClient(), dynamoDbExtension.getAsyncClient(), new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()), dynamoDbExtension.getDynamoDB(), dynamoDbExtension.getTableName(), numbersTable.getTableName(),
|
||||||
|
migrationDeletedAccounts, migrationRetryAccounts);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -214,6 +245,27 @@ class AccountsDynamoDbTest {
|
||||||
verifyStoredState(recreatedAccount.getNumber(), recreatedAccount.getUuid(),
|
verifyStoredState(recreatedAccount.getNumber(), recreatedAccount.getUuid(),
|
||||||
accountsDynamoDb.get(recreatedAccount.getUuid()).get(), recreatedAccount);
|
accountsDynamoDb.get(recreatedAccount.getUuid()).get(), recreatedAccount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
verifyRecentlyDeletedAccountsTableItemCount(1);
|
||||||
|
|
||||||
|
assertThat(migrationDeletedAccountsTable
|
||||||
|
.getItem(MigrationDeletedAccounts.primaryKey(deletedAccount.getUuid()))).isNotNull();
|
||||||
|
|
||||||
|
accountsDynamoDb.deleteRecentlyDeletedUuids();
|
||||||
|
|
||||||
|
verifyRecentlyDeletedAccountsTableItemCount(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyRecentlyDeletedAccountsTableItemCount(int expectedItemCount) {
|
||||||
|
int totalItems = 0;
|
||||||
|
|
||||||
|
for (Page<Item, ScanOutcome> page : migrationDeletedAccountsTable.scan(new ScanSpec()).pages()) {
|
||||||
|
for (Item ignored : page) {
|
||||||
|
totalItems++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(totalItems).isEqualTo(expectedItemCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -249,7 +301,8 @@ class AccountsDynamoDbTest {
|
||||||
when(client.updateItem(any()))
|
when(client.updateItem(any()))
|
||||||
.thenThrow(RuntimeException.class);
|
.thenThrow(RuntimeException.class);
|
||||||
|
|
||||||
AccountsDynamoDb accounts = new AccountsDynamoDb(client, mock(AmazonDynamoDBAsync.class), mock(ThreadPoolExecutor.class), dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME);
|
AccountsDynamoDb accounts = new AccountsDynamoDb(client, mock(AmazonDynamoDBAsync.class), mock(ThreadPoolExecutor.class), dynamoDB, ACCOUNTS_TABLE_NAME, NUMBERS_TABLE_NAME, mock(
|
||||||
|
MigrationDeletedAccounts.class), mock(MigrationRetryAccounts.class));
|
||||||
Account account = generateAccount("+14151112222", UUID.randomUUID());
|
Account account = generateAccount("+14151112222", UUID.randomUUID());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class MigrationDeletedAccountsTest {
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
|
||||||
|
.tableName("deleted_accounts_test")
|
||||||
|
.hashKey(MigrationDeletedAccounts.KEY_UUID)
|
||||||
|
.attributeDefinition(new AttributeDefinition(MigrationDeletedAccounts.KEY_UUID, ScalarAttributeType.B))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test() {
|
||||||
|
|
||||||
|
final MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(dynamoDbExtension.getDynamoDB(),
|
||||||
|
dynamoDbExtension.getTableName());
|
||||||
|
|
||||||
|
UUID firstUuid = UUID.randomUUID();
|
||||||
|
UUID secondUuid = UUID.randomUUID();
|
||||||
|
|
||||||
|
assertTrue(migrationDeletedAccounts.getRecentlyDeletedUuids().isEmpty());
|
||||||
|
|
||||||
|
migrationDeletedAccounts.put(firstUuid);
|
||||||
|
migrationDeletedAccounts.put(secondUuid);
|
||||||
|
|
||||||
|
assertTrue(migrationDeletedAccounts.getRecentlyDeletedUuids().containsAll(List.of(firstUuid, secondUuid)));
|
||||||
|
|
||||||
|
migrationDeletedAccounts.delete(List.of(firstUuid, secondUuid));
|
||||||
|
|
||||||
|
assertTrue(migrationDeletedAccounts.getRecentlyDeletedUuids().isEmpty());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class MigrationRetryAccountsTest {
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
|
||||||
|
.tableName("account_migration_errors_test")
|
||||||
|
.hashKey(MigrationRetryAccounts.KEY_UUID)
|
||||||
|
.attributeDefinition(new AttributeDefinition(MigrationRetryAccounts.KEY_UUID, ScalarAttributeType.B))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test() {
|
||||||
|
|
||||||
|
final MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(dynamoDbExtension.getDynamoDB(),
|
||||||
|
dynamoDbExtension.getTableName());
|
||||||
|
|
||||||
|
UUID firstUuid = UUID.randomUUID();
|
||||||
|
UUID secondUuid = UUID.randomUUID();
|
||||||
|
|
||||||
|
assertTrue(migrationRetryAccounts.getUuids(10).isEmpty());
|
||||||
|
|
||||||
|
migrationRetryAccounts.put(firstUuid);
|
||||||
|
migrationRetryAccounts.put(secondUuid);
|
||||||
|
|
||||||
|
assertTrue(migrationRetryAccounts.getUuids(10).containsAll(List.of(firstUuid, secondUuid)));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue