Expand `try`-`finally` scope of deleted accounts reconciliation lock
This commit is contained in:
parent
d0ccae129a
commit
db198237f3
|
@ -156,39 +156,39 @@ public class DeletedAccountsManager {
|
||||||
|
|
||||||
public void lockAndReconcileAccounts(final int max, final DeletedAccountReconciliationConsumer consumer) throws ChunkProcessingFailedException {
|
public void lockAndReconcileAccounts(final int max, final DeletedAccountReconciliationConsumer consumer) throws ChunkProcessingFailedException {
|
||||||
final List<LockItem> lockItems = new ArrayList<>();
|
final List<LockItem> lockItems = new ArrayList<>();
|
||||||
final List<Pair<UUID, String>> reconciliationCandidates = deletedAccounts.listAccountsToReconcile(max).stream()
|
|
||||||
.filter(pair -> {
|
|
||||||
boolean lockAcquired = false;
|
|
||||||
|
|
||||||
try {
|
|
||||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pair.second())
|
|
||||||
.withAcquireReleasedLocksConsistently(true)
|
|
||||||
.withShouldSkipBlockingWait(true)
|
|
||||||
.build()));
|
|
||||||
|
|
||||||
lockAcquired = true;
|
|
||||||
} catch (final InterruptedException e) {
|
|
||||||
log.warn("Interrupted while acquiring lock for reconciliation", e);
|
|
||||||
} catch (final LockCurrentlyUnavailableException ignored) {
|
|
||||||
}
|
|
||||||
|
|
||||||
return lockAcquired;
|
|
||||||
}).toList();
|
|
||||||
|
|
||||||
assert lockItems.size() == reconciliationCandidates.size();
|
|
||||||
|
|
||||||
// A deleted account's status may have changed in the time between getting a list of candidates and acquiring a lock
|
|
||||||
// on the candidate records. Now that we hold the lock, check which of the candidates still need to be reconciled.
|
|
||||||
final Set<String> numbersNeedingReconciliationAfterLock =
|
|
||||||
deletedAccounts.getAccountsNeedingReconciliation(reconciliationCandidates.stream()
|
|
||||||
.map(Pair::second)
|
|
||||||
.collect(Collectors.toList()));
|
|
||||||
|
|
||||||
final List<Pair<UUID, String>> accountsToReconcile = reconciliationCandidates.stream()
|
|
||||||
.filter(candidate -> numbersNeedingReconciliationAfterLock.contains(candidate.second()))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
final List<Pair<UUID, String>> reconciliationCandidates = deletedAccounts.listAccountsToReconcile(max).stream()
|
||||||
|
.filter(pair -> {
|
||||||
|
boolean lockAcquired = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pair.second())
|
||||||
|
.withAcquireReleasedLocksConsistently(true)
|
||||||
|
.withShouldSkipBlockingWait(true)
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
lockAcquired = true;
|
||||||
|
} catch (final InterruptedException e) {
|
||||||
|
log.warn("Interrupted while acquiring lock for reconciliation", e);
|
||||||
|
} catch (final LockCurrentlyUnavailableException ignored) {
|
||||||
|
}
|
||||||
|
|
||||||
|
return lockAcquired;
|
||||||
|
}).toList();
|
||||||
|
|
||||||
|
assert lockItems.size() == reconciliationCandidates.size();
|
||||||
|
|
||||||
|
// A deleted account's status may have changed in the time between getting a list of candidates and acquiring a lock
|
||||||
|
// on the candidate records. Now that we hold the lock, check which of the candidates still need to be reconciled.
|
||||||
|
final Set<String> numbersNeedingReconciliationAfterLock =
|
||||||
|
deletedAccounts.getAccountsNeedingReconciliation(reconciliationCandidates.stream()
|
||||||
|
.map(Pair::second)
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
|
||||||
|
final List<Pair<UUID, String>> accountsToReconcile = reconciliationCandidates.stream()
|
||||||
|
.filter(candidate -> numbersNeedingReconciliationAfterLock.contains(candidate.second()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
deletedAccounts.markReconciled(consumer.reconcile(accountsToReconcile));
|
deletedAccounts.markReconciled(consumer.reconcile(accountsToReconcile));
|
||||||
} finally {
|
} finally {
|
||||||
lockItems.forEach(
|
lockItems.forEach(
|
||||||
|
|
Loading…
Reference in New Issue