diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index fa0251c55..8d1f26827 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -109,7 +109,6 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; -import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.HeaderUtils; import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.websocket.WebSocketConnection; @@ -177,6 +176,9 @@ public class MessageController { private static final Duration NOTIFY_FOR_REMAINING_MESSAGES_DELAY = Duration.ofMinutes(1); + private static final SendMultiRecipientMessageResponse SEND_STORY_RESPONSE = + new SendMultiRecipientMessageResponse(Collections.emptyList()); + public MessageController( RateLimiters rateLimiters, CardinalityEstimator messageByteLimitEstimator, @@ -262,144 +264,207 @@ public class MessageController { @Context final ContainerRequestContext context) throws RateLimitExceededException { - if (source.isEmpty() && accessKey.isEmpty() && groupSendToken == null && !isStory) { - throw new WebApplicationException(Status.UNAUTHORIZED); - } - if (groupSendToken != null) { if (source.isPresent() || accessKey.isPresent()) { - throw new BadRequestException( - "Group send endorsement tokens should not be combined with other authentication"); + throw new BadRequestException("Group send endorsement tokens should not be combined with other authentication"); } else if (isStory) { throw new BadRequestException("Group send endorsement tokens should not be sent for story messages"); } } final Sample sample = Timer.start(); + final boolean needsSync; try { - final boolean isSyncMessage = - source.map(s -> s.getAccount().isIdentifiedBy(destinationIdentifier)).orElse(false); - - if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) { - throw new WebApplicationException(Status.FORBIDDEN); - } - - final Optional maybeDestination; - if (!isSyncMessage) { - maybeDestination = accountsManager.getByServiceIdentifier(destinationIdentifier); - } else { - maybeDestination = source.map(AuthenticatedDevice::getAccount); - } - - final MessageType messageType; - if (isStory) { - messageType = MessageType.INDIVIDUAL_STORY; - } else if (isSyncMessage) { - messageType = MessageType.SYNC; + needsSync = false; + sendStoryMessage(destinationIdentifier, messages, context); } else if (source.isPresent()) { - messageType = MessageType.INDIVIDUAL_IDENTIFIED_SENDER; + final AuthenticatedDevice authenticatedDevice = source.get(); + + if (authenticatedDevice.getAccount().isIdentifiedBy(destinationIdentifier)) { + needsSync = false; + sendSyncMessage(source.get(), destinationIdentifier, messages, context); + } else { + needsSync = authenticatedDevice.getAccount().getDevices().size() > 1; + sendIdentifiedSenderIndividualMessage(authenticatedDevice, destinationIdentifier, messages, context); + } } else { - messageType = MessageType.INDIVIDUAL_SEALED_SENDER; - } - - final SpamCheckResult spamCheckResult = - spamChecker.checkForIndividualRecipientSpamHttp(messageType, context, source, maybeDestination, destinationIdentifier); - - if (spamCheckResult.response().isPresent()) { - return spamCheckResult.response().get(); - } - - final Optional reportSpamToken = spamCheckResult.token(); - - try { - final int totalContentLength = - messages.messages().stream().mapToInt(message -> message.content().length).sum(); - - rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength); - } catch (final RateLimitExceededException e) { - messageByteLimitEstimator.add(destinationIdentifier.uuid().toString()); - throw e; - } - - try { - if (isStory) { - // Stories will be checked by the client; we bypass access checks here for stories. - } else if (groupSendToken != null) { - checkGroupSendToken(List.of(destinationIdentifier.toLibsignal()), groupSendToken); - if (maybeDestination.isEmpty()) { - throw new NotFoundException(); - } - } else { - OptionalAccess.verify(source.map(AuthenticatedDevice::getAccount), accessKey, maybeDestination, - destinationIdentifier); - } - - final boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().getDevices().size() > 1; - - // We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify - // we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from - // these requests. - if (isStory && maybeDestination.isEmpty()) { - return Response.ok(new SendMessageResponse(needsSync)).build(); - } - - // if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false - // or else return a 200 response when isStory is true. - final Account destination = maybeDestination.orElseThrow(); - - if (source.isPresent() && !isSyncMessage) { - rateLimiters.getMessagesLimiter().validate(source.get().getAccount().getUuid(), destination.getUuid()); - } - - if (isStory) { - rateLimiters.getStoriesLimiter().validate(destination.getUuid()); - } - - final Map messagesByDeviceId = messages.messages().stream() - .collect(Collectors.toMap(IncomingMessage::destinationDeviceId, message -> { - try { - return message.toEnvelope( - destinationIdentifier, - source.map(AuthenticatedDevice::getAccount).orElse(null), - source.map(account -> account.getAuthenticatedDevice().getId()).orElse(null), - messages.timestamp() == 0 ? System.currentTimeMillis() : messages.timestamp(), - isStory, - messages.online(), - messages.urgent(), - reportSpamToken.orElse(null)); - } catch (final IllegalArgumentException e) { - logger.warn("Received bad envelope type {} from {}", message.type(), userAgent); - throw new BadRequestException(e); - } - })); - - final Map registrationIdsByDeviceId = messages.messages().stream() - .collect(Collectors.toMap(IncomingMessage::destinationDeviceId, IncomingMessage::destinationRegistrationId)); - - messageSender.sendMessages(destination, destinationIdentifier, messagesByDeviceId, registrationIdsByDeviceId, userAgent); - - return Response.ok(new SendMessageResponse(needsSync)).build(); - } catch (final MismatchedDevicesException e) { - if (!e.getMismatchedDevices().staleDeviceIds().isEmpty()) { - throw new WebApplicationException(Response.status(410) - .type(MediaType.APPLICATION_JSON) - .entity(new StaleDevicesResponse(e.getMismatchedDevices().staleDeviceIds())) - .build()); - } else { - throw new WebApplicationException(Response.status(409) - .type(MediaType.APPLICATION_JSON_TYPE) - .entity(new MismatchedDevicesResponse(e.getMismatchedDevices().missingDeviceIds(), - e.getMismatchedDevices().extraDeviceIds())) - .build()); - } - } catch (final MessageTooLargeException e) { - throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE); + needsSync = false; + sendSealedSenderMessage(destinationIdentifier, messages, accessKey, groupSendToken != null ? groupSendToken.token() : null, context); } } finally { sample.stop(INDIVIDUAL_MESSAGE_LATENCY_TIMER); } + + return Response.ok(new SendMessageResponse(needsSync)).build(); + } + + private void sendIdentifiedSenderIndividualMessage(final AuthenticatedDevice source, + final ServiceIdentifier destinationIdentifier, + final IncomingMessageList messages, + final ContainerRequestContext context) + throws RateLimitExceededException { + + final Account destination = + accountsManager.getByServiceIdentifier(destinationIdentifier).orElseThrow(NotFoundException::new); + + rateLimiters.getMessagesLimiter().validate(source.getAccount().getUuid(), destination.getUuid()); + + sendIndividualMessage(destination, + destinationIdentifier, + source, + messages, + false, + MessageType.INDIVIDUAL_IDENTIFIED_SENDER, + context); + } + + private void sendSyncMessage(final AuthenticatedDevice source, + final ServiceIdentifier destinationIdentifier, + final IncomingMessageList messages, + final ContainerRequestContext context) + throws RateLimitExceededException { + + if (destinationIdentifier.identityType() == IdentityType.PNI) { + throw new WebApplicationException(Status.FORBIDDEN); + } + + sendIndividualMessage(source.getAccount(), + destinationIdentifier, + source, + messages, + false, + MessageType.SYNC, + context); + } + + private void sendSealedSenderMessage(final ServiceIdentifier destinationIdentifier, + final IncomingMessageList messages, + final Optional accessKey, + @Nullable final GroupSendFullToken groupSendToken, + final ContainerRequestContext context) + throws RateLimitExceededException { + + if (accessKey.isEmpty() && groupSendToken == null) { + throw new WebApplicationException(Status.UNAUTHORIZED); + } + + final Optional maybeDestination = accountsManager.getByServiceIdentifier(destinationIdentifier); + + if (groupSendToken != null) { + checkGroupSendToken(List.of(destinationIdentifier.toLibsignal()), groupSendToken); + } else { + OptionalAccess.verify(Optional.empty(), accessKey, maybeDestination, destinationIdentifier); + } + + final Account destination = maybeDestination.orElseThrow(NotFoundException::new); + + sendIndividualMessage(destination, + destinationIdentifier, + null, + messages, + false, + MessageType.INDIVIDUAL_SEALED_SENDER, + context); + } + + private void sendStoryMessage(final ServiceIdentifier destinationIdentifier, + final IncomingMessageList messages, + final ContainerRequestContext context) + throws RateLimitExceededException { + + // We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify + // authentication is handled by the receiving client, we leak information about whether a destination UUID exists if + // we return any other code (e.g. 404) from these requests. + final Account destination = accountsManager.getByServiceIdentifier(destinationIdentifier).orElseThrow(() -> + new WebApplicationException(Response.ok(new SendMessageResponse(false)).build())); + + rateLimiters.getStoriesLimiter().validate(destination.getUuid()); + + sendIndividualMessage(destination, + destinationIdentifier, + null, + messages, + true, + MessageType.INDIVIDUAL_STORY, + context); + } + + private void sendIndividualMessage(final Account destination, + final ServiceIdentifier destinationIdentifier, + @Nullable final AuthenticatedDevice sender, + final IncomingMessageList messages, + final boolean isStory, + final MessageType messageType, + final ContainerRequestContext context) throws RateLimitExceededException { + + final SpamCheckResult spamCheckResult = + spamChecker.checkForIndividualRecipientSpamHttp(messageType, + context, + Optional.ofNullable(sender), + Optional.of(destination), + destinationIdentifier); + + spamCheckResult.response().ifPresent(response -> { + throw new WebApplicationException(response); + }); + + final String userAgent = context.getHeaderString(HttpHeaders.USER_AGENT); + + try { + final int totalContentLength = + messages.messages().stream().mapToInt(message -> message.content().length).sum(); + + rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength); + } catch (final RateLimitExceededException e) { + messageByteLimitEstimator.add(destinationIdentifier.uuid().toString()); + throw e; + } + + final Map messagesByDeviceId = messages.messages().stream() + .collect(Collectors.toMap(IncomingMessage::destinationDeviceId, message -> { + try { + return message.toEnvelope( + destinationIdentifier, + sender != null ? sender.getAccount() : null, + sender != null ? sender.getAuthenticatedDevice().getId() : null, + messages.timestamp() == 0 ? System.currentTimeMillis() : messages.timestamp(), + isStory, + messages.online(), + messages.urgent(), + spamCheckResult.token().orElse(null)); + } catch (final IllegalArgumentException e) { + logger.warn("Received bad envelope type {} from {}", message.type(), userAgent); + throw new BadRequestException(e); + } + })); + + final Map registrationIdsByDeviceId = messages.messages().stream() + .collect(Collectors.toMap(IncomingMessage::destinationDeviceId, IncomingMessage::destinationRegistrationId)); + + try { + messageSender.sendMessages(destination, + destinationIdentifier, + messagesByDeviceId, + registrationIdsByDeviceId, + userAgent); + } catch (final MismatchedDevicesException e) { + if (!e.getMismatchedDevices().staleDeviceIds().isEmpty()) { + throw new WebApplicationException(Response.status(410) + .type(MediaType.APPLICATION_JSON) + .entity(new StaleDevicesResponse(e.getMismatchedDevices().staleDeviceIds())) + .build()); + } else { + throw new WebApplicationException(Response.status(409) + .type(MediaType.APPLICATION_JSON_TYPE) + .entity(new MismatchedDevicesResponse(e.getMismatchedDevices().missingDeviceIds(), + e.getMismatchedDevices().extraDeviceIds())) + .build()); + } + } catch (final MessageTooLargeException e) { + throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE); + } } @Timed @@ -439,8 +504,6 @@ public class MessageController { @HeaderParam(HeaderUtils.GROUP_SEND_TOKEN) @Nullable GroupSendTokenHeader groupSendToken, - @HeaderParam(HttpHeaders.USER_AGENT) String userAgent, - @Parameter(description="If true, deliver the message only to recipients that are online when it is sent") @QueryParam("online") boolean online, @@ -455,7 +518,7 @@ public class MessageController { @Parameter(description="The sealed-sender multi-recipient message payload as serialized by libsignal") @NotNull SealedSenderMultiRecipientMessage multiRecipientMessage, - @Context ContainerRequestContext context) throws RateLimitExceededException { + @Context ContainerRequestContext context) { if (timestamp < 0 || timestamp > MAX_TIMESTAMP) { throw new BadRequestException("Illegal timestamp"); @@ -465,172 +528,207 @@ public class MessageController { throw new BadRequestException("Recipient list is empty"); } - // Check that the request is well-formed and doesn't contain repeated entries for the same device for the same - // recipient - { - final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID]; - - for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) { - Arrays.fill(usedDeviceIds, false); - - for (final byte deviceId : recipient.getDevices()) { - if (usedDeviceIds[deviceId]) { - throw new BadRequestException(); - } - - usedDeviceIds[deviceId] = true; - } - } - } - final Timer.Sample sample = Timer.start(); try { - final SpamCheckResult spamCheckResult = spamChecker.checkForMultiRecipientSpamHttp( - isStory ? MessageType.MULTI_RECIPIENT_STORY : MessageType.MULTI_RECIPIENT_SEALED_SENDER, - context); + final SendMultiRecipientMessageResponse sendMultiRecipientMessageResponse; - if (spamCheckResult.response().isPresent()) { - return spamCheckResult.response().get(); - } - - if (groupSendToken == null && accessKeys == null && !isStory) { - throw new NotAuthorizedException("A group send endorsement token or unidentified access key is required for non-story messages"); - } - if (groupSendToken != null) { - if (accessKeys != null) { - throw new BadRequestException("Only one of group send endorsement token and unidentified access key may be provided"); - } else if (isStory) { - throw new BadRequestException("Stories should not provide a group send endorsement token"); - } - } - - if (groupSendToken != null) { - // Group send endorsements are checked before we even attempt to resolve any accounts, since - // the lists of service IDs in the envelope are all that we need to check against - checkGroupSendToken(multiRecipientMessage.getRecipients().keySet(), groupSendToken); - } - - // At this point, the caller has at least superficially provided the information needed to send a multi-recipient - // message. Attempt to resolve the destination service identifiers to Signal accounts. - final Map resolvedRecipients = - Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet()) - .flatMap(serviceIdAndRecipient -> { - final ServiceIdentifier serviceIdentifier = - ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey()); - - return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier)) - .flatMap(Mono::justOrEmpty) - .switchIfEmpty(isStory || groupSendToken != null ? Mono.empty() : Mono.error(NotFoundException::new)) - .map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account)); - }, MAX_FETCH_ACCOUNT_CONCURRENCY) - .collectMap(Tuple2::getT1, Tuple2::getT2) - .blockOptional() - .orElse(Collections.emptyMap()); - - // Access keys are checked against the UAK in the resolved accounts, so we have to check after resolving accounts above. - // Group send endorsements are checked earlier; for stories, we don't check permissions at all because only clients check them - if (groupSendToken == null && !isStory) { - checkAccessKeys(accessKeys, multiRecipientMessage, resolvedRecipients); - } - - // We might filter out all the recipients of a story (if none exist). - // In this case there is no error so we should just return 200 now. if (isStory) { - if (resolvedRecipients.isEmpty()) { - return Response.ok(new SendMultiRecipientMessageResponse(List.of())).build(); - } - - try { - CompletableFuture.allOf(resolvedRecipients.values() - .stream() - .map(account -> account.getIdentifier(IdentityType.ACI)) - .map(accountIdentifier -> - rateLimiters.getStoriesLimiter().validateAsync(accountIdentifier).toCompletableFuture()) - .toList() - .toArray(EMPTY_FUTURE_ARRAY)) - .join(); - } catch (final Exception e) { - if (ExceptionUtils.unwrap(e) instanceof RateLimitExceededException rateLimitExceededException) { - throw rateLimitExceededException; - } else { - throw ExceptionUtils.wrap(e); - } - } - } - - try { - if (!resolvedRecipients.isEmpty()) { - messageSender.sendMultiRecipientMessage(multiRecipientMessage, resolvedRecipients, timestamp, isStory, online, isUrgent, userAgent).get(); - } - - final List unresolvedRecipientServiceIds; if (groupSendToken != null) { - unresolvedRecipientServiceIds = multiRecipientMessage.getRecipients().entrySet().stream() - .filter(entry -> !resolvedRecipients.containsKey(entry.getValue())) - .map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey())) - .toList(); - } else { - unresolvedRecipientServiceIds = List.of(); + // Stories require no authentication. We fail requests that provide a groupSendToken, but for historical + // reasons we allow requests to set a combined access key, even though we ignore it + throw new BadRequestException("Group send token not allowed when sending stories"); } - return Response.ok(new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIds)).build(); - } catch (InterruptedException e) { - logger.error("interrupted while delivering multi-recipient messages", e); - throw new InternalServerErrorException("interrupted during delivery"); - } catch (CancellationException e) { - logger.error("cancelled while delivering multi-recipient messages", e); - throw new InternalServerErrorException("delivery cancelled"); - } catch (ExecutionException e) { - logger.error("partial failure while delivering multi-recipient messages", e.getCause()); - throw new InternalServerErrorException("failure during delivery"); - } catch (MultiRecipientMismatchedDevicesException e) { - final List accountMismatchedDevices = - e.getMismatchedDevicesByServiceIdentifier().entrySet().stream() - .filter(entry -> !entry.getValue().missingDeviceIds().isEmpty() || !entry.getValue().extraDeviceIds().isEmpty()) - .map(entry -> new AccountMismatchedDevices(entry.getKey(), - new MismatchedDevicesResponse(entry.getValue().missingDeviceIds(), entry.getValue().extraDeviceIds()))) - .toList(); - - if (!accountMismatchedDevices.isEmpty()) { - return Response - .status(409) - .type(MediaType.APPLICATION_JSON_TYPE) - .entity(accountMismatchedDevices) - .build(); - } - - final List accountStaleDevices = - e.getMismatchedDevicesByServiceIdentifier().entrySet().stream() - .filter(entry -> !entry.getValue().staleDeviceIds().isEmpty()) - .map(entry -> new AccountStaleDevices(entry.getKey(), - new StaleDevicesResponse(entry.getValue().staleDeviceIds()))) - .toList(); - - if (!accountStaleDevices.isEmpty()) { - return Response - .status(410) - .type(MediaType.APPLICATION_JSON) - .entity(accountStaleDevices) - .build(); - } - - throw new RuntimeException(e); - } catch (final MessageTooLargeException e) { - throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE); + sendMultiRecipientMessageResponse = + sendMultiRecipientStoryMessage(multiRecipientMessage, timestamp, online, isUrgent, context); + } else { + sendMultiRecipientMessageResponse = + sendMultiRecipientMessage(multiRecipientMessage, timestamp, online, isUrgent, groupSendToken, accessKeys, + context); } + + return Response.ok(sendMultiRecipientMessageResponse).build(); } finally { sample.stop(MULTI_RECIPIENT_MESSAGE_LATENCY_TIMER); } } - private void checkGroupSendToken( - final Collection recipients, - final @NotNull GroupSendTokenHeader groupSendToken) { + private SendMultiRecipientMessageResponse sendMultiRecipientMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage, + final long timestamp, + final boolean ephemeral, + final boolean urgent, + @Nullable final GroupSendTokenHeader groupSendTokenHeader, + @Nullable final CombinedUnidentifiedSenderAccessKeys combinedUnidentifiedSenderAccessKeys, + final ContainerRequestContext context) { + + // Perform fast, inexpensive checks before attempting to resolve recipients + validateNoDuplicateDevices(multiRecipientMessage); + + if (groupSendTokenHeader == null && combinedUnidentifiedSenderAccessKeys == null) { + throw new NotAuthorizedException("A group send endorsement token or unidentified access key is required for non-story messages"); + } + + if (groupSendTokenHeader != null && combinedUnidentifiedSenderAccessKeys != null) { + throw new BadRequestException("Only one of group send endorsement token and unidentified access key may be provided"); + } + + if (groupSendTokenHeader != null) { + // Group send endorsements are checked before we even attempt to resolve any accounts, since + // the lists of service IDs in the envelope are all that we need to check against + checkGroupSendToken(multiRecipientMessage.getRecipients().keySet(), groupSendTokenHeader); + } + + // At this point, the caller has at least superficially provided the information needed to send a multi-recipient + // message. Attempt to resolve the destination service identifiers to Signal accounts. + final Map resolvedRecipients = + resolveRecipients(multiRecipientMessage, groupSendTokenHeader == null); + + // Access keys are checked against the UAK in the resolved accounts, so we have to check after resolving accounts above. + // Group send endorsements are checked earlier; for stories, we don't check permissions at all because only clients check them + if (groupSendTokenHeader == null) { + checkAccessKeys(combinedUnidentifiedSenderAccessKeys, multiRecipientMessage, resolvedRecipients); + } + + sendMultiRecipientMessage(multiRecipientMessage, + resolvedRecipients, + timestamp, + false, + ephemeral, + urgent, + context); + + final List unresolvedRecipientServiceIdentifiers; + + if (groupSendTokenHeader != null) { + unresolvedRecipientServiceIdentifiers = multiRecipientMessage.getRecipients().entrySet().stream() + .filter(entry -> !resolvedRecipients.containsKey(entry.getValue())) + .map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey())) + .toList(); + } else { + unresolvedRecipientServiceIdentifiers = List.of(); + } + + return new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIdentifiers); + } + + @SuppressWarnings("SameReturnValue") + private SendMultiRecipientMessageResponse sendMultiRecipientStoryMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage, + final long timestamp, + final boolean ephemeral, + final boolean urgent, + final ContainerRequestContext context) { + + // Perform fast, inexpensive checks before attempting to resolve recipients + validateNoDuplicateDevices(multiRecipientMessage); + + // At this point, the caller has at least superficially provided the information needed to send a multi-recipient + // message. Attempt to resolve the destination service identifiers to Signal accounts. + final Map resolvedRecipients = + resolveRecipients(multiRecipientMessage, false); + + // We might filter out all the recipients of a story (if none exist). + // In this case there is no error so we should just return 200 now. + if (resolvedRecipients.isEmpty()) { + return SEND_STORY_RESPONSE; + } + + CompletableFuture.allOf(resolvedRecipients.values() + .stream() + .map(account -> account.getIdentifier(IdentityType.ACI)) + .map(accountIdentifier -> + rateLimiters.getStoriesLimiter().validateAsync(accountIdentifier).toCompletableFuture()) + .toList() + .toArray(EMPTY_FUTURE_ARRAY)) + .join(); + + sendMultiRecipientMessage(multiRecipientMessage, + resolvedRecipients, + timestamp, + true, + ephemeral, + urgent, + context); + + return SEND_STORY_RESPONSE; + } + + private void sendMultiRecipientMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage, + final Map resolvedRecipients, + final long timestamp, + final boolean isStory, + final boolean ephemeral, + final boolean urgent, + final ContainerRequestContext context) { + + final MessageType messageType = + isStory ? MessageType.MULTI_RECIPIENT_STORY : MessageType.MULTI_RECIPIENT_SEALED_SENDER; + + spamChecker.checkForMultiRecipientSpamHttp(messageType, context).response().ifPresent(response -> { + throw new WebApplicationException(response); + }); + try { - final GroupSendFullToken token = groupSendToken.token(); - token.verify(recipients, clock.instant(), GroupSendDerivedKeyPair.forExpiration(token.getExpiration(), serverSecretParams)); - } catch (VerificationFailedException e) { + if (!resolvedRecipients.isEmpty()) { + messageSender.sendMultiRecipientMessage(multiRecipientMessage, + resolvedRecipients, + timestamp, isStory, + ephemeral, + urgent, + context.getHeaderString(HttpHeaders.USER_AGENT)).get(); + } + } catch (final InterruptedException e) { + logger.error("interrupted while delivering multi-recipient messages", e); + throw new InternalServerErrorException("interrupted during delivery"); + } catch (final CancellationException e) { + logger.error("cancelled while delivering multi-recipient messages", e); + throw new InternalServerErrorException("delivery cancelled"); + } catch (final ExecutionException e) { + logger.error("partial failure while delivering multi-recipient messages", e.getCause()); + throw new InternalServerErrorException("failure during delivery"); + } catch (final MessageTooLargeException e) { + throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE); + } catch (final MultiRecipientMismatchedDevicesException e) { + final List accountMismatchedDevices = + e.getMismatchedDevicesByServiceIdentifier().entrySet().stream() + .filter(entry -> !entry.getValue().missingDeviceIds().isEmpty() || !entry.getValue().extraDeviceIds().isEmpty()) + .map(entry -> new AccountMismatchedDevices(entry.getKey(), + new MismatchedDevicesResponse(entry.getValue().missingDeviceIds(), entry.getValue().extraDeviceIds()))) + .toList(); + + if (!accountMismatchedDevices.isEmpty()) { + throw new WebApplicationException(Response + .status(409) + .type(MediaType.APPLICATION_JSON_TYPE) + .entity(accountMismatchedDevices) + .build()); + } + + final List accountStaleDevices = + e.getMismatchedDevicesByServiceIdentifier().entrySet().stream() + .filter(entry -> !entry.getValue().staleDeviceIds().isEmpty()) + .map(entry -> new AccountStaleDevices(entry.getKey(), + new StaleDevicesResponse(entry.getValue().staleDeviceIds()))) + .toList(); + + throw new WebApplicationException(Response + .status(410) + .type(MediaType.APPLICATION_JSON) + .entity(accountStaleDevices) + .build()); + } + } + + private void checkGroupSendToken(final Collection recipients, final GroupSendTokenHeader groupSendToken) { + checkGroupSendToken(recipients, groupSendToken.token()); + } + + private void checkGroupSendToken(final Collection recipients, final GroupSendFullToken groupSendFullToken) { + try { + groupSendFullToken.verify(recipients, + clock.instant(), + GroupSendDerivedKeyPair.forExpiration(groupSendFullToken.getExpiration(), serverSecretParams)); + } catch (final VerificationFailedException e) { throw new NotAuthorizedException(e); } } @@ -811,4 +909,43 @@ public class MessageController { return Response.status(Status.ACCEPTED) .build(); } + + private static void validateNoDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage) { + final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1]; + + for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) { + if (recipient.getDevices().length == 1) { + // A recipient can't have repeated devices if they only have one device + continue; + } + + Arrays.fill(usedDeviceIds, false); + + for (final byte deviceId : recipient.getDevices()) { + if (usedDeviceIds[deviceId]) { + throw new BadRequestException(); + } + + usedDeviceIds[deviceId] = true; + } + } + } + + private Map resolveRecipients(final SealedSenderMultiRecipientMessage multiRecipientMessage, + final boolean throwOnNotFound) { + + return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet()) + .flatMap(serviceIdAndRecipient -> { + final ServiceIdentifier serviceIdentifier = + ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey()); + + return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier)) + .flatMap(Mono::justOrEmpty) + .switchIfEmpty(throwOnNotFound ? Mono.error(NotFoundException::new) : Mono.empty()) + .map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account)); + }, MAX_FETCH_ACCOUNT_CONCURRENCY) + .collectMap(Tuple2::getT1, Tuple2::getT2) + .blockOptional() + .orElse(Collections.emptyMap()); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MismatchedDevices.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MismatchedDevices.java index 77a6a85a2..813de1a19 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MismatchedDevices.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MismatchedDevices.java @@ -8,4 +8,10 @@ package org.whispersystems.textsecuregcm.controllers; import java.util.Set; public record MismatchedDevices(Set missingDeviceIds, Set extraDeviceIds, Set staleDeviceIds) { + + public MismatchedDevices { + if (missingDeviceIds.isEmpty() && extraDeviceIds.isEmpty() && staleDeviceIds.isEmpty()) { + throw new IllegalArgumentException("At least one of missingDevices, extraDevices, or staleDevices must be non-empty"); + } + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MultiRecipientMismatchedDevicesException.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MultiRecipientMismatchedDevicesException.java index 65a054725..1b66c764e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MultiRecipientMismatchedDevicesException.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MultiRecipientMismatchedDevicesException.java @@ -15,6 +15,10 @@ public class MultiRecipientMismatchedDevicesException extends Exception { public MultiRecipientMismatchedDevicesException( final Map mismatchedDevicesByServiceIdentifier) { + if (mismatchedDevicesByServiceIdentifier.isEmpty()) { + throw new IllegalArgumentException("Must provide non-empty map of service identifiers to mismatched devices"); + } + this.mismatchedDevicesByServiceIdentifier = mismatchedDevicesByServiceIdentifier; } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java index 610c31eb8..6a5514cfa 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/MessageControllerTest.java @@ -92,6 +92,7 @@ import org.whispersystems.textsecuregcm.limits.CardinalityEstimator; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; +import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper; import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider; @@ -187,6 +188,7 @@ class MessageControllerTest { .addProvider(AuthHelper.getAuthFilter()) .addProvider(new AuthValueFactoryProvider.Binder<>(AuthenticatedDevice.class)) .addProvider(RateLimitExceededExceptionMapper.class) + .addProvider(CompletionExceptionMapper.class) .addProvider(MultiRecipientMessageProvider.class) .setTestContainerFactory(new GrizzlyWebTestContainerFactory()) .addResource(