diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/CallingGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/CallingGrpcService.java index 4f60f9647..f981f094d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/CallingGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/CallingGrpcService.java @@ -34,7 +34,7 @@ public class CallingGrpcService extends ReactorCallingGrpc.CallingImplBase { final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); return rateLimiters.getTurnLimiter().validateReactive(authenticatedDevice.accountIdentifier()) - .then(Mono.defer(() -> Mono.fromSupplier(() -> turnTokenGenerator.generate(authenticatedDevice.accountIdentifier())))) + .then(Mono.fromSupplier(() -> turnTokenGenerator.generate(authenticatedDevice.accountIdentifier()))) .map(turnToken -> GetTurnCredentialsResponse.newBuilder() .setUsername(turnToken.username()) .setPassword(turnToken.password()) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysAnonymousGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysAnonymousGrpcService.java index a4dd18314..864ab89dc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysAnonymousGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysAnonymousGrpcService.java @@ -30,7 +30,7 @@ public class KeysAnonymousGrpcService extends ReactorKeysAnonymousGrpc.KeysAnony final ServiceIdentifier serviceIdentifier = ServiceIdentifierUtil.fromGrpcServiceIdentifier(request.getRequest().getTargetIdentifier()); - return Mono.fromFuture(accountsManager.getByServiceIdentifierAsync(serviceIdentifier)) + return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier)) .flatMap(Mono::justOrEmpty) .switchIfEmpty(Mono.error(Status.UNAUTHENTICATED.asException())) .flatMap(targetAccount -> diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysGrpcHelper.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysGrpcHelper.java index aed7a143a..f0bc21a22 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysGrpcHelper.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysGrpcHelper.java @@ -52,8 +52,8 @@ class KeysGrpcHelper { .build()); return Flux.merge( - Mono.fromFuture(keysManager.takeEC(targetAccount.getIdentifier(identityType), device.getId())), - Mono.fromFuture(keysManager.takePQ(targetAccount.getIdentifier(identityType), device.getId()))) + Mono.fromFuture(() -> keysManager.takeEC(targetAccount.getIdentifier(identityType), device.getId())), + Mono.fromFuture(() -> keysManager.takePQ(targetAccount.getIdentifier(identityType), device.getId()))) .flatMap(Mono::justOrEmpty) .reduce(preKeyBundleBuilder, (builder, preKey) -> { if (preKey instanceof ECPreKey ecPreKey) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysGrpcService.java index 4f8277557..9ced01fbf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/KeysGrpcService.java @@ -82,7 +82,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase { @Override public Mono getPreKeyCount(final GetPreKeyCountRequest request) { return Mono.fromSupplier(AuthenticationUtil::requireAuthenticatedDevice) - .flatMap(authenticatedDevice -> Mono.fromFuture(accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())) + .flatMap(authenticatedDevice -> Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())) .map(maybeAccount -> maybeAccount .map(account -> Tuples.of(account, authenticatedDevice.deviceId())) .orElseThrow(Status.UNAUTHENTICATED::asRuntimeException))) @@ -91,10 +91,10 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase { Tuples.of(IdentityType.PNI, accountAndDeviceId.getT1().getPhoneNumberIdentifier(), accountAndDeviceId.getT2()) )) .flatMap(identityTypeUuidAndDeviceId -> Flux.merge( - Mono.fromFuture(keysManager.getEcCount(identityTypeUuidAndDeviceId.getT2(), identityTypeUuidAndDeviceId.getT3())) + Mono.fromFuture(() -> keysManager.getEcCount(identityTypeUuidAndDeviceId.getT2(), identityTypeUuidAndDeviceId.getT3())) .map(ecKeyCount -> Tuples.of(identityTypeUuidAndDeviceId.getT1(), PreKeyType.EC, ecKeyCount)), - Mono.fromFuture(keysManager.getPqCount(identityTypeUuidAndDeviceId.getT2(), identityTypeUuidAndDeviceId.getT3())) + Mono.fromFuture(() -> keysManager.getPqCount(identityTypeUuidAndDeviceId.getT2(), identityTypeUuidAndDeviceId.getT3())) .map(ecKeyCount -> Tuples.of(identityTypeUuidAndDeviceId.getT1(), PreKeyType.KEM, ecKeyCount)) )) .reduce(GetPreKeyCountResponse.newBuilder(), (builder, tuple) -> { @@ -168,7 +168,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase { final BiFunction extractPreKeyFunction, final BiFunction, CompletableFuture> storeKeysFunction) { - return Mono.fromFuture(accountsManager.getByAccountIdentifierAsync(authenticatedAccountUuid)) + return Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedAccountUuid)) .map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)) .map(account -> { final List preKeys = requestPreKeys.stream() @@ -181,7 +181,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase { return Tuples.of(account.getIdentifier(identityType), preKeys); }) - .flatMap(identifierAndPreKeys -> Mono.fromFuture(storeKeysFunction.apply(identifierAndPreKeys.getT1(), identifierAndPreKeys.getT2()))) + .flatMap(identifierAndPreKeys -> Mono.fromFuture(() -> storeKeysFunction.apply(identifierAndPreKeys.getT1(), identifierAndPreKeys.getT2()))) .thenReturn(SetPreKeyResponse.newBuilder().build()); } @@ -203,8 +203,8 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase { final UUID identifier = account.getIdentifier(identityType); return Flux.merge( - Mono.fromFuture(keysManager.storeEcSignedPreKeys(identifier, Map.of(authenticatedDevice.deviceId(), signedPreKey))), - Mono.fromFuture(accountsManager.updateDeviceAsync(account, authenticatedDevice.deviceId(), deviceUpdater))) + Mono.fromFuture(() -> keysManager.storeEcSignedPreKeys(identifier, Map.of(authenticatedDevice.deviceId(), signedPreKey))), + Mono.fromFuture(() -> accountsManager.updateDeviceAsync(account, authenticatedDevice.deviceId(), deviceUpdater))) .then(); })); } @@ -220,7 +220,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase { final UUID identifier = account.getIdentifier(IdentityTypeUtil.fromGrpcIdentityType(request.getIdentityType())); - return Mono.fromFuture(keysManager.storePqLastResort(identifier, Map.of(authenticatedDevice.deviceId(), lastResortKey))); + return Mono.fromFuture(() -> keysManager.storePqLastResort(identifier, Map.of(authenticatedDevice.deviceId(), lastResortKey))); })); } @@ -230,7 +230,7 @@ public class KeysGrpcService extends ReactorKeysGrpc.KeysImplBase { final BiFunction extractKeyFunction, final BiFunction> storeKeyFunction) { - return Mono.fromFuture(accountsManager.getByAccountIdentifierAsync(authenticatedAccountUuid)) + return Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedAccountUuid)) .map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)) .map(account -> { final IdentityKey identityKey = account.getIdentityKey(IdentityTypeUtil.fromGrpcIdentityType(identityType)); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/ProfileGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/ProfileGrpcService.java index 1f8a0775e..ddc264354 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/ProfileGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/ProfileGrpcService.java @@ -75,9 +75,9 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase { validateRequest(request); return Mono.fromSupplier(AuthenticationUtil::requireAuthenticatedDevice) .flatMap(authenticatedDevice -> Mono.zip( - Mono.fromFuture(accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())) + Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())) .map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)), - Mono.fromFuture(profilesManager.getAsync(authenticatedDevice.accountIdentifier(), request.getVersion())) + Mono.fromFuture(() -> profilesManager.getAsync(authenticatedDevice.accountIdentifier(), request.getVersion())) )) .doOnNext(accountAndMaybeProfile -> { if (!request.getPaymentAddress().isEmpty()) { @@ -103,7 +103,7 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase { } }; - final Mono profileSetMono = Mono.fromFuture(profilesManager.setAsync(account.getUuid(), + final Mono profileSetMono = Mono.fromFuture(() -> profilesManager.setAsync(account.getUuid(), new VersionedProfile( request.getVersion(), request.getName().toByteArray(), @@ -118,12 +118,13 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase { .map(badges -> ProfileHelper.mergeBadgeIdsWithExistingAccountBadges(clock, badgeConfigurationMap, badges, account.getBadges())) .orElseGet(account::getBadges); - updates.add(Mono.fromFuture(accountsManager.updateAsync(account, a -> { + updates.add(Mono.fromFuture(() -> accountsManager.updateAsync(account, a -> { a.setBadges(clock, updatedBadges); a.setCurrentProfileVersion(request.getVersion()); }))); + if (request.getAvatarChange() != AvatarChange.AVATAR_CHANGE_UNCHANGED && avatarData.currentAvatar().isPresent()) { - updates.add(Mono.fromFuture(asyncS3client.deleteObject(DeleteObjectRequest.builder() + updates.add(Mono.fromFuture(() -> asyncS3client.deleteObject(DeleteObjectRequest.builder() .bucket(bucket) .key(avatarData.currentAvatar().get()) .build()))); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index 82a7ad7c2..98fa8a362 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -55,7 +55,7 @@ public interface RateLimiter { } default Mono validateReactive(final String key) { - return Mono.fromFuture(validateAsync(key).toCompletableFuture()); + return Mono.fromFuture(() -> validateAsync(key).toCompletableFuture()); } default Mono validateReactive(final UUID accountUuid) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 03782b41f..a4a85608f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -228,7 +228,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .expressionAttributeValues(Map.of(":part", partitionKey)) .build()) .items()) - .flatMap(item -> Mono.fromFuture(dbAsyncClient.deleteItem(DeleteItemRequest.builder() + .flatMap(item -> Mono.fromFuture(() -> dbAsyncClient.deleteItem(DeleteItemRequest.builder() .tableName(tableName) .key(Map.of( KEY_PARTITION, partitionKey, @@ -257,7 +257,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .consistentRead(true) .build()) .items()) - .flatMap(item -> Mono.fromFuture(dbAsyncClient.deleteItem(DeleteItemRequest.builder() + .flatMap(item -> Mono.fromFuture(() -> dbAsyncClient.deleteItem(DeleteItemRequest.builder() .tableName(tableName) .key(Map.of( KEY_PARTITION, partitionKey, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RepeatedUseSignedPreKeyStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RepeatedUseSignedPreKeyStore.java index 8352b60b8..471aaf9e5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RepeatedUseSignedPreKeyStore.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RepeatedUseSignedPreKeyStore.java @@ -150,7 +150,7 @@ public abstract class RepeatedUseSignedPreKeyStore> { .tableName(tableName) .key(getPrimaryKey(identifier, deviceId)) .build()) - .flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest))) + .flatMap(deleteItemRequest -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(deleteItemRequest))) // Idiom: wait for everything to finish, but discard the results .reduce(0, (a, b) -> 0) .toFuture() diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUsePreKeyStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUsePreKeyStore.java index 9499eb38c..95e086544 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUsePreKeyStore.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/SingleUsePreKeyStore.java @@ -94,12 +94,10 @@ public abstract class SingleUsePreKeyStore> { public CompletableFuture store(final UUID identifier, final long deviceId, final List preKeys) { final Timer.Sample sample = Timer.start(); - return Mono.fromFuture(delete(identifier, deviceId)) + return Mono.fromFuture(() -> delete(identifier, deviceId)) .thenMany( Flux.fromIterable(preKeys) - .flatMap( - preKey -> Mono.fromFuture(store(identifier, deviceId, preKey)), - DYNAMO_DB_MAX_BATCH_SIZE)) + .flatMap(preKey -> Mono.fromFuture(() -> store(identifier, deviceId, preKey)), DYNAMO_DB_MAX_BATCH_SIZE)) .then() .toFuture() .thenRun(() -> sample.stop(storeKeyBatchTimer)); @@ -149,7 +147,7 @@ public abstract class SingleUsePreKeyStore> { KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID))) .returnValues(ReturnValue.ALL_OLD) .build()) - .flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest)), 1) + .flatMap(deleteItemRequest -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(deleteItemRequest)), 1) .doOnNext(deleteItemResponse -> keysConsidered.incrementAndGet()) .filter(DeleteItemResponse::hasAttributes) .next() @@ -258,7 +256,7 @@ public abstract class SingleUsePreKeyStore> { KEY_DEVICE_ID_KEY_ID, item.get(KEY_DEVICE_ID_KEY_ID) )) .build()) - .flatMap(deleteItemRequest -> Mono.fromFuture(dynamoDbAsyncClient.deleteItem(deleteItemRequest)), DYNAMO_DB_MAX_BATCH_SIZE) + .flatMap(deleteItemRequest -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(deleteItemRequest)), DYNAMO_DB_MAX_BATCH_SIZE) // Idiom: wait for everything to finish, but discard the results .reduce(0, (a, b) -> 0) .toFuture() diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index e1fe1d61d..7e01cd8f9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -357,7 +357,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac .tap(Micrometer.metrics(Metrics.globalRegistry)) .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .flatMapSequential(envelope -> - Mono.fromFuture(sendMessage(envelope) + Mono.fromFuture(() -> sendMessage(envelope) .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS))) .subscribeOn(messageDeliveryScheduler) .subscribe( diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java index d525933fe..a5a4d18c2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateSignedECPreKeysCommand.java @@ -52,7 +52,7 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun return Flux.fromIterable(keys); })) - .flatMap(keyTuple -> Mono.fromFuture(keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())) + .flatMap(keyTuple -> Mono.fromFuture(() -> keysManager.storeEcSignedPreKeyIfAbsent(keyTuple.getT1(), keyTuple.getT2(), keyTuple.getT3())) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onRetryExhaustedThrow((spec, rs) -> rs.failure())), false, MAX_CONCURRENCY) .doOnNext(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment())