Update reconciliation crawler to use secondary index

This commit is contained in:
Chris Eager 2021-06-24 18:52:26 -05:00 committed by Chris Eager
parent 2f88f0eedb
commit 819d59cd79
6 changed files with 58 additions and 28 deletions

View File

@ -22,6 +22,7 @@ import org.whispersystems.textsecuregcm.configuration.AwsAttachmentsConfiguratio
import org.whispersystems.textsecuregcm.configuration.CdnConfiguration; import org.whispersystems.textsecuregcm.configuration.CdnConfiguration;
import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration; import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.DatadogConfiguration; import org.whispersystems.textsecuregcm.configuration.DatadogConfiguration;
import org.whispersystems.textsecuregcm.configuration.DeletedAccountsDynamoDbConfiguration;
import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
import org.whispersystems.textsecuregcm.configuration.DonationConfiguration; import org.whispersystems.textsecuregcm.configuration.DonationConfiguration;
import org.whispersystems.textsecuregcm.configuration.DynamoDbConfiguration; import org.whispersystems.textsecuregcm.configuration.DynamoDbConfiguration;
@ -160,7 +161,7 @@ public class WhisperServerConfiguration extends Configuration {
@Valid @Valid
@NotNull @NotNull
@JsonProperty @JsonProperty
private DynamoDbConfiguration deletedAccountsDynamoDb; private DeletedAccountsDynamoDbConfiguration deletedAccountsDynamoDb;
@Valid @Valid
@NotNull @NotNull
@ -386,7 +387,7 @@ public class WhisperServerConfiguration extends Configuration {
return migrationRetryAccountsDynamoDb; return migrationRetryAccountsDynamoDb;
} }
public DynamoDbConfiguration getDeletedAccountsDynamoDbConfiguration() { public DeletedAccountsDynamoDbConfiguration getDeletedAccountsDynamoDbConfiguration() {
return deletedAccountsDynamoDb; return deletedAccountsDynamoDb;
} }

View File

@ -355,7 +355,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DynamoDbClient pendingDevicesDynamoDbClient = DynamoDbFromConfig.client(config.getPendingDevicesDynamoDbConfiguration(), DynamoDbClient pendingDevicesDynamoDbClient = DynamoDbFromConfig.client(config.getPendingDevicesDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient, config.getDeletedAccountsDynamoDbConfiguration().getTableName()); DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient, config.getDeletedAccountsDynamoDbConfiguration().getTableName(), config.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb, config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName()); MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb, config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName()); MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());

View File

@ -0,0 +1,13 @@
package org.whispersystems.textsecuregcm.configuration;
import javax.validation.constraints.NotNull;
public class DeletedAccountsDynamoDbConfiguration extends DynamoDbConfiguration {
@NotNull
private String needsReconciliationIndexName;
public String getNeedsReconciliationIndexName() {
return needsReconciliationIndexName;
}
}

View File

@ -14,9 +14,8 @@ import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
public class DeletedAccounts extends AbstractDynamoDbStore { public class DeletedAccounts extends AbstractDynamoDbStore {
@ -24,16 +23,18 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
static final String KEY_ACCOUNT_E164 = "P"; static final String KEY_ACCOUNT_E164 = "P";
static final String ATTR_ACCOUNT_UUID = "U"; static final String ATTR_ACCOUNT_UUID = "U";
static final String ATTR_EXPIRES = "E"; static final String ATTR_EXPIRES = "E";
static final String ATTR_RECONCILED_IN_CDS = "R"; static final String ATTR_NEEDS_CDS_RECONCILIATION = "R";
static final Duration TIME_TO_LIVE = Duration.ofDays(30); static final Duration TIME_TO_LIVE = Duration.ofDays(30);
private final String tableName; private final String tableName;
private final String needsReconciliationIndexName;
public DeletedAccounts(final DynamoDbClient dynamoDb, final String tableName) { public DeletedAccounts(final DynamoDbClient dynamoDb, final String tableName, final String needsReconciliationIndexName) {
super(dynamoDb); super(dynamoDb);
this.tableName = tableName; this.tableName = tableName;
this.needsReconciliationIndexName = needsReconciliationIndexName;
} }
public void put(UUID uuid, String e164) { public void put(UUID uuid, String e164) {
@ -43,7 +44,7 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
KEY_ACCOUNT_E164, AttributeValues.fromString(e164), KEY_ACCOUNT_E164, AttributeValues.fromString(e164),
ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid), ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond()), ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond()),
ATTR_RECONCILED_IN_CDS, AttributeValues.fromBoolean(false))) ATTR_NEEDS_CDS_RECONCILIATION, AttributeValues.fromInt(1)))
.build()); .build());
} }
@ -51,13 +52,7 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
final ScanRequest scanRequest = ScanRequest.builder() final ScanRequest scanRequest = ScanRequest.builder()
.tableName(tableName) .tableName(tableName)
.filterExpression("#reconciled = :reconciled") .indexName(needsReconciliationIndexName)
.expressionAttributeNames(Map.of(
"#reconciled", ATTR_RECONCILED_IN_CDS
))
.expressionAttributeValues(Map.of(
":reconciled", AttributeValues.fromBoolean(false)
))
.limit(max) .limit(max)
.build(); .build();
@ -70,18 +65,19 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
} }
public void markReconciled(final List<String> phoneNumbersReconciled) { public void markReconciled(final List<String> phoneNumbersReconciled) {
writeInBatches(phoneNumbersReconciled, phoneNumbers -> {
final List<WriteRequest> updates = phoneNumbers.stream() phoneNumbersReconciled.forEach(number -> db().updateItem(
.map(phoneNumber -> WriteRequest.builder() UpdateItemRequest.builder()
.putRequest(PutRequest.builder().item( .tableName(tableName)
Map.of(KEY_ACCOUNT_E164, AttributeValues.fromString(phoneNumber), .key(Map.of(
ATTR_RECONCILED_IN_CDS, AttributeValues.fromBoolean(true)) KEY_ACCOUNT_E164, AttributeValues.fromString(number)
).build()).build() ))
).collect(Collectors.toList()); .updateExpression("REMOVE #needs_reconciliation")
.expressionAttributeNames(Map.of(
executeTableWriteItemsUntilComplete(Map.of(tableName, updates)); "#needs_reconciliation", ATTR_NEEDS_CDS_RECONCILIATION
}); ))
.build()
));
} }
} }

View File

@ -133,7 +133,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(configuration.getMigrationRetryAccountsDynamoDbConfiguration(), DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(configuration.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient, configuration.getDeletedAccountsDynamoDbConfiguration().getTableName()); DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient, configuration.getDeletedAccountsDynamoDbConfiguration().getTableName(), configuration.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(migrationDeletedAccountsDynamoDb, configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName()); MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(migrationDeletedAccountsDynamoDb, configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, configuration.getMigrationRetryAccountsDynamoDbConfiguration().getTableName()); MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, configuration.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());

View File

@ -13,10 +13,18 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.Projection;
import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
class DeletedAccountsTest { class DeletedAccountsTest {
private static final String NEEDS_RECONCILIATION_INDEX_NAME = "needs_reconciliation_test";
@RegisterExtension @RegisterExtension
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder() static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
.tableName("deleted_accounts_test") .tableName("deleted_accounts_test")
@ -24,13 +32,25 @@ class DeletedAccountsTest {
.attributeDefinition(AttributeDefinition.builder() .attributeDefinition(AttributeDefinition.builder()
.attributeName(DeletedAccounts.KEY_ACCOUNT_E164) .attributeName(DeletedAccounts.KEY_ACCOUNT_E164)
.attributeType(ScalarAttributeType.S).build()) .attributeType(ScalarAttributeType.S).build())
.attributeDefinition(AttributeDefinition.builder()
.attributeName(DeletedAccounts.ATTR_NEEDS_CDS_RECONCILIATION)
.attributeType(ScalarAttributeType.N)
.build())
.globalSecondaryIndex(GlobalSecondaryIndex.builder()
.indexName(NEEDS_RECONCILIATION_INDEX_NAME)
.keySchema(KeySchemaElement.builder().attributeName(DeletedAccounts.KEY_ACCOUNT_E164).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(DeletedAccounts.ATTR_NEEDS_CDS_RECONCILIATION).keyType(KeyType.RANGE).build())
.projection(Projection.builder().projectionType(ProjectionType.INCLUDE).nonKeyAttributes(DeletedAccounts.ATTR_ACCOUNT_UUID).build())
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(10L).writeCapacityUnits(10L).build())
.build())
.build(); .build();
@Test @Test
void test() { void test() {
final DeletedAccounts deletedAccounts = new DeletedAccounts(dynamoDbExtension.getDynamoDbClient(), final DeletedAccounts deletedAccounts = new DeletedAccounts(dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getTableName()); dynamoDbExtension.getTableName(),
NEEDS_RECONCILIATION_INDEX_NAME);
UUID firstUuid = UUID.randomUUID(); UUID firstUuid = UUID.randomUUID();
UUID secondUuid = UUID.randomUUID(); UUID secondUuid = UUID.randomUUID();