Don’t delete accounts after reconciling
This commit is contained in:
parent
19617c14f8
commit
eac48a6617
|
@ -4,7 +4,8 @@
|
|||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
@ -12,16 +13,20 @@ import java.util.stream.Collectors;
|
|||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
|
||||
|
||||
public class DeletedAccounts extends AbstractDynamoDbStore {
|
||||
|
||||
// uuid, primary key
|
||||
static final String KEY_ACCOUNT_UUID = "U";
|
||||
static final String ATTR_ACCOUNT_E164 = "P";
|
||||
// e164, primary key
|
||||
static final String KEY_ACCOUNT_E164 = "P";
|
||||
static final String ATTR_ACCOUNT_UUID = "U";
|
||||
static final String ATTR_EXPIRES = "E";
|
||||
static final String ATTR_RECONCILED_IN_CDS = "R";
|
||||
|
||||
static final Duration TIME_TO_LIVE = Duration.ofDays(30);
|
||||
|
||||
private final String tableName;
|
||||
|
||||
|
@ -35,37 +40,48 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
|
|||
db().putItem(PutItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.item(Map.of(
|
||||
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
|
||||
ATTR_ACCOUNT_E164, AttributeValues.fromString(e164)))
|
||||
KEY_ACCOUNT_E164, AttributeValues.fromString(e164),
|
||||
ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
|
||||
ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(TIME_TO_LIVE).getEpochSecond()),
|
||||
ATTR_RECONCILED_IN_CDS, AttributeValues.fromBoolean(false)))
|
||||
.build());
|
||||
}
|
||||
|
||||
public List<Pair<UUID, String>> list(final int max) {
|
||||
public List<Pair<UUID, String>> listAccountsToReconcile(final int max) {
|
||||
|
||||
final ScanRequest scanRequest = ScanRequest.builder()
|
||||
.tableName(tableName)
|
||||
.filterExpression("#reconciled = :reconciled")
|
||||
.expressionAttributeNames(Map.of(
|
||||
"#reconciled", ATTR_RECONCILED_IN_CDS
|
||||
))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":reconciled", AttributeValues.fromBoolean(false)
|
||||
))
|
||||
.limit(max)
|
||||
.build();
|
||||
|
||||
return scan(scanRequest, max)
|
||||
.stream()
|
||||
.map(item -> new Pair<>(
|
||||
AttributeValues.getUUID(item, KEY_ACCOUNT_UUID, null),
|
||||
AttributeValues.getString(item, ATTR_ACCOUNT_E164, null)))
|
||||
AttributeValues.getUUID(item, ATTR_ACCOUNT_UUID, null),
|
||||
AttributeValues.getString(item, KEY_ACCOUNT_E164, null)))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public void delete(final List<UUID> uuidsToDelete) {
|
||||
public void markReconciled(final List<String> phoneNumbersReconciled) {
|
||||
writeInBatches(phoneNumbersReconciled, phoneNumbers -> {
|
||||
|
||||
writeInBatches(uuidsToDelete, (uuids -> {
|
||||
final List<WriteRequest> updates = phoneNumbers.stream()
|
||||
.map(phoneNumber -> WriteRequest.builder()
|
||||
.putRequest(PutRequest.builder().item(
|
||||
Map.of(KEY_ACCOUNT_E164, AttributeValues.fromString(phoneNumber),
|
||||
ATTR_RECONCILED_IN_CDS, AttributeValues.fromBoolean(true))
|
||||
).build()).build()
|
||||
).collect(Collectors.toList());
|
||||
|
||||
final List<WriteRequest> deletes = uuids.stream()
|
||||
.map(uuid -> WriteRequest.builder().deleteRequest(
|
||||
DeleteRequest.builder().key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid))).build()).build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
|
||||
}));
|
||||
executeTableWriteItemsUntilComplete(Map.of(tableName, updates));
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ public class DeletedAccountsTableCrawler extends ManagedPeriodicWork {
|
|||
@Override
|
||||
public void doPeriodicWork() throws Exception {
|
||||
|
||||
final List<Pair<UUID, String>> deletedAccounts = this.deletedAccounts.list(MAX_BATCH_SIZE);
|
||||
final List<Pair<UUID, String>> deletedAccounts = this.deletedAccounts.listAccountsToReconcile(MAX_BATCH_SIZE);
|
||||
|
||||
final List<User> deletedUsers = deletedAccounts.stream()
|
||||
.map(pair -> new User(pair.first(), pair.second()))
|
||||
|
@ -53,16 +53,16 @@ public class DeletedAccountsTableCrawler extends ManagedPeriodicWork {
|
|||
reconciler.onCrawlChunk(deletedUsers);
|
||||
}
|
||||
|
||||
final List<UUID> deletedUuids = deletedAccounts.stream()
|
||||
.map(Pair::first)
|
||||
final List<String> reconciledPhoneNumbers = deletedAccounts.stream()
|
||||
.map(Pair::second)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
this.deletedAccounts.delete(deletedUuids);
|
||||
this.deletedAccounts.markReconciled(reconciledPhoneNumbers);
|
||||
|
||||
DistributionSummary.builder(BATCH_SIZE_DISTRIBUTION_NAME)
|
||||
.publishPercentileHistogram()
|
||||
.register(Metrics.globalRegistry)
|
||||
.record(deletedUuids.size());
|
||||
.record(reconciledPhoneNumbers.size());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -43,6 +43,10 @@ public class AttributeValues {
|
|||
return AttributeValue.builder().b(value).build();
|
||||
}
|
||||
|
||||
public static AttributeValue fromBoolean(boolean value) {
|
||||
return AttributeValue.builder().bool(value).build();
|
||||
}
|
||||
|
||||
private static int toInt(AttributeValue av) {
|
||||
return Integer.parseInt(av.n());
|
||||
}
|
||||
|
|
|
@ -20,10 +20,10 @@ class DeletedAccountsTest {
|
|||
@RegisterExtension
|
||||
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
|
||||
.tableName("deleted_accounts_test")
|
||||
.hashKey(DeletedAccounts.KEY_ACCOUNT_UUID)
|
||||
.hashKey(DeletedAccounts.KEY_ACCOUNT_E164)
|
||||
.attributeDefinition(AttributeDefinition.builder()
|
||||
.attributeName(DeletedAccounts.KEY_ACCOUNT_UUID)
|
||||
.attributeType(ScalarAttributeType.B).build())
|
||||
.attributeName(DeletedAccounts.KEY_ACCOUNT_E164)
|
||||
.attributeType(ScalarAttributeType.S).build())
|
||||
.build();
|
||||
|
||||
@Test
|
||||
|
@ -38,20 +38,20 @@ class DeletedAccountsTest {
|
|||
String firstNumber = "+14152221234";
|
||||
String secondNumber = "+14152225678";
|
||||
|
||||
assertTrue(deletedAccounts.list(1).isEmpty());
|
||||
assertTrue(deletedAccounts.listAccountsToReconcile(1).isEmpty());
|
||||
|
||||
deletedAccounts.put(firstUuid, firstNumber);
|
||||
deletedAccounts.put(secondUuid, secondNumber);
|
||||
|
||||
assertEquals(1, deletedAccounts.list(1).size());
|
||||
assertEquals(1, deletedAccounts.listAccountsToReconcile(1).size());
|
||||
|
||||
assertTrue(deletedAccounts.list(10).containsAll(
|
||||
assertTrue(deletedAccounts.listAccountsToReconcile(10).containsAll(
|
||||
List.of(
|
||||
new Pair<>(firstUuid, firstNumber),
|
||||
new Pair<>(secondUuid, secondNumber))));
|
||||
|
||||
deletedAccounts.delete(List.of(firstUuid, secondUuid));
|
||||
deletedAccounts.markReconciled(List.of(firstNumber, secondNumber));
|
||||
|
||||
assertTrue(deletedAccounts.list(10).isEmpty());
|
||||
assertTrue(deletedAccounts.listAccountsToReconcile(10).isEmpty());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue