Defer asynchronous actions when deriving `Mono` instances from futures

This commit is contained in:
Jon Chambers 2023-08-18 13:48:23 -04:00 committed by Chris Eager
parent ff1ef90a6d
commit a96ee57c7e
11 changed files with 29 additions and 30 deletions

View File

@ -34,7 +34,7 @@ public class CallingGrpcService extends ReactorCallingGrpc.CallingImplBase {
final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice();
return rateLimiters.getTurnLimiter().validateReactive(authenticatedDevice.accountIdentifier()) return rateLimiters.getTurnLimiter().validateReactive(authenticatedDevice.accountIdentifier())
.then(Mono.defer(() -> Mono.fromSupplier(() -> turnTokenGenerator.generate(authenticatedDevice.accountIdentifier())))) .then(Mono.fromSupplier(() -> turnTokenGenerator.generate(authenticatedDevice.accountIdentifier())))
.map(turnToken -> GetTurnCredentialsResponse.newBuilder() .map(turnToken -> GetTurnCredentialsResponse.newBuilder()
.setUsername(turnToken.username()) .setUsername(turnToken.username())
.setPassword(turnToken.password()) .setPassword(turnToken.password())

View File

@ -30,7 +30,7 @@ public class KeysAnonymousGrpcService extends ReactorKeysAnonymousGrpc.KeysAnony
final ServiceIdentifier serviceIdentifier = final ServiceIdentifier serviceIdentifier =
ServiceIdentifierUtil.fromGrpcServiceIdentifier(request.getRequest().getTargetIdentifier()); ServiceIdentifierUtil.fromGrpcServiceIdentifier(request.getRequest().getTargetIdentifier());
return Mono.fromFuture(accountsManager.getByServiceIdentifierAsync(serviceIdentifier)) return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
.flatMap(Mono::justOrEmpty) .flatMap(Mono::justOrEmpty)
.switchIfEmpty(Mono.error(Status.UNAUTHENTICATED.asException())) .switchIfEmpty(Mono.error(Status.UNAUTHENTICATED.asException()))
.flatMap(targetAccount -> .flatMap(targetAccount ->

View File

@ -52,8 +52,8 @@ class KeysGrpcHelper {
.build()); .build());
return Flux.merge( return Flux.merge(
Mono.fromFuture(keysManager.takeEC(targetAccount.getIdentifier(identityType), device.getId())), Mono.fromFuture(() -> keysManager.takeEC(targetAccount.getIdentifier(identityType), device.getId())),
Mono.fromFuture(keysManager.takePQ(targetAccount.getIdentifier(identityType), device.getId()))) Mono.fromFuture(() -> keysManager.takePQ(targetAccount.getIdentifier(identityType), device.getId())))
.flatMap(Mono::justOrEmpty) .flatMap(Mono::justOrEmpty)
.reduce(preKeyBundleBuilder, (builder, preKey) -> { .reduce(preKeyBundleBuilder, (builder, preKey) -> {
if (preKey instanceof ECPreKey ecPreKey) { if (preKey instanceof ECPreKey ecPreKey) {

View File

@ -82,7 +82,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase {
@Override @Override
public Mono<GetPreKeyCountResponse> getPreKeyCount(final GetPreKeyCountRequest request) { public Mono<GetPreKeyCountResponse> getPreKeyCount(final GetPreKeyCountRequest request) {
return Mono.fromSupplier(AuthenticationUtil::requireAuthenticatedDevice) return Mono.fromSupplier(AuthenticationUtil::requireAuthenticatedDevice)
.flatMap(authenticatedDevice -> Mono.fromFuture(accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())) .flatMap(authenticatedDevice -> Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()))
.map(maybeAccount -> maybeAccount .map(maybeAccount -> maybeAccount
.map(account -> Tuples.of(account, authenticatedDevice.deviceId())) .map(account -> Tuples.of(account, authenticatedDevice.deviceId()))
.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException))) .orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)))
@ -91,10 +91,10 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase {
Tuples.of(IdentityType.PNI, accountAndDeviceId.getT1().getPhoneNumberIdentifier(), accountAndDeviceId.getT2()) Tuples.of(IdentityType.PNI, accountAndDeviceId.getT1().getPhoneNumberIdentifier(), accountAndDeviceId.getT2())
)) ))
.flatMap(identityTypeUuidAndDeviceId -> Flux.merge( .flatMap(identityTypeUuidAndDeviceId -> Flux.merge(
Mono.fromFuture(keysManager.getEcCount(identityTypeUuidAndDeviceId.getT2(), identityTypeUuidAndDeviceId.getT3())) Mono.fromFuture(() -> keysManager.getEcCount(identityTypeUuidAndDeviceId.getT2(), identityTypeUuidAndDeviceId.getT3()))
.map(ecKeyCount -> Tuples.of(identityTypeUuidAndDeviceId.getT1(), PreKeyType.EC, ecKeyCount)), .map(ecKeyCount -> Tuples.of(identityTypeUuidAndDeviceId.getT1(), PreKeyType.EC, ecKeyCount)),
Mono.fromFuture(keysManager.getPqCount(identityTypeUuidAndDeviceId.getT2(), identityTypeUuidAndDeviceId.getT3())) Mono.fromFuture(() -> keysManager.getPqCount(identityTypeUuidAndDeviceId.getT2(), identityTypeUuidAndDeviceId.getT3()))
.map(ecKeyCount -> Tuples.of(identityTypeUuidAndDeviceId.getT1(), PreKeyType.KEM, ecKeyCount)) .map(ecKeyCount -> Tuples.of(identityTypeUuidAndDeviceId.getT1(), PreKeyType.KEM, ecKeyCount))
)) ))
.reduce(GetPreKeyCountResponse.newBuilder(), (builder, tuple) -> { .reduce(GetPreKeyCountResponse.newBuilder(), (builder, tuple) -> {
@ -168,7 +168,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase {
final BiFunction<R, IdentityKey, K> extractPreKeyFunction, final BiFunction<R, IdentityKey, K> extractPreKeyFunction,
final BiFunction<UUID, List<K>, CompletableFuture<Void>> storeKeysFunction) { final BiFunction<UUID, List<K>, CompletableFuture<Void>> storeKeysFunction) {
return Mono.fromFuture(accountsManager.getByAccountIdentifierAsync(authenticatedAccountUuid)) return Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedAccountUuid))
.map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)) .map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException))
.map(account -> { .map(account -> {
final List<K> preKeys = requestPreKeys.stream() final List<K> preKeys = requestPreKeys.stream()
@ -181,7 +181,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase {
return Tuples.of(account.getIdentifier(identityType), preKeys); return Tuples.of(account.getIdentifier(identityType), preKeys);
}) })
.flatMap(identifierAndPreKeys -> Mono.fromFuture(storeKeysFunction.apply(identifierAndPreKeys.getT1(), identifierAndPreKeys.getT2()))) .flatMap(identifierAndPreKeys -> Mono.fromFuture(() -> storeKeysFunction.apply(identifierAndPreKeys.getT1(), identifierAndPreKeys.getT2())))
.thenReturn(SetPreKeyResponse.newBuilder().build()); .thenReturn(SetPreKeyResponse.newBuilder().build());
} }
@ -203,8 +203,8 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase {
final UUID identifier = account.getIdentifier(identityType); final UUID identifier = account.getIdentifier(identityType);
return Flux.merge( return Flux.merge(
Mono.fromFuture(keysManager.storeEcSignedPreKeys(identifier, Map.of(authenticatedDevice.deviceId(), signedPreKey))), Mono.fromFuture(() -> keysManager.storeEcSignedPreKeys(identifier, Map.of(authenticatedDevice.deviceId(), signedPreKey))),
Mono.fromFuture(accountsManager.updateDeviceAsync(account, authenticatedDevice.deviceId(), deviceUpdater))) Mono.fromFuture(() -> accountsManager.updateDeviceAsync(account, authenticatedDevice.deviceId(), deviceUpdater)))
.then(); .then();
})); }));
} }
@ -220,7 +220,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase {
final UUID identifier = final UUID identifier =
account.getIdentifier(IdentityTypeUtil.fromGrpcIdentityType(request.getIdentityType())); account.getIdentifier(IdentityTypeUtil.fromGrpcIdentityType(request.getIdentityType()));
return Mono.fromFuture(keysManager.storePqLastResort(identifier, Map.of(authenticatedDevice.deviceId(), lastResortKey))); return Mono.fromFuture(() -> keysManager.storePqLastResort(identifier, Map.of(authenticatedDevice.deviceId(), lastResortKey)));
})); }));
} }
@ -230,7 +230,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase {
final BiFunction<R, IdentityKey, K> extractKeyFunction, final BiFunction<R, IdentityKey, K> extractKeyFunction,
final BiFunction<Account, K, Mono<?>> storeKeyFunction) { final BiFunction<Account, K, Mono<?>> storeKeyFunction) {
return Mono.fromFuture(accountsManager.getByAccountIdentifierAsync(authenticatedAccountUuid)) return Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedAccountUuid))
.map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)) .map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException))
.map(account -> { .map(account -> {
final IdentityKey identityKey = account.getIdentityKey(IdentityTypeUtil.fromGrpcIdentityType(identityType)); final IdentityKey identityKey = account.getIdentityKey(IdentityTypeUtil.fromGrpcIdentityType(identityType));

View File

@ -75,9 +75,9 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase {
validateRequest(request); validateRequest(request);
return Mono.fromSupplier(AuthenticationUtil::requireAuthenticatedDevice) return Mono.fromSupplier(AuthenticationUtil::requireAuthenticatedDevice)
.flatMap(authenticatedDevice -> Mono.zip( .flatMap(authenticatedDevice -> Mono.zip(
Mono.fromFuture(accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())) Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()))
.map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)), .map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)),
Mono.fromFuture(profilesManager.getAsync(authenticatedDevice.accountIdentifier(), request.getVersion())) Mono.fromFuture(() -> profilesManager.getAsync(authenticatedDevice.accountIdentifier(), request.getVersion()))
)) ))
.doOnNext(accountAndMaybeProfile -> { .doOnNext(accountAndMaybeProfile -> {
if (!request.getPaymentAddress().isEmpty()) { if (!request.getPaymentAddress().isEmpty()) {
@ -103,7 +103,7 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase {
} }
}; };
final Mono<Void> profileSetMono = Mono.fromFuture(profilesManager.setAsync(account.getUuid(), final Mono<Void> profileSetMono = Mono.fromFuture(() -> profilesManager.setAsync(account.getUuid(),
new VersionedProfile( new VersionedProfile(
request.getVersion(), request.getVersion(),
request.getName().toByteArray(), request.getName().toByteArray(),
@ -118,12 +118,13 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase {
.map(badges -> ProfileHelper.mergeBadgeIdsWithExistingAccountBadges(clock, badgeConfigurationMap, badges, account.getBadges())) .map(badges -> ProfileHelper.mergeBadgeIdsWithExistingAccountBadges(clock, badgeConfigurationMap, badges, account.getBadges()))
.orElseGet(account::getBadges); .orElseGet(account::getBadges);
updates.add(Mono.fromFuture(accountsManager.updateAsync(account, a -> { updates.add(Mono.fromFuture(() -> accountsManager.updateAsync(account, a -> {
a.setBadges(clock, updatedBadges); a.setBadges(clock, updatedBadges);
a.setCurrentProfileVersion(request.getVersion()); a.setCurrentProfileVersion(request.getVersion());
}))); })));
if (request.getAvatarChange() != AvatarChange.AVATAR_CHANGE_UNCHANGED && avatarData.currentAvatar().isPresent()) { if (request.getAvatarChange() != AvatarChange.AVATAR_CHANGE_UNCHANGED && avatarData.currentAvatar().isPresent()) {
updates.add(Mono.fromFuture(asyncS3client.deleteObject(DeleteObjectRequest.builder() updates.add(Mono.fromFuture(() -> asyncS3client.deleteObject(DeleteObjectRequest.builder()
.bucket(bucket) .bucket(bucket)
.key(avatarData.currentAvatar().get()) .key(avatarData.currentAvatar().get())
.build()))); .build())));

View File

@ -55,7 +55,7 @@ public interface RateLimiter {
} }
default Mono<Void> validateReactive(final String key) { default Mono<Void> validateReactive(final String key) {
return Mono.fromFuture(validateAsync(key).toCompletableFuture()); return Mono.fromFuture(() -> validateAsync(key).toCompletableFuture());
} }
default Mono<Void> validateReactive(final UUID accountUuid) { default Mono<Void> validateReactive(final UUID accountUuid) {

View File

@ -228,7 +228,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.expressionAttributeValues(Map.of(":part", partitionKey)) .expressionAttributeValues(Map.of(":part", partitionKey))
.build()) .build())
.items()) .items())
.flatMap(item -> Mono.fromFuture(dbAsyncClient.deleteItem(DeleteItemRequest.builder() .flatMap(item -> Mono.fromFuture(() -> dbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName) .tableName(tableName)
.key(Map.of( .key(Map.of(
KEY_PARTITION, partitionKey, KEY_PARTITION, partitionKey,
@ -257,7 +257,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.consistentRead(true) .consistentRead(true)
.build()) .build())
.items()) .items())
.flatMap(item -> Mono.fromFuture(dbAsyncClient.deleteItem(DeleteItemRequest.builder() .flatMap(item -> Mono.fromFuture(() -> dbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName) .tableName(tableName)
.key(Map.of( .key(Map.of(
KEY_PARTITION, partitionKey, KEY_PARTITION, partitionKey,

View File

@ -150,7 +150,7 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
.tableName(tableName) .tableName(tableName)
.key(getPrimaryKey(identifier, deviceId)) .key(getPrimaryKey(identifier, deviceId))
.build()) .build())
.flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest))) .flatMap(deleteItemRequest -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(deleteItemRequest)))
// Idiom: wait for everything to finish, but discard the results // Idiom: wait for everything to finish, but discard the results
.reduce(0, (a, b) -> 0) .reduce(0, (a, b) -> 0)
.toFuture() .toFuture()

View File

@ -94,12 +94,10 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
public CompletableFuture<Void> store(final UUID identifier, final long deviceId, final List<K> preKeys) { public CompletableFuture<Void> store(final UUID identifier, final long deviceId, final List<K> preKeys) {
final Timer.Sample sample = Timer.start(); final Timer.Sample sample = Timer.start();
return Mono.fromFuture(delete(identifier, deviceId)) return Mono.fromFuture(() -> delete(identifier, deviceId))
.thenMany( .thenMany(
Flux.fromIterable(preKeys) Flux.fromIterable(preKeys)
.flatMap( .flatMap(preKey -> Mono.fromFuture(() -> store(identifier, deviceId, preKey)), DYNAMO_DB_MAX_BATCH_SIZE))
preKey -> Mono.fromFuture(store(identifier, deviceId, preKey)),
DYNAMO_DB_MAX_BATCH_SIZE))
.then() .then()
.toFuture() .toFuture()
.thenRun(() -> sample.stop(storeKeyBatchTimer)); .thenRun(() -> sample.stop(storeKeyBatchTimer));
@ -149,7 +147,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID))) KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID)))
.returnValues(ReturnValue.ALL_OLD) .returnValues(ReturnValue.ALL_OLD)
.build()) .build())
.flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest)), 1) .flatMap(deleteItemRequest -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(deleteItemRequest)), 1)
.doOnNext(deleteItemResponse -> keysConsidered.incrementAndGet()) .doOnNext(deleteItemResponse -> keysConsidered.incrementAndGet())
.filter(DeleteItemResponse::hasAttributes) .filter(DeleteItemResponse::hasAttributes)
.next() .next()
@ -258,7 +256,7 @@ public abstract class SingleUsePreKeyStore<K extends PreKey<?>> {
KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID) KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID)
)) ))
.build()) .build())
.flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest)), DYNAMO_DB_MAX_BATCH_SIZE) .flatMap(deleteItemRequest -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(deleteItemRequest)), DYNAMO_DB_MAX_BATCH_SIZE)
// Idiom: wait for everything to finish, but discard the results // Idiom: wait for everything to finish, but discard the results
.reduce(0, (a, b) -> 0) .reduce(0, (a, b) -> 0)
.toFuture() .toFuture()

View File

@ -357,7 +357,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
.tap(Micrometer.metrics(Metrics.globalRegistry)) .tap(Micrometer.metrics(Metrics.globalRegistry))
.limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE)
.flatMapSequential(envelope -> .flatMapSequential(envelope ->
Mono.fromFuture(sendMessage(envelope) Mono.fromFuture(() -> sendMessage(envelope)
.orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)))
.subscribeOn(messageDeliveryScheduler) .subscribeOn(messageDeliveryScheduler)
.subscribe( .subscribe(

View File

@ -52,7 +52,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun
return Flux.fromIterable(keys); return Flux.fromIterable(keys);
})) }))
.flatMap(keyTuple -> Mono.fromFuture(keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())) .flatMap(keyTuple -> Mono.fromFuture(() -> keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onRetryExhaustedThrow((spec, rs) -> rs.failure())), .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onRetryExhaustedThrow((spec, rs) -> rs.failure())),
false, MAX_CONCURRENCY) false, MAX_CONCURRENCY)
.doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment()) .doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment())