Lock accounts for the duration of deletion operations.
This commit is contained in:
parent
cdef745a7a
commit
f47fefb73e
|
@ -403,41 +403,42 @@ public class AccountsManager {
|
||||||
|
|
||||||
public void delete(final Account account, final DeletionReason deletionReason) throws InterruptedException {
|
public void delete(final Account account, final DeletionReason deletionReason) throws InterruptedException {
|
||||||
try (final Timer.Context ignored = deleteTimer.time()) {
|
try (final Timer.Context ignored = deleteTimer.time()) {
|
||||||
final CompletableFuture<Void> deleteStorageServiceDataFuture = secureStorageClient.deleteStoredData(account.getUuid());
|
deletedAccountsManager.lockAndPut(account.getNumber(), () -> {
|
||||||
final CompletableFuture<Void> deleteBackupServiceDataFuture = secureBackupClient.deleteBackups(account.getUuid());
|
final CompletableFuture<Void> deleteStorageServiceDataFuture = secureStorageClient.deleteStoredData(account.getUuid());
|
||||||
|
final CompletableFuture<Void> deleteBackupServiceDataFuture = secureBackupClient.deleteBackups(account.getUuid());
|
||||||
|
|
||||||
usernamesManager.delete(account.getUuid());
|
usernamesManager.delete(account.getUuid());
|
||||||
directoryQueue.deleteAccount(account);
|
directoryQueue.deleteAccount(account);
|
||||||
profilesManager.deleteAll(account.getUuid());
|
profilesManager.deleteAll(account.getUuid());
|
||||||
keysDynamoDb.delete(account.getUuid());
|
keysDynamoDb.delete(account.getUuid());
|
||||||
messagesManager.clear(account.getUuid());
|
messagesManager.clear(account.getUuid());
|
||||||
|
|
||||||
deleteStorageServiceDataFuture.join();
|
deleteStorageServiceDataFuture.join();
|
||||||
deleteBackupServiceDataFuture.join();
|
deleteBackupServiceDataFuture.join();
|
||||||
|
|
||||||
redisDelete(account);
|
redisDelete(account);
|
||||||
databaseDelete(account);
|
databaseDelete(account);
|
||||||
|
|
||||||
if (dynamoDeleteEnabled()) {
|
if (dynamoDeleteEnabled()) {
|
||||||
try {
|
try {
|
||||||
dynamoDelete(account);
|
dynamoDelete(account);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error("Could not delete account {} from dynamo", account.getUuid().toString());
|
logger.error("Could not delete account {} from dynamo", account.getUuid().toString());
|
||||||
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", "delete").increment();
|
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", "delete").increment();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deletedAccountsManager.addRecentlyDeletedAccount(account.getUuid(), account.getNumber());
|
|
||||||
|
|
||||||
|
return account.getUuid();
|
||||||
|
});
|
||||||
} catch (final RuntimeException | InterruptedException e) {
|
} catch (final RuntimeException | InterruptedException e) {
|
||||||
logger.warn("Failed to delete account", e);
|
logger.warn("Failed to delete account", e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
Metrics.counter(DELETE_COUNTER_NAME,
|
Metrics.counter(DELETE_COUNTER_NAME,
|
||||||
COUNTRY_CODE_TAG_NAME, Util.getCountryCode(account.getNumber()),
|
COUNTRY_CODE_TAG_NAME, Util.getCountryCode(account.getNumber()),
|
||||||
DELETION_REASON_TAG_NAME, deletionReason.tagValue)
|
DELETION_REASON_TAG_NAME, deletionReason.tagValue)
|
||||||
.increment();
|
.increment();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getAccountMapKey(String number) {
|
private String getAccountMapKey(String number) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -60,10 +61,6 @@ public class DeletedAccountsManager {
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addRecentlyDeletedAccount(final UUID uuid, final String e164) throws InterruptedException {
|
|
||||||
withLock(e164, () -> deletedAccounts.put(uuid, e164, true));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void lockAndTake(final String e164, final Consumer<Optional<UUID>> consumer) throws InterruptedException {
|
public void lockAndTake(final String e164, final Consumer<Optional<UUID>> consumer) throws InterruptedException {
|
||||||
withLock(e164, () -> {
|
withLock(e164, () -> {
|
||||||
try {
|
try {
|
||||||
|
@ -75,6 +72,16 @@ public class DeletedAccountsManager {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void lockAndPut(final String e164, final Supplier<UUID> supplier) throws InterruptedException {
|
||||||
|
withLock(e164, () -> {
|
||||||
|
try {
|
||||||
|
deletedAccounts.put(supplier.get(), e164, true);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.warn("Supplier threw an exception while holding lock on a deleted account record", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private void withLock(final String e164, final Runnable task) throws InterruptedException {
|
private void withLock(final String e164, final Runnable task) throws InterruptedException {
|
||||||
final LockItem lockItem = lockClient.acquireLock(AcquireLockOptions.builder(e164)
|
final LockItem lockItem = lockClient.acquireLock(AcquireLockOptions.builder(e164)
|
||||||
.withAcquireReleasedLocksConsistently(true)
|
.withAcquireReleasedLocksConsistently(true)
|
||||||
|
|
|
@ -79,7 +79,7 @@ class DeletedAccountsManagerTest {
|
||||||
final UUID uuid = UUID.randomUUID();
|
final UUID uuid = UUID.randomUUID();
|
||||||
final String e164 = "+18005551234";
|
final String e164 = "+18005551234";
|
||||||
|
|
||||||
deletedAccountsManager.addRecentlyDeletedAccount(uuid, e164);
|
deletedAccounts.put(uuid, e164, true);
|
||||||
deletedAccountsManager.lockAndTake(e164, maybeUuid -> assertEquals(Optional.of(uuid), maybeUuid));
|
deletedAccountsManager.lockAndTake(e164, maybeUuid -> assertEquals(Optional.of(uuid), maybeUuid));
|
||||||
assertEquals(Optional.empty(), deletedAccounts.findUuid(e164));
|
assertEquals(Optional.empty(), deletedAccounts.findUuid(e164));
|
||||||
}
|
}
|
||||||
|
@ -89,7 +89,7 @@ class DeletedAccountsManagerTest {
|
||||||
final UUID uuid = UUID.randomUUID();
|
final UUID uuid = UUID.randomUUID();
|
||||||
final String e164 = "+18005551234";
|
final String e164 = "+18005551234";
|
||||||
|
|
||||||
deletedAccountsManager.addRecentlyDeletedAccount(uuid, e164);
|
deletedAccounts.put(uuid, e164, true);
|
||||||
|
|
||||||
deletedAccountsManager.lockAndTake(e164, maybeUuid -> {
|
deletedAccountsManager.lockAndTake(e164, maybeUuid -> {
|
||||||
assertEquals(Optional.of(uuid), maybeUuid);
|
assertEquals(Optional.of(uuid), maybeUuid);
|
||||||
|
@ -100,7 +100,7 @@ class DeletedAccountsManagerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testReconciliationLockContention() throws ChunkProcessingFailedException, InterruptedException {
|
void testReconciliationLockContention() throws ChunkProcessingFailedException {
|
||||||
|
|
||||||
final UUID[] uuids = new UUID[3];
|
final UUID[] uuids = new UUID[3];
|
||||||
final String[] e164s = new String[uuids.length];
|
final String[] e164s = new String[uuids.length];
|
||||||
|
@ -113,7 +113,7 @@ class DeletedAccountsManagerTest {
|
||||||
final Map<String, UUID> expectedReconciledAccounts = new HashMap<>();
|
final Map<String, UUID> expectedReconciledAccounts = new HashMap<>();
|
||||||
|
|
||||||
for (int i = 0; i < uuids.length; i++) {
|
for (int i = 0; i < uuids.length; i++) {
|
||||||
deletedAccountsManager.addRecentlyDeletedAccount(uuids[i], e164s[i]);
|
deletedAccounts.put(uuids[i], e164s[i], true);
|
||||||
expectedReconciledAccounts.put(e164s[i], uuids[i]);
|
expectedReconciledAccounts.put(e164s[i], uuids[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ class DeletedAccountsManagerTest {
|
||||||
|
|
||||||
final Thread putThread = new Thread(() -> {
|
final Thread putThread = new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
deletedAccountsManager.addRecentlyDeletedAccount(replacedUUID, e164s[0]);
|
deletedAccountsManager.lockAndPut(e164s[0], () -> replacedUUID);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue