From db198237f3588ae2137869b2e88d53c37a559db1 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Mon, 27 Feb 2023 21:07:05 -0600 Subject: [PATCH] Expand `try`-`finally` scope of deleted accounts reconciliation lock --- .../storage/DeletedAccountsManager.java | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManager.java index fdb8ba7a8..1dbdc104e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsManager.java @@ -156,39 +156,39 @@ public class DeletedAccountsManager { public void lockAndReconcileAccounts(final int max, final DeletedAccountReconciliationConsumer consumer) throws ChunkProcessingFailedException { final List lockItems = new ArrayList<>(); - final List> reconciliationCandidates = deletedAccounts.listAccountsToReconcile(max).stream() - .filter(pair -> { - boolean lockAcquired = false; - - try { - lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pair.second()) - .withAcquireReleasedLocksConsistently(true) - .withShouldSkipBlockingWait(true) - .build())); - - lockAcquired = true; - } catch (final InterruptedException e) { - log.warn("Interrupted while acquiring lock for reconciliation", e); - } catch (final LockCurrentlyUnavailableException ignored) { - } - - return lockAcquired; - }).toList(); - - assert lockItems.size() == reconciliationCandidates.size(); - - // A deleted account's status may have changed in the time between getting a list of candidates and acquiring a lock - // on the candidate records. Now that we hold the lock, check which of the candidates still need to be reconciled. - final Set numbersNeedingReconciliationAfterLock = - deletedAccounts.getAccountsNeedingReconciliation(reconciliationCandidates.stream() - .map(Pair::second) - .collect(Collectors.toList())); - - final List> accountsToReconcile = reconciliationCandidates.stream() - .filter(candidate -> numbersNeedingReconciliationAfterLock.contains(candidate.second())) - .collect(Collectors.toList()); - try { + final List> reconciliationCandidates = deletedAccounts.listAccountsToReconcile(max).stream() + .filter(pair -> { + boolean lockAcquired = false; + + try { + lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pair.second()) + .withAcquireReleasedLocksConsistently(true) + .withShouldSkipBlockingWait(true) + .build())); + + lockAcquired = true; + } catch (final InterruptedException e) { + log.warn("Interrupted while acquiring lock for reconciliation", e); + } catch (final LockCurrentlyUnavailableException ignored) { + } + + return lockAcquired; + }).toList(); + + assert lockItems.size() == reconciliationCandidates.size(); + + // A deleted account's status may have changed in the time between getting a list of candidates and acquiring a lock + // on the candidate records. Now that we hold the lock, check which of the candidates still need to be reconciled. + final Set numbersNeedingReconciliationAfterLock = + deletedAccounts.getAccountsNeedingReconciliation(reconciliationCandidates.stream() + .map(Pair::second) + .collect(Collectors.toList())); + + final List> accountsToReconcile = reconciliationCandidates.stream() + .filter(candidate -> numbersNeedingReconciliationAfterLock.contains(candidate.second())) + .collect(Collectors.toList()); + deletedAccounts.markReconciled(consumer.reconcile(accountsToReconcile)); } finally { lockItems.forEach(