Fail "wait for X" futures if a Redis operation fails

This commit is contained in:
Jon Chambers 2024-10-24 14:25:55 -04:00 committed by Jon Chambers
parent 9573d9e385
commit 9822d17ab9
1 changed files with 21 additions and 3 deletions

View File

@ -1582,7 +1582,13 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
Optional.ofNullable(waitForDeviceFuturesByTokenIdentifier.remove(tokenIdentifier))
.ifPresent(future -> pubSubRedisClient.withConnection(connection -> connection.async().get(getLinkedDeviceKey(tokenIdentifier)))
.thenAccept(deviceInfoJson -> handleDeviceAdded(future, deviceInfoJson)));
.whenComplete((deviceInfoJson, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
handleDeviceAdded(future, deviceInfoJson);
}
}));
} else if (TRANSFER_ARCHIVE_KEYSPACE_PATTERN.equals(pattern) && "set".equalsIgnoreCase(message)) {
// The `- 1` here compensates for the '*' in the pattern
final String[] deviceIdentifierComponents =
@ -1607,7 +1613,13 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
Optional.ofNullable(waitForTransferArchiveFuturesByDeviceIdentifier.remove(deviceIdentifier))
.ifPresent(future -> pubSubRedisClient.withConnection(connection -> connection.async().get(transferArchiveKey))
.thenAccept(transferArchiveJson -> handleTransferArchiveAdded(future, transferArchiveJson)));
.whenComplete((transferArchiveJson, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
handleTransferArchiveAdded(future, transferArchiveJson);
}
}));
} catch (final IllegalArgumentException e) {
logger.error("Could not parse timestamped device identifier", e);
}
@ -1618,7 +1630,13 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
Optional.ofNullable(waitForRestoreAccountRequestFuturesByToken.remove(token))
.ifPresent(future -> pubSubRedisClient.withConnection(connection -> connection.async().get(
getRestoreAccountRequestKey(token)))
.thenAccept(requestJson -> handleRestoreAccountRequest(future, requestJson)));
.whenComplete((requestJson, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
handleRestoreAccountRequest(future, requestJson);
}
}));
}
}