diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index 8d1f26827..bb85cf0c1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -43,7 +43,6 @@ import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response.Status; import java.time.Clock; import java.time.Duration; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -96,6 +95,7 @@ import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider; import org.whispersystems.textsecuregcm.push.MessageSender; import org.whispersystems.textsecuregcm.push.MessageTooLargeException; +import org.whispersystems.textsecuregcm.push.MessageUtil; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -105,7 +105,6 @@ import org.whispersystems.textsecuregcm.spam.SpamChecker; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; -import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; @@ -114,11 +113,7 @@ import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.websocket.WebSocketConnection; import org.whispersystems.websocket.WebsocketHeaders; import org.whispersystems.websocket.auth.ReadOnly; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") @Path("/v1/messages") @@ -145,8 +140,6 @@ public class MessageController { private final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor; private final Clock clock; - private static final int MAX_FETCH_ACCOUNT_CONCURRENCY = 8; - private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = new CompletableFuture[0]; private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes"); @@ -563,7 +556,9 @@ public class MessageController { final ContainerRequestContext context) { // Perform fast, inexpensive checks before attempting to resolve recipients - validateNoDuplicateDevices(multiRecipientMessage); + if (MessageUtil.hasDuplicateDevices(multiRecipientMessage)) { + throw new BadRequestException("Multi-recipient message contains duplicate recipient"); + } if (groupSendTokenHeader == null && combinedUnidentifiedSenderAccessKeys == null) { throw new NotAuthorizedException("A group send endorsement token or unidentified access key is required for non-story messages"); @@ -582,7 +577,14 @@ public class MessageController { // At this point, the caller has at least superficially provided the information needed to send a multi-recipient // message. Attempt to resolve the destination service identifiers to Signal accounts. final Map resolvedRecipients = - resolveRecipients(multiRecipientMessage, groupSendTokenHeader == null); + MessageUtil.resolveRecipients(accountsManager, multiRecipientMessage); + + final List unresolvedRecipientServiceIdentifiers = + MessageUtil.getUnresolvedRecipients(multiRecipientMessage, resolvedRecipients); + + if (groupSendTokenHeader == null && !unresolvedRecipientServiceIdentifiers.isEmpty()) { + throw new NotFoundException(); + } // Access keys are checked against the UAK in the resolved accounts, so we have to check after resolving accounts above. // Group send endorsements are checked earlier; for stories, we don't check permissions at all because only clients check them @@ -598,17 +600,6 @@ public class MessageController { urgent, context); - final List unresolvedRecipientServiceIdentifiers; - - if (groupSendTokenHeader != null) { - unresolvedRecipientServiceIdentifiers = multiRecipientMessage.getRecipients().entrySet().stream() - .filter(entry -> !resolvedRecipients.containsKey(entry.getValue())) - .map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey())) - .toList(); - } else { - unresolvedRecipientServiceIdentifiers = List.of(); - } - return new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIdentifiers); } @@ -620,12 +611,14 @@ public class MessageController { final ContainerRequestContext context) { // Perform fast, inexpensive checks before attempting to resolve recipients - validateNoDuplicateDevices(multiRecipientMessage); + if (MessageUtil.hasDuplicateDevices(multiRecipientMessage)) { + throw new BadRequestException("Multi-recipient message contains duplicate recipient"); + } // At this point, the caller has at least superficially provided the information needed to send a multi-recipient // message. Attempt to resolve the destination service identifiers to Signal accounts. final Map resolvedRecipients = - resolveRecipients(multiRecipientMessage, false); + MessageUtil.resolveRecipients(accountsManager, multiRecipientMessage); // We might filter out all the recipients of a story (if none exist). // In this case there is no error so we should just return 200 now. @@ -909,43 +902,4 @@ public class MessageController { return Response.status(Status.ACCEPTED) .build(); } - - private static void validateNoDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage) { - final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1]; - - for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) { - if (recipient.getDevices().length == 1) { - // A recipient can't have repeated devices if they only have one device - continue; - } - - Arrays.fill(usedDeviceIds, false); - - for (final byte deviceId : recipient.getDevices()) { - if (usedDeviceIds[deviceId]) { - throw new BadRequestException(); - } - - usedDeviceIds[deviceId] = true; - } - } - } - - private Map resolveRecipients(final SealedSenderMultiRecipientMessage multiRecipientMessage, - final boolean throwOnNotFound) { - - return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet()) - .flatMap(serviceIdAndRecipient -> { - final ServiceIdentifier serviceIdentifier = - ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey()); - - return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier)) - .flatMap(Mono::justOrEmpty) - .switchIfEmpty(throwOnNotFound ? Mono.error(NotFoundException::new) : Mono.empty()) - .map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account)); - }, MAX_FETCH_ACCOUNT_CONCURRENCY) - .collectMap(Tuple2::getT1, Tuple2::getT2) - .blockOptional() - .orElse(Collections.emptyMap()); - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/MessagesAnonymousGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/MessagesAnonymousGrpcService.java index c095d7969..a0a3a45de 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/MessagesAnonymousGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/MessagesAnonymousGrpcService.java @@ -8,8 +8,6 @@ package org.whispersystems.textsecuregcm.grpc; import io.grpc.Status; import io.grpc.StatusException; import java.time.Clock; -import java.util.Arrays; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -37,17 +35,13 @@ import org.whispersystems.textsecuregcm.limits.CardinalityEstimator; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.push.MessageSender; import org.whispersystems.textsecuregcm.push.MessageTooLargeException; +import org.whispersystems.textsecuregcm.push.MessageUtil; import org.whispersystems.textsecuregcm.spam.GrpcResponse; import org.whispersystems.textsecuregcm.spam.MessageType; import org.whispersystems.textsecuregcm.spam.SpamCheckResult; import org.whispersystems.textsecuregcm.spam.SpamChecker; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.Device; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.MessagesAnonymousImplBase { @@ -61,8 +55,6 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me private static final SendMessageResponse SEND_MESSAGE_SUCCESS_RESPONSE = SendMessageResponse.newBuilder().build(); - private static final int MAX_FETCH_ACCOUNT_CONCURRENCY = 8; - public MessagesAnonymousGrpcService(final AccountsManager accountsManager, final RateLimiters rateLimiters, final MessageSender messageSender, @@ -256,7 +248,7 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me // At this point, the caller has at least superficially provided the information needed to send a multi-recipient // message. Attempt to resolve the destination service identifiers to Signal accounts. final Map resolvedRecipients = - resolveRecipients(multiRecipientMessage); + MessageUtil.resolveRecipients(accountsManager, multiRecipientMessage); try { messageSender.sendMultiRecipientMessage(multiRecipientMessage, @@ -269,10 +261,7 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me final SendMultiRecipientMessageResponse.Builder responseBuilder = SendMultiRecipientMessageResponse.newBuilder(); - multiRecipientMessage.getRecipients().entrySet() - .stream() - .filter(entry -> !resolvedRecipients.containsKey(entry.getValue())) - .map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey())) + MessageUtil.getUnresolvedRecipients(multiRecipientMessage, resolvedRecipients).stream() .map(ServiceIdentifierUtil::toGrpcServiceIdentifier) .forEach(responseBuilder::addUnresolvedRecipients); @@ -308,49 +297,13 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me // Check that the request is well-formed and doesn't contain repeated entries for the same device for the same // recipient - validateNoDuplicateDevices(multiRecipientMessage); + if (MessageUtil.hasDuplicateDevices(multiRecipientMessage)) { + throw Status.INVALID_ARGUMENT.withDescription("Multi-recipient message contains duplicate recipient").asException(); + } return multiRecipientMessage; } - private void validateNoDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage) - throws StatusException { - - final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1]; - - for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) { - if (recipient.getDevices().length == 1) { - // A recipient can't have repeated devices if they only have one device - continue; - } - - Arrays.fill(usedDeviceIds, false); - - for (final byte deviceId : recipient.getDevices()) { - if (usedDeviceIds[deviceId]) { - throw Status.INVALID_ARGUMENT.withDescription("Request contains repeated device entries").asException(); - } - - usedDeviceIds[deviceId] = true; - } - } - } - - private Map resolveRecipients(final SealedSenderMultiRecipientMessage multiRecipientMessage) { - return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet()) - .flatMap(serviceIdAndRecipient -> { - final ServiceIdentifier serviceIdentifier = - ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey()); - - return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier)) - .flatMap(Mono::justOrEmpty) - .map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account)); - }, MAX_FETCH_ACCOUNT_CONCURRENCY) - .collectMap(Tuple2::getT1, Tuple2::getT2) - .blockOptional() - .orElse(Collections.emptyMap()); - } - private MismatchedDevices buildMismatchedDevices(final ServiceIdentifier serviceIdentifier, org.whispersystems.textsecuregcm.controllers.MismatchedDevices mismatchedDevices) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageUtil.java new file mode 100644 index 000000000..9fa1f3c9c --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageUtil.java @@ -0,0 +1,128 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.push; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage; +import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +public class MessageUtil { + + public static final int DEFAULT_MAX_FETCH_ACCOUNT_CONCURRENCY = 8; + + private MessageUtil() { + } + + /** + * Finds account records for all recipients named in the given multi-recipient manager. Note that the returned map + * of recipients to account records will omit entries for recipients that could not be resolved to active accounts; + * callers that require full resolution should check for a missing entries and take appropriate action. + * + * @param accountsManager the {@code AccountsManager} instance to use to find account records + * @param multiRecipientMessage the message for which to resolve recipients + * + * @return a map of recipients to account records + * + * @see #getUnresolvedRecipients(SealedSenderMultiRecipientMessage, Map) + */ + public static Map resolveRecipients( + final AccountsManager accountsManager, + final SealedSenderMultiRecipientMessage multiRecipientMessage) { + + return resolveRecipients(accountsManager, multiRecipientMessage, DEFAULT_MAX_FETCH_ACCOUNT_CONCURRENCY); + } + + /** + * Finds account records for all recipients named in the given multi-recipient manager. Note that the returned map + * of recipients to account records will omit entries for recipients that could not be resolved to active accounts; + * callers that require full resolution should check for a missing entries and take appropriate action. + * + * @param accountsManager the {@code AccountsManager} instance to use to find account records + * @param multiRecipientMessage the message for which to resolve recipients + * @param maxFetchAccountConcurrency the maximum number of concurrent account-retrieval operations + * + * @return a map of recipients to account records + * + * @see #getUnresolvedRecipients(SealedSenderMultiRecipientMessage, Map) + */ + public static Map resolveRecipients( + final AccountsManager accountsManager, + final SealedSenderMultiRecipientMessage multiRecipientMessage, + final int maxFetchAccountConcurrency) { + + return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet()) + .flatMap(serviceIdAndRecipient -> { + final ServiceIdentifier serviceIdentifier = + ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey()); + + return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier)) + .flatMap(Mono::justOrEmpty) + .map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account)); + }, maxFetchAccountConcurrency) + .collectMap(Tuple2::getT1, Tuple2::getT2) + .blockOptional() + .orElse(Collections.emptyMap()); + } + + /** + * Returns a list of recipients missing from the map of resolved recipients for a multi-recipient message. + * + * @param multiRecipientMessage the multi-recipient message + * @param resolvedRecipients the map of resolved recipients to check for missing entries + * + * @return a list of {@code ServiceIdentifiers} belonging to multi-recipient message recipients that are not present + * in the given map of {@code resolvedRecipients} + */ + public static List getUnresolvedRecipients( + final SealedSenderMultiRecipientMessage multiRecipientMessage, + final Map resolvedRecipients) { + + return multiRecipientMessage.getRecipients().entrySet().stream() + .filter(entry -> !resolvedRecipients.containsKey(entry.getValue())) + .map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey())) + .toList(); + } + + /** + * Checks if a multi-recipient message contains duplicate recipients. + * + * @param multiRecipientMessage the message to check for duplicate recipients + * + * @return {@code true} if the message contains duplicate recipients or {@code false} otherwise + */ + public static boolean hasDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage) { + final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1]; + + for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) { + if (recipient.getDevices().length == 1) { + // A recipient can't have repeated devices if they only have one device + continue; + } + + Arrays.fill(usedDeviceIds, false); + + for (final byte deviceId : recipient.getDevices()) { + if (usedDeviceIds[deviceId]) { + return true; + } + + usedDeviceIds[deviceId] = true; + } + } + + return false; + } +}