Subdivide `MessageController`'s message-sending methods into message-type-specific methods
This commit is contained in:
parent
58ad647d29
commit
02a5a6b55f
|
@ -109,7 +109,6 @@ import org.whispersystems.textsecuregcm.storage.Device;
|
|||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import org.whispersystems.textsecuregcm.util.HeaderUtils;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
|
||||
|
@ -177,6 +176,9 @@ public class MessageController {
|
|||
|
||||
private static final Duration NOTIFY_FOR_REMAINING_MESSAGES_DELAY = Duration.ofMinutes(1);
|
||||
|
||||
private static final SendMultiRecipientMessageResponse SEND_STORY_RESPONSE =
|
||||
new SendMultiRecipientMessageResponse(Collections.emptyList());
|
||||
|
||||
public MessageController(
|
||||
RateLimiters rateLimiters,
|
||||
CardinalityEstimator messageByteLimitEstimator,
|
||||
|
@ -262,144 +264,207 @@ public class MessageController {
|
|||
|
||||
@Context final ContainerRequestContext context) throws RateLimitExceededException {
|
||||
|
||||
if (source.isEmpty() && accessKey.isEmpty() && groupSendToken == null && !isStory) {
|
||||
throw new WebApplicationException(Status.UNAUTHORIZED);
|
||||
}
|
||||
|
||||
if (groupSendToken != null) {
|
||||
if (source.isPresent() || accessKey.isPresent()) {
|
||||
throw new BadRequestException(
|
||||
"Group send endorsement tokens should not be combined with other authentication");
|
||||
throw new BadRequestException("Group send endorsement tokens should not be combined with other authentication");
|
||||
} else if (isStory) {
|
||||
throw new BadRequestException("Group send endorsement tokens should not be sent for story messages");
|
||||
}
|
||||
}
|
||||
|
||||
final Sample sample = Timer.start();
|
||||
final boolean needsSync;
|
||||
|
||||
try {
|
||||
final boolean isSyncMessage =
|
||||
source.map(s -> s.getAccount().isIdentifiedBy(destinationIdentifier)).orElse(false);
|
||||
|
||||
if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) {
|
||||
throw new WebApplicationException(Status.FORBIDDEN);
|
||||
}
|
||||
|
||||
final Optional<Account> maybeDestination;
|
||||
if (!isSyncMessage) {
|
||||
maybeDestination = accountsManager.getByServiceIdentifier(destinationIdentifier);
|
||||
} else {
|
||||
maybeDestination = source.map(AuthenticatedDevice::getAccount);
|
||||
}
|
||||
|
||||
final MessageType messageType;
|
||||
|
||||
if (isStory) {
|
||||
messageType = MessageType.INDIVIDUAL_STORY;
|
||||
} else if (isSyncMessage) {
|
||||
messageType = MessageType.SYNC;
|
||||
needsSync = false;
|
||||
sendStoryMessage(destinationIdentifier, messages, context);
|
||||
} else if (source.isPresent()) {
|
||||
messageType = MessageType.INDIVIDUAL_IDENTIFIED_SENDER;
|
||||
final AuthenticatedDevice authenticatedDevice = source.get();
|
||||
|
||||
if (authenticatedDevice.getAccount().isIdentifiedBy(destinationIdentifier)) {
|
||||
needsSync = false;
|
||||
sendSyncMessage(source.get(), destinationIdentifier, messages, context);
|
||||
} else {
|
||||
needsSync = authenticatedDevice.getAccount().getDevices().size() > 1;
|
||||
sendIdentifiedSenderIndividualMessage(authenticatedDevice, destinationIdentifier, messages, context);
|
||||
}
|
||||
} else {
|
||||
messageType = MessageType.INDIVIDUAL_SEALED_SENDER;
|
||||
}
|
||||
|
||||
final SpamCheckResult<Response> spamCheckResult =
|
||||
spamChecker.checkForIndividualRecipientSpamHttp(messageType, context, source, maybeDestination, destinationIdentifier);
|
||||
|
||||
if (spamCheckResult.response().isPresent()) {
|
||||
return spamCheckResult.response().get();
|
||||
}
|
||||
|
||||
final Optional<byte[]> reportSpamToken = spamCheckResult.token();
|
||||
|
||||
try {
|
||||
final int totalContentLength =
|
||||
messages.messages().stream().mapToInt(message -> message.content().length).sum();
|
||||
|
||||
rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength);
|
||||
} catch (final RateLimitExceededException e) {
|
||||
messageByteLimitEstimator.add(destinationIdentifier.uuid().toString());
|
||||
throw e;
|
||||
}
|
||||
|
||||
try {
|
||||
if (isStory) {
|
||||
// Stories will be checked by the client; we bypass access checks here for stories.
|
||||
} else if (groupSendToken != null) {
|
||||
checkGroupSendToken(List.of(destinationIdentifier.toLibsignal()), groupSendToken);
|
||||
if (maybeDestination.isEmpty()) {
|
||||
throw new NotFoundException();
|
||||
}
|
||||
} else {
|
||||
OptionalAccess.verify(source.map(AuthenticatedDevice::getAccount), accessKey, maybeDestination,
|
||||
destinationIdentifier);
|
||||
}
|
||||
|
||||
final boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().getDevices().size() > 1;
|
||||
|
||||
// We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify
|
||||
// we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from
|
||||
// these requests.
|
||||
if (isStory && maybeDestination.isEmpty()) {
|
||||
return Response.ok(new SendMessageResponse(needsSync)).build();
|
||||
}
|
||||
|
||||
// if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false
|
||||
// or else return a 200 response when isStory is true.
|
||||
final Account destination = maybeDestination.orElseThrow();
|
||||
|
||||
if (source.isPresent() && !isSyncMessage) {
|
||||
rateLimiters.getMessagesLimiter().validate(source.get().getAccount().getUuid(), destination.getUuid());
|
||||
}
|
||||
|
||||
if (isStory) {
|
||||
rateLimiters.getStoriesLimiter().validate(destination.getUuid());
|
||||
}
|
||||
|
||||
final Map<Byte, Envelope> messagesByDeviceId = messages.messages().stream()
|
||||
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, message -> {
|
||||
try {
|
||||
return message.toEnvelope(
|
||||
destinationIdentifier,
|
||||
source.map(AuthenticatedDevice::getAccount).orElse(null),
|
||||
source.map(account -> account.getAuthenticatedDevice().getId()).orElse(null),
|
||||
messages.timestamp() == 0 ? System.currentTimeMillis() : messages.timestamp(),
|
||||
isStory,
|
||||
messages.online(),
|
||||
messages.urgent(),
|
||||
reportSpamToken.orElse(null));
|
||||
} catch (final IllegalArgumentException e) {
|
||||
logger.warn("Received bad envelope type {} from {}", message.type(), userAgent);
|
||||
throw new BadRequestException(e);
|
||||
}
|
||||
}));
|
||||
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId = messages.messages().stream()
|
||||
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, IncomingMessage::destinationRegistrationId));
|
||||
|
||||
messageSender.sendMessages(destination, destinationIdentifier, messagesByDeviceId, registrationIdsByDeviceId, userAgent);
|
||||
|
||||
return Response.ok(new SendMessageResponse(needsSync)).build();
|
||||
} catch (final MismatchedDevicesException e) {
|
||||
if (!e.getMismatchedDevices().staleDeviceIds().isEmpty()) {
|
||||
throw new WebApplicationException(Response.status(410)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.entity(new StaleDevicesResponse(e.getMismatchedDevices().staleDeviceIds()))
|
||||
.build());
|
||||
} else {
|
||||
throw new WebApplicationException(Response.status(409)
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.entity(new MismatchedDevicesResponse(e.getMismatchedDevices().missingDeviceIds(),
|
||||
e.getMismatchedDevices().extraDeviceIds()))
|
||||
.build());
|
||||
}
|
||||
} catch (final MessageTooLargeException e) {
|
||||
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
needsSync = false;
|
||||
sendSealedSenderMessage(destinationIdentifier, messages, accessKey, groupSendToken != null ? groupSendToken.token() : null, context);
|
||||
}
|
||||
} finally {
|
||||
sample.stop(INDIVIDUAL_MESSAGE_LATENCY_TIMER);
|
||||
}
|
||||
|
||||
return Response.ok(new SendMessageResponse(needsSync)).build();
|
||||
}
|
||||
|
||||
private void sendIdentifiedSenderIndividualMessage(final AuthenticatedDevice source,
|
||||
final ServiceIdentifier destinationIdentifier,
|
||||
final IncomingMessageList messages,
|
||||
final ContainerRequestContext context)
|
||||
throws RateLimitExceededException {
|
||||
|
||||
final Account destination =
|
||||
accountsManager.getByServiceIdentifier(destinationIdentifier).orElseThrow(NotFoundException::new);
|
||||
|
||||
rateLimiters.getMessagesLimiter().validate(source.getAccount().getUuid(), destination.getUuid());
|
||||
|
||||
sendIndividualMessage(destination,
|
||||
destinationIdentifier,
|
||||
source,
|
||||
messages,
|
||||
false,
|
||||
MessageType.INDIVIDUAL_IDENTIFIED_SENDER,
|
||||
context);
|
||||
}
|
||||
|
||||
private void sendSyncMessage(final AuthenticatedDevice source,
|
||||
final ServiceIdentifier destinationIdentifier,
|
||||
final IncomingMessageList messages,
|
||||
final ContainerRequestContext context)
|
||||
throws RateLimitExceededException {
|
||||
|
||||
if (destinationIdentifier.identityType() == IdentityType.PNI) {
|
||||
throw new WebApplicationException(Status.FORBIDDEN);
|
||||
}
|
||||
|
||||
sendIndividualMessage(source.getAccount(),
|
||||
destinationIdentifier,
|
||||
source,
|
||||
messages,
|
||||
false,
|
||||
MessageType.SYNC,
|
||||
context);
|
||||
}
|
||||
|
||||
private void sendSealedSenderMessage(final ServiceIdentifier destinationIdentifier,
|
||||
final IncomingMessageList messages,
|
||||
final Optional<Anonymous> accessKey,
|
||||
@Nullable final GroupSendFullToken groupSendToken,
|
||||
final ContainerRequestContext context)
|
||||
throws RateLimitExceededException {
|
||||
|
||||
if (accessKey.isEmpty() && groupSendToken == null) {
|
||||
throw new WebApplicationException(Status.UNAUTHORIZED);
|
||||
}
|
||||
|
||||
final Optional<Account> maybeDestination = accountsManager.getByServiceIdentifier(destinationIdentifier);
|
||||
|
||||
if (groupSendToken != null) {
|
||||
checkGroupSendToken(List.of(destinationIdentifier.toLibsignal()), groupSendToken);
|
||||
} else {
|
||||
OptionalAccess.verify(Optional.empty(), accessKey, maybeDestination, destinationIdentifier);
|
||||
}
|
||||
|
||||
final Account destination = maybeDestination.orElseThrow(NotFoundException::new);
|
||||
|
||||
sendIndividualMessage(destination,
|
||||
destinationIdentifier,
|
||||
null,
|
||||
messages,
|
||||
false,
|
||||
MessageType.INDIVIDUAL_SEALED_SENDER,
|
||||
context);
|
||||
}
|
||||
|
||||
private void sendStoryMessage(final ServiceIdentifier destinationIdentifier,
|
||||
final IncomingMessageList messages,
|
||||
final ContainerRequestContext context)
|
||||
throws RateLimitExceededException {
|
||||
|
||||
// We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify
|
||||
// authentication is handled by the receiving client, we leak information about whether a destination UUID exists if
|
||||
// we return any other code (e.g. 404) from these requests.
|
||||
final Account destination = accountsManager.getByServiceIdentifier(destinationIdentifier).orElseThrow(() ->
|
||||
new WebApplicationException(Response.ok(new SendMessageResponse(false)).build()));
|
||||
|
||||
rateLimiters.getStoriesLimiter().validate(destination.getUuid());
|
||||
|
||||
sendIndividualMessage(destination,
|
||||
destinationIdentifier,
|
||||
null,
|
||||
messages,
|
||||
true,
|
||||
MessageType.INDIVIDUAL_STORY,
|
||||
context);
|
||||
}
|
||||
|
||||
private void sendIndividualMessage(final Account destination,
|
||||
final ServiceIdentifier destinationIdentifier,
|
||||
@Nullable final AuthenticatedDevice sender,
|
||||
final IncomingMessageList messages,
|
||||
final boolean isStory,
|
||||
final MessageType messageType,
|
||||
final ContainerRequestContext context) throws RateLimitExceededException {
|
||||
|
||||
final SpamCheckResult<Response> spamCheckResult =
|
||||
spamChecker.checkForIndividualRecipientSpamHttp(messageType,
|
||||
context,
|
||||
Optional.ofNullable(sender),
|
||||
Optional.of(destination),
|
||||
destinationIdentifier);
|
||||
|
||||
spamCheckResult.response().ifPresent(response -> {
|
||||
throw new WebApplicationException(response);
|
||||
});
|
||||
|
||||
final String userAgent = context.getHeaderString(HttpHeaders.USER_AGENT);
|
||||
|
||||
try {
|
||||
final int totalContentLength =
|
||||
messages.messages().stream().mapToInt(message -> message.content().length).sum();
|
||||
|
||||
rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength);
|
||||
} catch (final RateLimitExceededException e) {
|
||||
messageByteLimitEstimator.add(destinationIdentifier.uuid().toString());
|
||||
throw e;
|
||||
}
|
||||
|
||||
final Map<Byte, Envelope> messagesByDeviceId = messages.messages().stream()
|
||||
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, message -> {
|
||||
try {
|
||||
return message.toEnvelope(
|
||||
destinationIdentifier,
|
||||
sender != null ? sender.getAccount() : null,
|
||||
sender != null ? sender.getAuthenticatedDevice().getId() : null,
|
||||
messages.timestamp() == 0 ? System.currentTimeMillis() : messages.timestamp(),
|
||||
isStory,
|
||||
messages.online(),
|
||||
messages.urgent(),
|
||||
spamCheckResult.token().orElse(null));
|
||||
} catch (final IllegalArgumentException e) {
|
||||
logger.warn("Received bad envelope type {} from {}", message.type(), userAgent);
|
||||
throw new BadRequestException(e);
|
||||
}
|
||||
}));
|
||||
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId = messages.messages().stream()
|
||||
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, IncomingMessage::destinationRegistrationId));
|
||||
|
||||
try {
|
||||
messageSender.sendMessages(destination,
|
||||
destinationIdentifier,
|
||||
messagesByDeviceId,
|
||||
registrationIdsByDeviceId,
|
||||
userAgent);
|
||||
} catch (final MismatchedDevicesException e) {
|
||||
if (!e.getMismatchedDevices().staleDeviceIds().isEmpty()) {
|
||||
throw new WebApplicationException(Response.status(410)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.entity(new StaleDevicesResponse(e.getMismatchedDevices().staleDeviceIds()))
|
||||
.build());
|
||||
} else {
|
||||
throw new WebApplicationException(Response.status(409)
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.entity(new MismatchedDevicesResponse(e.getMismatchedDevices().missingDeviceIds(),
|
||||
e.getMismatchedDevices().extraDeviceIds()))
|
||||
.build());
|
||||
}
|
||||
} catch (final MessageTooLargeException e) {
|
||||
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
}
|
||||
}
|
||||
|
||||
@Timed
|
||||
|
@ -439,8 +504,6 @@ public class MessageController {
|
|||
@HeaderParam(HeaderUtils.GROUP_SEND_TOKEN)
|
||||
@Nullable GroupSendTokenHeader groupSendToken,
|
||||
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) String userAgent,
|
||||
|
||||
@Parameter(description="If true, deliver the message only to recipients that are online when it is sent")
|
||||
@QueryParam("online") boolean online,
|
||||
|
||||
|
@ -455,7 +518,7 @@ public class MessageController {
|
|||
@Parameter(description="The sealed-sender multi-recipient message payload as serialized by libsignal")
|
||||
@NotNull SealedSenderMultiRecipientMessage multiRecipientMessage,
|
||||
|
||||
@Context ContainerRequestContext context) throws RateLimitExceededException {
|
||||
@Context ContainerRequestContext context) {
|
||||
|
||||
if (timestamp < 0 || timestamp > MAX_TIMESTAMP) {
|
||||
throw new BadRequestException("Illegal timestamp");
|
||||
|
@ -465,172 +528,207 @@ public class MessageController {
|
|||
throw new BadRequestException("Recipient list is empty");
|
||||
}
|
||||
|
||||
// Check that the request is well-formed and doesn't contain repeated entries for the same device for the same
|
||||
// recipient
|
||||
{
|
||||
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID];
|
||||
|
||||
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
|
||||
Arrays.fill(usedDeviceIds, false);
|
||||
|
||||
for (final byte deviceId : recipient.getDevices()) {
|
||||
if (usedDeviceIds[deviceId]) {
|
||||
throw new BadRequestException();
|
||||
}
|
||||
|
||||
usedDeviceIds[deviceId] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
try {
|
||||
final SpamCheckResult<Response> spamCheckResult = spamChecker.checkForMultiRecipientSpamHttp(
|
||||
isStory ? MessageType.MULTI_RECIPIENT_STORY : MessageType.MULTI_RECIPIENT_SEALED_SENDER,
|
||||
context);
|
||||
final SendMultiRecipientMessageResponse sendMultiRecipientMessageResponse;
|
||||
|
||||
if (spamCheckResult.response().isPresent()) {
|
||||
return spamCheckResult.response().get();
|
||||
}
|
||||
|
||||
if (groupSendToken == null && accessKeys == null && !isStory) {
|
||||
throw new NotAuthorizedException("A group send endorsement token or unidentified access key is required for non-story messages");
|
||||
}
|
||||
if (groupSendToken != null) {
|
||||
if (accessKeys != null) {
|
||||
throw new BadRequestException("Only one of group send endorsement token and unidentified access key may be provided");
|
||||
} else if (isStory) {
|
||||
throw new BadRequestException("Stories should not provide a group send endorsement token");
|
||||
}
|
||||
}
|
||||
|
||||
if (groupSendToken != null) {
|
||||
// Group send endorsements are checked before we even attempt to resolve any accounts, since
|
||||
// the lists of service IDs in the envelope are all that we need to check against
|
||||
checkGroupSendToken(multiRecipientMessage.getRecipients().keySet(), groupSendToken);
|
||||
}
|
||||
|
||||
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
|
||||
// message. Attempt to resolve the destination service identifiers to Signal accounts.
|
||||
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
|
||||
Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
|
||||
.flatMap(serviceIdAndRecipient -> {
|
||||
final ServiceIdentifier serviceIdentifier =
|
||||
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
|
||||
|
||||
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
|
||||
.flatMap(Mono::justOrEmpty)
|
||||
.switchIfEmpty(isStory || groupSendToken != null ? Mono.empty() : Mono.error(NotFoundException::new))
|
||||
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
|
||||
}, MAX_FETCH_ACCOUNT_CONCURRENCY)
|
||||
.collectMap(Tuple2::getT1, Tuple2::getT2)
|
||||
.blockOptional()
|
||||
.orElse(Collections.emptyMap());
|
||||
|
||||
// Access keys are checked against the UAK in the resolved accounts, so we have to check after resolving accounts above.
|
||||
// Group send endorsements are checked earlier; for stories, we don't check permissions at all because only clients check them
|
||||
if (groupSendToken == null && !isStory) {
|
||||
checkAccessKeys(accessKeys, multiRecipientMessage, resolvedRecipients);
|
||||
}
|
||||
|
||||
// We might filter out all the recipients of a story (if none exist).
|
||||
// In this case there is no error so we should just return 200 now.
|
||||
if (isStory) {
|
||||
if (resolvedRecipients.isEmpty()) {
|
||||
return Response.ok(new SendMultiRecipientMessageResponse(List.of())).build();
|
||||
}
|
||||
|
||||
try {
|
||||
CompletableFuture.allOf(resolvedRecipients.values()
|
||||
.stream()
|
||||
.map(account -> account.getIdentifier(IdentityType.ACI))
|
||||
.map(accountIdentifier ->
|
||||
rateLimiters.getStoriesLimiter().validateAsync(accountIdentifier).toCompletableFuture())
|
||||
.toList()
|
||||
.toArray(EMPTY_FUTURE_ARRAY))
|
||||
.join();
|
||||
} catch (final Exception e) {
|
||||
if (ExceptionUtils.unwrap(e) instanceof RateLimitExceededException rateLimitExceededException) {
|
||||
throw rateLimitExceededException;
|
||||
} else {
|
||||
throw ExceptionUtils.wrap(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
if (!resolvedRecipients.isEmpty()) {
|
||||
messageSender.sendMultiRecipientMessage(multiRecipientMessage, resolvedRecipients, timestamp, isStory, online, isUrgent, userAgent).get();
|
||||
}
|
||||
|
||||
final List<ServiceIdentifier> unresolvedRecipientServiceIds;
|
||||
if (groupSendToken != null) {
|
||||
unresolvedRecipientServiceIds = multiRecipientMessage.getRecipients().entrySet().stream()
|
||||
.filter(entry -> !resolvedRecipients.containsKey(entry.getValue()))
|
||||
.map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey()))
|
||||
.toList();
|
||||
} else {
|
||||
unresolvedRecipientServiceIds = List.of();
|
||||
// Stories require no authentication. We fail requests that provide a groupSendToken, but for historical
|
||||
// reasons we allow requests to set a combined access key, even though we ignore it
|
||||
throw new BadRequestException("Group send token not allowed when sending stories");
|
||||
}
|
||||
|
||||
return Response.ok(new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIds)).build();
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("interrupted while delivering multi-recipient messages", e);
|
||||
throw new InternalServerErrorException("interrupted during delivery");
|
||||
} catch (CancellationException e) {
|
||||
logger.error("cancelled while delivering multi-recipient messages", e);
|
||||
throw new InternalServerErrorException("delivery cancelled");
|
||||
} catch (ExecutionException e) {
|
||||
logger.error("partial failure while delivering multi-recipient messages", e.getCause());
|
||||
throw new InternalServerErrorException("failure during delivery");
|
||||
} catch (MultiRecipientMismatchedDevicesException e) {
|
||||
final List<AccountMismatchedDevices> accountMismatchedDevices =
|
||||
e.getMismatchedDevicesByServiceIdentifier().entrySet().stream()
|
||||
.filter(entry -> !entry.getValue().missingDeviceIds().isEmpty() || !entry.getValue().extraDeviceIds().isEmpty())
|
||||
.map(entry -> new AccountMismatchedDevices(entry.getKey(),
|
||||
new MismatchedDevicesResponse(entry.getValue().missingDeviceIds(), entry.getValue().extraDeviceIds())))
|
||||
.toList();
|
||||
|
||||
if (!accountMismatchedDevices.isEmpty()) {
|
||||
return Response
|
||||
.status(409)
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.entity(accountMismatchedDevices)
|
||||
.build();
|
||||
}
|
||||
|
||||
final List<AccountStaleDevices> accountStaleDevices =
|
||||
e.getMismatchedDevicesByServiceIdentifier().entrySet().stream()
|
||||
.filter(entry -> !entry.getValue().staleDeviceIds().isEmpty())
|
||||
.map(entry -> new AccountStaleDevices(entry.getKey(),
|
||||
new StaleDevicesResponse(entry.getValue().staleDeviceIds())))
|
||||
.toList();
|
||||
|
||||
if (!accountStaleDevices.isEmpty()) {
|
||||
return Response
|
||||
.status(410)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.entity(accountStaleDevices)
|
||||
.build();
|
||||
}
|
||||
|
||||
throw new RuntimeException(e);
|
||||
} catch (final MessageTooLargeException e) {
|
||||
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
sendMultiRecipientMessageResponse =
|
||||
sendMultiRecipientStoryMessage(multiRecipientMessage, timestamp, online, isUrgent, context);
|
||||
} else {
|
||||
sendMultiRecipientMessageResponse =
|
||||
sendMultiRecipientMessage(multiRecipientMessage, timestamp, online, isUrgent, groupSendToken, accessKeys,
|
||||
context);
|
||||
}
|
||||
|
||||
return Response.ok(sendMultiRecipientMessageResponse).build();
|
||||
} finally {
|
||||
sample.stop(MULTI_RECIPIENT_MESSAGE_LATENCY_TIMER);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkGroupSendToken(
|
||||
final Collection<ServiceId> recipients,
|
||||
final @NotNull GroupSendTokenHeader groupSendToken) {
|
||||
private SendMultiRecipientMessageResponse sendMultiRecipientMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage,
|
||||
final long timestamp,
|
||||
final boolean ephemeral,
|
||||
final boolean urgent,
|
||||
@Nullable final GroupSendTokenHeader groupSendTokenHeader,
|
||||
@Nullable final CombinedUnidentifiedSenderAccessKeys combinedUnidentifiedSenderAccessKeys,
|
||||
final ContainerRequestContext context) {
|
||||
|
||||
// Perform fast, inexpensive checks before attempting to resolve recipients
|
||||
validateNoDuplicateDevices(multiRecipientMessage);
|
||||
|
||||
if (groupSendTokenHeader == null && combinedUnidentifiedSenderAccessKeys == null) {
|
||||
throw new NotAuthorizedException("A group send endorsement token or unidentified access key is required for non-story messages");
|
||||
}
|
||||
|
||||
if (groupSendTokenHeader != null && combinedUnidentifiedSenderAccessKeys != null) {
|
||||
throw new BadRequestException("Only one of group send endorsement token and unidentified access key may be provided");
|
||||
}
|
||||
|
||||
if (groupSendTokenHeader != null) {
|
||||
// Group send endorsements are checked before we even attempt to resolve any accounts, since
|
||||
// the lists of service IDs in the envelope are all that we need to check against
|
||||
checkGroupSendToken(multiRecipientMessage.getRecipients().keySet(), groupSendTokenHeader);
|
||||
}
|
||||
|
||||
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
|
||||
// message. Attempt to resolve the destination service identifiers to Signal accounts.
|
||||
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
|
||||
resolveRecipients(multiRecipientMessage, groupSendTokenHeader == null);
|
||||
|
||||
// Access keys are checked against the UAK in the resolved accounts, so we have to check after resolving accounts above.
|
||||
// Group send endorsements are checked earlier; for stories, we don't check permissions at all because only clients check them
|
||||
if (groupSendTokenHeader == null) {
|
||||
checkAccessKeys(combinedUnidentifiedSenderAccessKeys, multiRecipientMessage, resolvedRecipients);
|
||||
}
|
||||
|
||||
sendMultiRecipientMessage(multiRecipientMessage,
|
||||
resolvedRecipients,
|
||||
timestamp,
|
||||
false,
|
||||
ephemeral,
|
||||
urgent,
|
||||
context);
|
||||
|
||||
final List<ServiceIdentifier> unresolvedRecipientServiceIdentifiers;
|
||||
|
||||
if (groupSendTokenHeader != null) {
|
||||
unresolvedRecipientServiceIdentifiers = multiRecipientMessage.getRecipients().entrySet().stream()
|
||||
.filter(entry -> !resolvedRecipients.containsKey(entry.getValue()))
|
||||
.map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey()))
|
||||
.toList();
|
||||
} else {
|
||||
unresolvedRecipientServiceIdentifiers = List.of();
|
||||
}
|
||||
|
||||
return new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIdentifiers);
|
||||
}
|
||||
|
||||
@SuppressWarnings("SameReturnValue")
|
||||
private SendMultiRecipientMessageResponse sendMultiRecipientStoryMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage,
|
||||
final long timestamp,
|
||||
final boolean ephemeral,
|
||||
final boolean urgent,
|
||||
final ContainerRequestContext context) {
|
||||
|
||||
// Perform fast, inexpensive checks before attempting to resolve recipients
|
||||
validateNoDuplicateDevices(multiRecipientMessage);
|
||||
|
||||
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
|
||||
// message. Attempt to resolve the destination service identifiers to Signal accounts.
|
||||
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
|
||||
resolveRecipients(multiRecipientMessage, false);
|
||||
|
||||
// We might filter out all the recipients of a story (if none exist).
|
||||
// In this case there is no error so we should just return 200 now.
|
||||
if (resolvedRecipients.isEmpty()) {
|
||||
return SEND_STORY_RESPONSE;
|
||||
}
|
||||
|
||||
CompletableFuture.allOf(resolvedRecipients.values()
|
||||
.stream()
|
||||
.map(account -> account.getIdentifier(IdentityType.ACI))
|
||||
.map(accountIdentifier ->
|
||||
rateLimiters.getStoriesLimiter().validateAsync(accountIdentifier).toCompletableFuture())
|
||||
.toList()
|
||||
.toArray(EMPTY_FUTURE_ARRAY))
|
||||
.join();
|
||||
|
||||
sendMultiRecipientMessage(multiRecipientMessage,
|
||||
resolvedRecipients,
|
||||
timestamp,
|
||||
true,
|
||||
ephemeral,
|
||||
urgent,
|
||||
context);
|
||||
|
||||
return SEND_STORY_RESPONSE;
|
||||
}
|
||||
|
||||
private void sendMultiRecipientMessage(final SealedSenderMultiRecipientMessage multiRecipientMessage,
|
||||
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients,
|
||||
final long timestamp,
|
||||
final boolean isStory,
|
||||
final boolean ephemeral,
|
||||
final boolean urgent,
|
||||
final ContainerRequestContext context) {
|
||||
|
||||
final MessageType messageType =
|
||||
isStory ? MessageType.MULTI_RECIPIENT_STORY : MessageType.MULTI_RECIPIENT_SEALED_SENDER;
|
||||
|
||||
spamChecker.checkForMultiRecipientSpamHttp(messageType, context).response().ifPresent(response -> {
|
||||
throw new WebApplicationException(response);
|
||||
});
|
||||
|
||||
try {
|
||||
final GroupSendFullToken token = groupSendToken.token();
|
||||
token.verify(recipients, clock.instant(), GroupSendDerivedKeyPair.forExpiration(token.getExpiration(), serverSecretParams));
|
||||
} catch (VerificationFailedException e) {
|
||||
if (!resolvedRecipients.isEmpty()) {
|
||||
messageSender.sendMultiRecipientMessage(multiRecipientMessage,
|
||||
resolvedRecipients,
|
||||
timestamp, isStory,
|
||||
ephemeral,
|
||||
urgent,
|
||||
context.getHeaderString(HttpHeaders.USER_AGENT)).get();
|
||||
}
|
||||
} catch (final InterruptedException e) {
|
||||
logger.error("interrupted while delivering multi-recipient messages", e);
|
||||
throw new InternalServerErrorException("interrupted during delivery");
|
||||
} catch (final CancellationException e) {
|
||||
logger.error("cancelled while delivering multi-recipient messages", e);
|
||||
throw new InternalServerErrorException("delivery cancelled");
|
||||
} catch (final ExecutionException e) {
|
||||
logger.error("partial failure while delivering multi-recipient messages", e.getCause());
|
||||
throw new InternalServerErrorException("failure during delivery");
|
||||
} catch (final MessageTooLargeException e) {
|
||||
throw new WebApplicationException(Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
} catch (final MultiRecipientMismatchedDevicesException e) {
|
||||
final List<AccountMismatchedDevices> accountMismatchedDevices =
|
||||
e.getMismatchedDevicesByServiceIdentifier().entrySet().stream()
|
||||
.filter(entry -> !entry.getValue().missingDeviceIds().isEmpty() || !entry.getValue().extraDeviceIds().isEmpty())
|
||||
.map(entry -> new AccountMismatchedDevices(entry.getKey(),
|
||||
new MismatchedDevicesResponse(entry.getValue().missingDeviceIds(), entry.getValue().extraDeviceIds())))
|
||||
.toList();
|
||||
|
||||
if (!accountMismatchedDevices.isEmpty()) {
|
||||
throw new WebApplicationException(Response
|
||||
.status(409)
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.entity(accountMismatchedDevices)
|
||||
.build());
|
||||
}
|
||||
|
||||
final List<AccountStaleDevices> accountStaleDevices =
|
||||
e.getMismatchedDevicesByServiceIdentifier().entrySet().stream()
|
||||
.filter(entry -> !entry.getValue().staleDeviceIds().isEmpty())
|
||||
.map(entry -> new AccountStaleDevices(entry.getKey(),
|
||||
new StaleDevicesResponse(entry.getValue().staleDeviceIds())))
|
||||
.toList();
|
||||
|
||||
throw new WebApplicationException(Response
|
||||
.status(410)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.entity(accountStaleDevices)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
private void checkGroupSendToken(final Collection<ServiceId> recipients, final GroupSendTokenHeader groupSendToken) {
|
||||
checkGroupSendToken(recipients, groupSendToken.token());
|
||||
}
|
||||
|
||||
private void checkGroupSendToken(final Collection<ServiceId> recipients, final GroupSendFullToken groupSendFullToken) {
|
||||
try {
|
||||
groupSendFullToken.verify(recipients,
|
||||
clock.instant(),
|
||||
GroupSendDerivedKeyPair.forExpiration(groupSendFullToken.getExpiration(), serverSecretParams));
|
||||
} catch (final VerificationFailedException e) {
|
||||
throw new NotAuthorizedException(e);
|
||||
}
|
||||
}
|
||||
|
@ -811,4 +909,43 @@ public class MessageController {
|
|||
return Response.status(Status.ACCEPTED)
|
||||
.build();
|
||||
}
|
||||
|
||||
private static void validateNoDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage) {
|
||||
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1];
|
||||
|
||||
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
|
||||
if (recipient.getDevices().length == 1) {
|
||||
// A recipient can't have repeated devices if they only have one device
|
||||
continue;
|
||||
}
|
||||
|
||||
Arrays.fill(usedDeviceIds, false);
|
||||
|
||||
for (final byte deviceId : recipient.getDevices()) {
|
||||
if (usedDeviceIds[deviceId]) {
|
||||
throw new BadRequestException();
|
||||
}
|
||||
|
||||
usedDeviceIds[deviceId] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolveRecipients(final SealedSenderMultiRecipientMessage multiRecipientMessage,
|
||||
final boolean throwOnNotFound) {
|
||||
|
||||
return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
|
||||
.flatMap(serviceIdAndRecipient -> {
|
||||
final ServiceIdentifier serviceIdentifier =
|
||||
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
|
||||
|
||||
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
|
||||
.flatMap(Mono::justOrEmpty)
|
||||
.switchIfEmpty(throwOnNotFound ? Mono.error(NotFoundException::new) : Mono.empty())
|
||||
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
|
||||
}, MAX_FETCH_ACCOUNT_CONCURRENCY)
|
||||
.collectMap(Tuple2::getT1, Tuple2::getT2)
|
||||
.blockOptional()
|
||||
.orElse(Collections.emptyMap());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,4 +8,10 @@ package org.whispersystems.textsecuregcm.controllers;
|
|||
import java.util.Set;
|
||||
|
||||
public record MismatchedDevices(Set<Byte> missingDeviceIds, Set<Byte> extraDeviceIds, Set<Byte> staleDeviceIds) {
|
||||
|
||||
public MismatchedDevices {
|
||||
if (missingDeviceIds.isEmpty() && extraDeviceIds.isEmpty() && staleDeviceIds.isEmpty()) {
|
||||
throw new IllegalArgumentException("At least one of missingDevices, extraDevices, or staleDevices must be non-empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,10 @@ public class MultiRecipientMismatchedDevicesException extends Exception {
|
|||
public MultiRecipientMismatchedDevicesException(
|
||||
final Map<ServiceIdentifier, MismatchedDevices> mismatchedDevicesByServiceIdentifier) {
|
||||
|
||||
if (mismatchedDevicesByServiceIdentifier.isEmpty()) {
|
||||
throw new IllegalArgumentException("Must provide non-empty map of service identifiers to mismatched devices");
|
||||
}
|
||||
|
||||
this.mismatchedDevicesByServiceIdentifier = mismatchedDevicesByServiceIdentifier;
|
||||
}
|
||||
|
||||
|
|
|
@ -92,6 +92,7 @@ import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
|
|||
import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiter;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||
import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper;
|
||||
import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper;
|
||||
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
|
||||
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
|
||||
|
@ -187,6 +188,7 @@ class MessageControllerTest {
|
|||
.addProvider(AuthHelper.getAuthFilter())
|
||||
.addProvider(new AuthValueFactoryProvider.Binder<>(AuthenticatedDevice.class))
|
||||
.addProvider(RateLimitExceededExceptionMapper.class)
|
||||
.addProvider(CompletionExceptionMapper.class)
|
||||
.addProvider(MultiRecipientMessageProvider.class)
|
||||
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
|
||||
.addResource(
|
||||
|
|
Loading…
Reference in New Issue