Subdivide `MessageController`'s message-sending methods into message-type-specific methods

This commit is contained in:
Jon Chambers 2025-04-08 10:22:07 -04:00 committed by GitHub
parent 58ad647d29
commit 759bf372ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 425 additions and 276 deletions

View File

@ -109,7 +109,6 @@ import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.HeaderUtils;
import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
@ -177,6 +176,9 @@ public class MessageController {
private static final Duration NOTIFY_FOR_REMAINING_MESSAGES_DELAY = Duration.ofMinutes(1);
private static final SendMultiRecipientMessageResponse SEND_STORY_RESPONSE =
new SendMultiRecipientMessageResponse(Collections.emptyList());
public MessageController(
RateLimiters rateLimiters,
CardinalityEstimator messageByteLimitEstimator,
@ -262,144 +264,207 @@ public class MessageController {
@Context final ContainerRequestContext context) throws RateLimitExceededException {
if (source.isEmpty() && accessKey.isEmpty() && groupSendToken == null && !isStory) {
throw new WebApplicationException(Status.UNAUTHORIZED);
}
if (groupSendToken != null) {
if (source.isPresent() || accessKey.isPresent()) {
throw new BadRequestException(
"Group send endorsement tokens should not be combined with other authentication");
throw new BadRequestException("Group send endorsement tokens should not be combined with other authentication");
} else if (isStory) {
throw new BadRequestException("Group send endorsement tokens should not be sent for story messages");
}
}
final Sample sample = Timer.start();
final boolean needsSync;
try {
final boolean isSyncMessage =
source.map(s -> s.getAccount().isIdentifiedBy(destinationIdentifier)).orElse(false);
if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) {
throw new WebApplicationException(Status.FORBIDDEN);
}
final Optional<Account> maybeDestination;
if (!isSyncMessage) {
maybeDestination = accountsManager.getByServiceIdentifier(destinationIdentifier);
} else {
maybeDestination = source.map(AuthenticatedDevice::getAccount);
}
final MessageType messageType;
if (isStory) {
messageType = MessageType.INDIVIDUAL_STORY;
} else if (isSyncMessage) {
messageType = MessageType.SYNC;
needsSync = false;
sendStoryMessage(destinationIdentifier, messages, context);
} else if (source.isPresent()) {
messageType = MessageType.INDIVIDUAL_IDENTIFIED_SENDER;
final AuthenticatedDevice authenticatedDevice = source.get();
if (authenticatedDevice.getAccount().isIdentifiedBy(destinationIdentifier)) {
needsSync = false;
sendSyncMessage(source.get(), destinationIdentifier, messages, context);
} else {
needsSync = authenticatedDevice.getAccount().getDevices().size() > 1;
sendIdentifiedSenderIndividualMessage(authenticatedDevice, destinationIdentifier, messages, context);
}
} else {
messageType = MessageType.INDIVIDUAL_SEALED_SENDER;
}
final SpamCheckResult<Response> spamCheckResult =
spamChecker.checkForIndividualRecipientSpamHttp(messageType, context, source, maybeDestination, destinationIdentifier);
if (spamCheckResult.response().isPresent()) {
return spamCheckResult.response().get();
}
final Optional<byte[]> reportSpamToken = spamCheckResult.token();
try {
final int totalContentLength =
messages.messages().stream().mapToInt(message -> message.content().length).sum();
rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength);
} catch (final RateLimitExceededException e) {
messageByteLimitEstimator.add(destinationIdentifier.uuid().toString());
throw e;
}
try {
if (isStory) {
// Stories will be checked by the client; we bypass access checks here for stories.
} else if (groupSendToken != null) {
checkGroupSendToken(List.of(destinationIdentifier.toLibsignal()), groupSendToken);
if (maybeDestination.isEmpty()) {
throw new NotFoundException();
}
} else {
OptionalAccess.verify(source.map(AuthenticatedDevice::getAccount), accessKey, maybeDestination,
destinationIdentifier);
}
final boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().getDevices().size() > 1;
// We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify
// we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from
// these requests.
if (isStory && maybeDestination.isEmpty()) {
return Response.ok(new SendMessageResponse(needsSync)).build();
}
// if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false
// or else return a 200 response when isStory is true.
final Account destination = maybeDestination.orElseThrow();
if (source.isPresent() && !isSyncMessage) {
rateLimiters.getMessagesLimiter().validate(source.get().getAccount().getUuid(), destination.getUuid());
}
if (isStory) {
rateLimiters.getStoriesLimiter().validate(destination.getUuid());
}
final Map<Byte, Envelope> messagesByDeviceId = messages.messages().stream()
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, message -> {
try {
return message.toEnvelope(
destinationIdentifier,
source.map(AuthenticatedDevice::getAccount).orElse(null),
source.map(account -> account.getAuthenticatedDevice().getId()).orElse(null),
messages.timestamp() == 0 ? System.currentTimeMillis() : messages.timestamp(),
isStory,
messages.online(),
messages.urgent(),
reportSpamToken.orElse(null));
} catch (final IllegalArgumentException e) {
logger.warn("Received bad envelope type {} from {}", message.type(), userAgent);
throw new BadRequestException(e);
}
}));
final Map<Byte, Integer> registrationIdsByDeviceId = messages.messages().stream()
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, IncomingMessage::destinationRegistrationId));
messageSender.sendMessages(destination, destinationIdentifier, messagesByDeviceId, registrationIdsByDeviceId, userAgent);
return Response.ok(new SendMessageResponse(needsSync)).build();
} catch (final MismatchedDevicesException e) {
if (!e.getMismatchedDevices().staleDeviceIds().isEmpty()) {
throw new WebApplicationException(Response.status(410)
.type(MediaType.APPLICATION_JSON)
.entity(new StaleDevicesResponse(e.getMismatchedDevices().staleDeviceIds()))
.build());
} else {
throw new WebApplicationException(Response.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new MismatchedDevicesResponse(e.getMismatchedDevices().missingDeviceIds(),
e.getMismatchedDevices().extraDeviceIds()))
.build());
}
} catch (final MessageTooLargeException e) {
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
needsSync = false;
sendSealedSenderMessage(destinationIdentifier, messages, accessKey, groupSendToken != null ? groupSendToken.token() : null, context);
}
} finally {
sample.stop(INDIVIDUAL_MESSAGE_LATENCY_TIMER);
}
return Response.ok(new SendMessageResponse(needsSync)).build();
}
private void sendIdentifiedSenderIndividualMessage(final AuthenticatedDevice source,
final ServiceIdentifier destinationIdentifier,
final IncomingMessageList messages,
final ContainerRequestContext context)
throws RateLimitExceededException {
final Account destination =
accountsManager.getByServiceIdentifier(destinationIdentifier).orElseThrow(NotFoundException::new);
rateLimiters.getMessagesLimiter().validate(source.getAccount().getUuid(), destination.getUuid());
sendIndividualMessage(destination,
destinationIdentifier,
source,
messages,
false,
MessageType.INDIVIDUAL_IDENTIFIED_SENDER,
context);
}
private void sendSyncMessage(final AuthenticatedDevice source,
final ServiceIdentifier destinationIdentifier,
final IncomingMessageList messages,
final ContainerRequestContext context)
throws RateLimitExceededException {
if (destinationIdentifier.identityType() == IdentityType.PNI) {
throw new WebApplicationException(Status.FORBIDDEN);
}
sendIndividualMessage(source.getAccount(),
destinationIdentifier,
source,
messages,
false,
MessageType.SYNC,
context);
}
private void sendSealedSenderMessage(final ServiceIdentifier destinationIdentifier,
final IncomingMessageList messages,
final Optional<Anonymous> accessKey,
@Nullable final GroupSendFullToken groupSendToken,
final ContainerRequestContext context)
throws RateLimitExceededException {
if (accessKey.isEmpty() && groupSendToken == null) {
throw new WebApplicationException(Status.UNAUTHORIZED);
}
final Optional<Account> maybeDestination = accountsManager.getByServiceIdentifier(destinationIdentifier);
if (groupSendToken != null) {
checkGroupSendToken(List.of(destinationIdentifier.toLibsignal()), groupSendToken);
} else {
OptionalAccess.verify(Optional.empty(), accessKey, maybeDestination, destinationIdentifier);
}
final Account destination = maybeDestination.orElseThrow(NotFoundException::new);
sendIndividualMessage(destination,
destinationIdentifier,
null,
messages,
false,
MessageType.INDIVIDUAL_SEALED_SENDER,
context);
}
private void sendStoryMessage(final ServiceIdentifier destinationIdentifier,
final IncomingMessageList messages,
final ContainerRequestContext context)
throws RateLimitExceededException {
// We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify
// authentication is handled by the receiving client, we leak information about whether a destination UUID exists if
// we return any other code (e.g. 404) from these requests.
final Account destination = accountsManager.getByServiceIdentifier(destinationIdentifier).orElseThrow(() ->
new WebApplicationException(Response.ok(new SendMessageResponse(false)).build()));
rateLimiters.getStoriesLimiter().validate(destination.getUuid());
sendIndividualMessage(destination,
destinationIdentifier,
null,
messages,
true,
MessageType.INDIVIDUAL_STORY,
context);
}
private void sendIndividualMessage(final Account destination,
final ServiceIdentifier destinationIdentifier,
@Nullable final AuthenticatedDevice sender,
final IncomingMessageList messages,
final boolean isStory,
final MessageType messageType,
final ContainerRequestContext context) throws RateLimitExceededException {
final SpamCheckResult<Response> spamCheckResult =
spamChecker.checkForIndividualRecipientSpamHttp(messageType,
context,
Optional.ofNullable(sender),
Optional.of(destination),
destinationIdentifier);
spamCheckResult.response().ifPresent(response -> {
throw new WebApplicationException(response);
});
final String userAgent = context.getHeaderString(HttpHeaders.USER_AGENT);
try {
final int totalContentLength =
messages.messages().stream().mapToInt(message -> message.content().length).sum();
rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength);
} catch (final RateLimitExceededException e) {
messageByteLimitEstimator.add(destinationIdentifier.uuid().toString());
throw e;
}
final Map<Byte, Envelope> messagesByDeviceId = messages.messages().stream()
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, message -> {
try {
return message.toEnvelope(
destinationIdentifier,
sender != null ? sender.getAccount() : null,
sender != null ? sender.getAuthenticatedDevice().getId() : null,
messages.timestamp() == 0 ? System.currentTimeMillis() : messages.timestamp(),
isStory,
messages.online(),
messages.urgent(),
spamCheckResult.token().orElse(null));
} catch (final IllegalArgumentException e) {
logger.warn("Received bad envelope type {} from {}", message.type(), userAgent);
throw new BadRequestException(e);
}
}));
final Map<Byte, Integer> registrationIdsByDeviceId = messages.messages().stream()
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, IncomingMessage::destinationRegistrationId));
try {
messageSender.sendMessages(destination,
destinationIdentifier,
messagesByDeviceId,
registrationIdsByDeviceId,
userAgent);
} catch (final MismatchedDevicesException e) {
if (!e.getMismatchedDevices().staleDeviceIds().isEmpty()) {
throw new WebApplicationException(Response.status(410)
.type(MediaType.APPLICATION_JSON)
.entity(new StaleDevicesResponse(e.getMismatchedDevices().staleDeviceIds()))
.build());
} else {
throw new WebApplicationException(Response.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new MismatchedDevicesResponse(e.getMismatchedDevices().missingDeviceIds(),
e.getMismatchedDevices().extraDeviceIds()))
.build());
}
} catch (final MessageTooLargeException e) {
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
}
}
@Timed
@ -439,8 +504,6 @@ public class MessageController {
@HeaderParam(HeaderUtils.GROUP_SEND_TOKEN)
@Nullable GroupSendTokenHeader groupSendToken,
@HeaderParam(HttpHeaders.USER_AGENT) String userAgent,
@Parameter(description="If true, deliver the message only to recipients that are online when it is sent")
@QueryParam("online") boolean online,
@ -455,7 +518,7 @@ public class MessageController {
@Parameter(description="The sealed-sender multi-recipient message payload as serialized by libsignal")
@NotNull SealedSenderMultiRecipientMessage multiRecipientMessage,
@Context ContainerRequestContext context) throws RateLimitExceededException {
@Context ContainerRequestContext context) {
if (timestamp < 0 || timestamp > MAX_TIMESTAMP) {
throw new BadRequestException("Illegal timestamp");
@ -465,172 +528,207 @@ public class MessageController {
throw new BadRequestException("Recipient list is empty");
}
// Check that the request is well-formed and doesn't contain repeated entries for the same device for the same
// recipient
{
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID];
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
Arrays.fill(usedDeviceIds, false);
for (final byte deviceId : recipient.getDevices()) {
if (usedDeviceIds[deviceId]) {
throw new BadRequestException();
}
usedDeviceIds[deviceId] = true;
}
}
}
final Timer.Sample sample = Timer.start();
try {
final SpamCheckResult<Response> spamCheckResult = spamChecker.checkForMultiRecipientSpamHttp(
isStory ? MessageType.MULTI_RECIPIENT_STORY : MessageType.MULTI_RECIPIENT_SEALED_SENDER,
context);
final SendMultiRecipientMessageResponse sendMultiRecipientMessageResponse;
if (spamCheckResult.response().isPresent()) {
return spamCheckResult.response().get();
}
if (groupSendToken == null && accessKeys == null && !isStory) {
throw new NotAuthorizedException("A group send endorsement token or unidentified access key is required for non-story messages");
}
if (groupSendToken != null) {
if (accessKeys != null) {
throw new BadRequestException("Only one of group send endorsement token and unidentified access key may be provided");
} else if (isStory) {
throw new BadRequestException("Stories should not provide a group send endorsement token");
}
}
if (groupSendToken != null) {
// Group send endorsements are checked before we even attempt to resolve any accounts, since
// the lists of service IDs in the envelope are all that we need to check against
checkGroupSendToken(multiRecipientMessage.getRecipients().keySet(), groupSendToken);
}
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
// message. Attempt to resolve the destination service identifiers to Signal accounts.
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
.flatMap(serviceIdAndRecipient -> {
final ServiceIdentifier serviceIdentifier =
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
.flatMap(Mono::justOrEmpty)
.switchIfEmpty(isStory || groupSendToken != null ? Mono.empty() : Mono.error(NotFoundException::new))
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
}, MAX_FETCH_ACCOUNT_CONCURRENCY)
.collectMap(Tuple2::getT1, Tuple2::getT2)
.blockOptional()
.orElse(Collections.emptyMap());
// Access keys are checked against the UAK in the resolved accounts, so we have to check after resolving accounts above.
// Group send endorsements are checked earlier; for stories, we don't check permissions at all because only clients check them
if (groupSendToken == null && !isStory) {
checkAccessKeys(accessKeys, multiRecipientMessage, resolvedRecipients);
}
// We might filter out all the recipients of a story (if none exist).
// In this case there is no error so we should just return 200 now.
if (isStory) {
if (resolvedRecipients.isEmpty()) {
return Response.ok(new SendMultiRecipientMessageResponse(List.of())).build();
}
try {
CompletableFuture.allOf(resolvedRecipients.values()
.stream()
.map(account -> account.getIdentifier(IdentityType.ACI))
.map(accountIdentifier ->
rateLimiters.getStoriesLimiter().validateAsync(accountIdentifier).toCompletableFuture())
.toList()
.toArray(EMPTY_FUTURE_ARRAY))
.join();
} catch (final Exception e) {
if (ExceptionUtils.unwrap(e) instanceof RateLimitExceededException rateLimitExceededException) {
throw rateLimitExceededException;
} else {
throw ExceptionUtils.wrap(e);
}
}
}
try {
if (!resolvedRecipients.isEmpty()) {
messageSender.sendMultiRecipientMessage(multiRecipientMessage, resolvedRecipients, timestamp, isStory, online, isUrgent, userAgent).get();
}
final List<ServiceIdentifier> unresolvedRecipientServiceIds;
if (groupSendToken != null) {
unresolvedRecipientServiceIds = multiRecipientMessage.getRecipients().entrySet().stream()
.filter(entry -> !resolvedRecipients.containsKey(entry.getValue()))
.map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey()))
.toList();
} else {
unresolvedRecipientServiceIds = List.of();
// Stories require no authentication. We fail requests that provide a groupSendToken, but for historical
// reasons we allow requests to set a combined access key, even though we ignore it
throw new BadRequestException("Group send token not allowed when sending stories");
}
return Response.ok(new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIds)).build();
} catch (InterruptedException e) {
logger.error("interrupted while delivering multi-recipient messages", e);
throw new InternalServerErrorException("interrupted during delivery");
} catch (CancellationException e) {
logger.error("cancelled while delivering multi-recipient messages", e);
throw new InternalServerErrorException("delivery cancelled");
} catch (ExecutionException e) {
logger.error("partial failure while delivering multi-recipient messages", e.getCause());
throw new InternalServerErrorException("failure during delivery");
} catch (MultiRecipientMismatchedDevicesException e) {
final List<AccountMismatchedDevices> accountMismatchedDevices =
e.getMismatchedDevicesByServiceIdentifier().entrySet().stream()
.filter(entry -> !entry.getValue().missingDeviceIds().isEmpty() || !entry.getValue().extraDeviceIds().isEmpty())
.map(entry -> new AccountMismatchedDevices(entry.getKey(),
new MismatchedDevicesResponse(entry.getValue().missingDeviceIds(), entry.getValue().extraDeviceIds())))
.toList();
if (!accountMismatchedDevices.isEmpty()) {
return Response
.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(accountMismatchedDevices)
.build();
}
final List<AccountStaleDevices> accountStaleDevices =
e.getMismatchedDevicesByServiceIdentifier().entrySet().stream()
.filter(entry -> !entry.getValue().staleDeviceIds().isEmpty())
.map(entry -> new AccountStaleDevices(entry.getKey(),
new StaleDevicesResponse(entry.getValue().staleDeviceIds())))
.toList();
if (!accountStaleDevices.isEmpty()) {
return Response
.status(410)
.type(MediaType.APPLICATION_JSON)
.entity(accountStaleDevices)
.build();
}
throw new RuntimeException(e);
} catch (final MessageTooLargeException e) {
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
sendMultiRecipientMessageResponse =
sendMultiRecipientStoryMessage(multiRecipientMessage, timestamp, online, isUrgent, context);
} else {
sendMultiRecipientMessageResponse =
sendMultiRecipientMessage(multiRecipientMessage, timestamp, online, isUrgent, groupSendToken, accessKeys,
context);
}
return Response.ok(sendMultiRecipientMessageResponse).build();
} finally {
sample.stop(MULTI_RECIPIENT_MESSAGE_LATENCY_TIMER);
}
}
private void checkGroupSendToken(
final Collection<ServiceId> recipients,
final @NotNull GroupSendTokenHeader groupSendToken) {
private SendMultiRecipientMessageResponse sendMultiRecipientMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage,
final long timestamp,
final boolean ephemeral,
final boolean urgent,
@Nullable final GroupSendTokenHeader groupSendTokenHeader,
@Nullable final CombinedUnidentifiedSenderAccessKeys combinedUnidentifiedSenderAccessKeys,
final ContainerRequestContext context) {
// Perform fast, inexpensive checks before attempting to resolve recipients
validateNoDuplicateDevices(multiRecipientMessage);
if (groupSendTokenHeader == null && combinedUnidentifiedSenderAccessKeys == null) {
throw new NotAuthorizedException("A group send endorsement token or unidentified access key is required for non-story messages");
}
if (groupSendTokenHeader != null && combinedUnidentifiedSenderAccessKeys != null) {
throw new BadRequestException("Only one of group send endorsement token and unidentified access key may be provided");
}
if (groupSendTokenHeader != null) {
// Group send endorsements are checked before we even attempt to resolve any accounts, since
// the lists of service IDs in the envelope are all that we need to check against
checkGroupSendToken(multiRecipientMessage.getRecipients().keySet(), groupSendTokenHeader);
}
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
// message. Attempt to resolve the destination service identifiers to Signal accounts.
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
resolveRecipients(multiRecipientMessage, groupSendTokenHeader == null);
// Access keys are checked against the UAK in the resolved accounts, so we have to check after resolving accounts above.
// Group send endorsements are checked earlier; for stories, we don't check permissions at all because only clients check them
if (groupSendTokenHeader == null) {
checkAccessKeys(combinedUnidentifiedSenderAccessKeys, multiRecipientMessage, resolvedRecipients);
}
sendMultiRecipientMessage(multiRecipientMessage,
resolvedRecipients,
timestamp,
false,
ephemeral,
urgent,
context);
final List<ServiceIdentifier> unresolvedRecipientServiceIdentifiers;
if (groupSendTokenHeader != null) {
unresolvedRecipientServiceIdentifiers = multiRecipientMessage.getRecipients().entrySet().stream()
.filter(entry -> !resolvedRecipients.containsKey(entry.getValue()))
.map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey()))
.toList();
} else {
unresolvedRecipientServiceIdentifiers = List.of();
}
return new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIdentifiers);
}
@SuppressWarnings("SameReturnValue")
private SendMultiRecipientMessageResponse sendMultiRecipientStoryMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage,
final long timestamp,
final boolean ephemeral,
final boolean urgent,
final ContainerRequestContext context) {
// Perform fast, inexpensive checks before attempting to resolve recipients
validateNoDuplicateDevices(multiRecipientMessage);
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
// message. Attempt to resolve the destination service identifiers to Signal accounts.
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
resolveRecipients(multiRecipientMessage, false);
// We might filter out all the recipients of a story (if none exist).
// In this case there is no error so we should just return 200 now.
if (resolvedRecipients.isEmpty()) {
return SEND_STORY_RESPONSE;
}
CompletableFuture.allOf(resolvedRecipients.values()
.stream()
.map(account -> account.getIdentifier(IdentityType.ACI))
.map(accountIdentifier ->
rateLimiters.getStoriesLimiter().validateAsync(accountIdentifier).toCompletableFuture())
.toList()
.toArray(EMPTY_FUTURE_ARRAY))
.join();
sendMultiRecipientMessage(multiRecipientMessage,
resolvedRecipients,
timestamp,
true,
ephemeral,
urgent,
context);
return SEND_STORY_RESPONSE;
}
private void sendMultiRecipientMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage,
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients,
final long timestamp,
final boolean isStory,
final boolean ephemeral,
final boolean urgent,
final ContainerRequestContext context) {
final MessageType messageType =
isStory ? MessageType.MULTI_RECIPIENT_STORY : MessageType.MULTI_RECIPIENT_SEALED_SENDER;
spamChecker.checkForMultiRecipientSpamHttp(messageType, context).response().ifPresent(response -> {
throw new WebApplicationException(response);
});
try {
final GroupSendFullToken token = groupSendToken.token();
token.verify(recipients, clock.instant(), GroupSendDerivedKeyPair.forExpiration(token.getExpiration(), serverSecretParams));
} catch (VerificationFailedException e) {
if (!resolvedRecipients.isEmpty()) {
messageSender.sendMultiRecipientMessage(multiRecipientMessage,
resolvedRecipients,
timestamp, isStory,
ephemeral,
urgent,
context.getHeaderString(HttpHeaders.USER_AGENT)).get();
}
} catch (final InterruptedException e) {
logger.error("interrupted while delivering multi-recipient messages", e);
throw new InternalServerErrorException("interrupted during delivery");
} catch (final CancellationException e) {
logger.error("cancelled while delivering multi-recipient messages", e);
throw new InternalServerErrorException("delivery cancelled");
} catch (final ExecutionException e) {
logger.error("partial failure while delivering multi-recipient messages", e.getCause());
throw new InternalServerErrorException("failure during delivery");
} catch (final MessageTooLargeException e) {
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
} catch (final MultiRecipientMismatchedDevicesException e) {
final List<AccountMismatchedDevices> accountMismatchedDevices =
e.getMismatchedDevicesByServiceIdentifier().entrySet().stream()
.filter(entry -> !entry.getValue().missingDeviceIds().isEmpty() || !entry.getValue().extraDeviceIds().isEmpty())
.map(entry -> new AccountMismatchedDevices(entry.getKey(),
new MismatchedDevicesResponse(entry.getValue().missingDeviceIds(), entry.getValue().extraDeviceIds())))
.toList();
if (!accountMismatchedDevices.isEmpty()) {
throw new WebApplicationException(Response
.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(accountMismatchedDevices)
.build());
}
final List<AccountStaleDevices> accountStaleDevices =
e.getMismatchedDevicesByServiceIdentifier().entrySet().stream()
.filter(entry -> !entry.getValue().staleDeviceIds().isEmpty())
.map(entry -> new AccountStaleDevices(entry.getKey(),
new StaleDevicesResponse(entry.getValue().staleDeviceIds())))
.toList();
throw new WebApplicationException(Response
.status(410)
.type(MediaType.APPLICATION_JSON)
.entity(accountStaleDevices)
.build());
}
}
private void checkGroupSendToken(final Collection<ServiceId> recipients, final GroupSendTokenHeader groupSendToken) {
checkGroupSendToken(recipients, groupSendToken.token());
}
private void checkGroupSendToken(final Collection<ServiceId> recipients, final GroupSendFullToken groupSendFullToken) {
try {
groupSendFullToken.verify(recipients,
clock.instant(),
GroupSendDerivedKeyPair.forExpiration(groupSendFullToken.getExpiration(), serverSecretParams));
} catch (final VerificationFailedException e) {
throw new NotAuthorizedException(e);
}
}
@ -811,4 +909,43 @@ public class MessageController {
return Response.status(Status.ACCEPTED)
.build();
}
private static void validateNoDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage) {
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1];
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
if (recipient.getDevices().length == 1) {
// A recipient can't have repeated devices if they only have one device
continue;
}
Arrays.fill(usedDeviceIds, false);
for (final byte deviceId : recipient.getDevices()) {
if (usedDeviceIds[deviceId]) {
throw new BadRequestException();
}
usedDeviceIds[deviceId] = true;
}
}
}
private Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolveRecipients(final SealedSenderMultiRecipientMessage multiRecipientMessage,
final boolean throwOnNotFound) {
return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
.flatMap(serviceIdAndRecipient -> {
final ServiceIdentifier serviceIdentifier =
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
.flatMap(Mono::justOrEmpty)
.switchIfEmpty(throwOnNotFound ? Mono.error(NotFoundException::new) : Mono.empty())
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
}, MAX_FETCH_ACCOUNT_CONCURRENCY)
.collectMap(Tuple2::getT1, Tuple2::getT2)
.blockOptional()
.orElse(Collections.emptyMap());
}
}

View File

@ -8,4 +8,10 @@ package org.whispersystems.textsecuregcm.controllers;
import java.util.Set;
public record MismatchedDevices(Set<Byte> missingDeviceIds, Set<Byte> extraDeviceIds, Set<Byte> staleDeviceIds) {
public MismatchedDevices {
if (missingDeviceIds.isEmpty() && extraDeviceIds.isEmpty() && staleDeviceIds.isEmpty()) {
throw new IllegalArgumentException("At least one of missingDevices, extraDevices, or staleDevices must be non-empty");
}
}
}

View File

@ -15,6 +15,10 @@ public class MultiRecipientMismatchedDevicesException extends Exception {
public MultiRecipientMismatchedDevicesException(
final Map<ServiceIdentifier, MismatchedDevices> mismatchedDevicesByServiceIdentifier) {
if (mismatchedDevicesByServiceIdentifier.isEmpty()) {
throw new IllegalArgumentException("Must provide non-empty map of service identifiers to mismatched devices");
}
this.mismatchedDevicesByServiceIdentifier = mismatchedDevicesByServiceIdentifier;
}

View File

@ -92,6 +92,7 @@ import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper;
import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
@ -187,6 +188,7 @@ class MessageControllerTest {
.addProvider(AuthHelper.getAuthFilter())
.addProvider(new AuthValueFactoryProvider.Binder<>(AuthenticatedDevice.class))
.addProvider(RateLimitExceededExceptionMapper.class)
.addProvider(CompletionExceptionMapper.class)
.addProvider(MultiRecipientMessageProvider.class)
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
.addResource(