Extract common message-sending methods into a shared utility class

This commit is contained in:
Jon Chambers 2025-04-08 17:39:45 -04:00 committed by GitHub
parent caa81b4885
commit 8d8a2a5583
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 150 additions and 115 deletions

View File

@ -43,7 +43,6 @@ import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import java.time.Clock;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -96,6 +95,7 @@ import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
import org.whispersystems.textsecuregcm.push.MessageSender;
import org.whispersystems.textsecuregcm.push.MessageTooLargeException;
import org.whispersystems.textsecuregcm.push.MessageUtil;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
@ -105,7 +105,6 @@ import org.whispersystems.textsecuregcm.spam.SpamChecker;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
@ -114,11 +113,7 @@ import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
import org.whispersystems.websocket.WebsocketHeaders;
import org.whispersystems.websocket.auth.ReadOnly;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Path("/v1/messages")
@ -145,8 +140,6 @@ public class MessageController {
private final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor;
private final Clock clock;
private static final int MAX_FETCH_ACCOUNT_CONCURRENCY = 8;
private static final CompletableFuture<?>[] EMPTY_FUTURE_ARRAY = new CompletableFuture<?>[0];
private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes");
@ -563,7 +556,9 @@ public class MessageController {
final ContainerRequestContext context) {
// Perform fast, inexpensive checks before attempting to resolve recipients
validateNoDuplicateDevices(multiRecipientMessage);
if (MessageUtil.hasDuplicateDevices(multiRecipientMessage)) {
throw new BadRequestException("Multi-recipient message contains duplicate recipient");
}
if (groupSendTokenHeader == null && combinedUnidentifiedSenderAccessKeys == null) {
throw new NotAuthorizedException("A group send endorsement token or unidentified access key is required for non-story messages");
@ -582,7 +577,14 @@ public class MessageController {
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
// message. Attempt to resolve the destination service identifiers to Signal accounts.
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
resolveRecipients(multiRecipientMessage, groupSendTokenHeader == null);
MessageUtil.resolveRecipients(accountsManager, multiRecipientMessage);
final List<ServiceIdentifier> unresolvedRecipientServiceIdentifiers =
MessageUtil.getUnresolvedRecipients(multiRecipientMessage, resolvedRecipients);
if (groupSendTokenHeader == null && !unresolvedRecipientServiceIdentifiers.isEmpty()) {
throw new NotFoundException();
}
// Access keys are checked against the UAK in the resolved accounts, so we have to check after resolving accounts above.
// Group send endorsements are checked earlier; for stories, we don't check permissions at all because only clients check them
@ -598,17 +600,6 @@ public class MessageController {
urgent,
context);
final List<ServiceIdentifier> unresolvedRecipientServiceIdentifiers;
if (groupSendTokenHeader != null) {
unresolvedRecipientServiceIdentifiers = multiRecipientMessage.getRecipients().entrySet().stream()
.filter(entry -> !resolvedRecipients.containsKey(entry.getValue()))
.map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey()))
.toList();
} else {
unresolvedRecipientServiceIdentifiers = List.of();
}
return new SendMultiRecipientMessageResponse(unresolvedRecipientServiceIdentifiers);
}
@ -620,12 +611,14 @@ public class MessageController {
final ContainerRequestContext context) {
// Perform fast, inexpensive checks before attempting to resolve recipients
validateNoDuplicateDevices(multiRecipientMessage);
if (MessageUtil.hasDuplicateDevices(multiRecipientMessage)) {
throw new BadRequestException("Multi-recipient message contains duplicate recipient");
}
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
// message. Attempt to resolve the destination service identifiers to Signal accounts.
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
resolveRecipients(multiRecipientMessage, false);
MessageUtil.resolveRecipients(accountsManager, multiRecipientMessage);
// We might filter out all the recipients of a story (if none exist).
// In this case there is no error so we should just return 200 now.
@ -909,43 +902,4 @@ public class MessageController {
return Response.status(Status.ACCEPTED)
.build();
}
private static void validateNoDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage) {
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1];
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
if (recipient.getDevices().length == 1) {
// A recipient can't have repeated devices if they only have one device
continue;
}
Arrays.fill(usedDeviceIds, false);
for (final byte deviceId : recipient.getDevices()) {
if (usedDeviceIds[deviceId]) {
throw new BadRequestException();
}
usedDeviceIds[deviceId] = true;
}
}
}
private Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolveRecipients(final SealedSenderMultiRecipientMessage multiRecipientMessage,
final boolean throwOnNotFound) {
return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
.flatMap(serviceIdAndRecipient -> {
final ServiceIdentifier serviceIdentifier =
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
.flatMap(Mono::justOrEmpty)
.switchIfEmpty(throwOnNotFound ? Mono.error(NotFoundException::new) : Mono.empty())
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
}, MAX_FETCH_ACCOUNT_CONCURRENCY)
.collectMap(Tuple2::getT1, Tuple2::getT2)
.blockOptional()
.orElse(Collections.emptyMap());
}
}

View File

@ -8,8 +8,6 @@ package org.whispersystems.textsecuregcm.grpc;
import io.grpc.Status;
import io.grpc.StatusException;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@ -37,17 +35,13 @@ import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.push.MessageSender;
import org.whispersystems.textsecuregcm.push.MessageTooLargeException;
import org.whispersystems.textsecuregcm.push.MessageUtil;
import org.whispersystems.textsecuregcm.spam.GrpcResponse;
import org.whispersystems.textsecuregcm.spam.MessageType;
import org.whispersystems.textsecuregcm.spam.SpamCheckResult;
import org.whispersystems.textsecuregcm.spam.SpamChecker;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.MessagesAnonymousImplBase {
@ -61,8 +55,6 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
private static final SendMessageResponse SEND_MESSAGE_SUCCESS_RESPONSE = SendMessageResponse.newBuilder().build();
private static final int MAX_FETCH_ACCOUNT_CONCURRENCY = 8;
public MessagesAnonymousGrpcService(final AccountsManager accountsManager,
final RateLimiters rateLimiters,
final MessageSender messageSender,
@ -256,7 +248,7 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
// message. Attempt to resolve the destination service identifiers to Signal accounts.
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
resolveRecipients(multiRecipientMessage);
MessageUtil.resolveRecipients(accountsManager, multiRecipientMessage);
try {
messageSender.sendMultiRecipientMessage(multiRecipientMessage,
@ -269,10 +261,7 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
final SendMultiRecipientMessageResponse.Builder responseBuilder = SendMultiRecipientMessageResponse.newBuilder();
multiRecipientMessage.getRecipients().entrySet()
.stream()
.filter(entry -> !resolvedRecipients.containsKey(entry.getValue()))
.map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey()))
MessageUtil.getUnresolvedRecipients(multiRecipientMessage, resolvedRecipients).stream()
.map(ServiceIdentifierUtil::toGrpcServiceIdentifier)
.forEach(responseBuilder::addUnresolvedRecipients);
@ -308,49 +297,13 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
// Check that the request is well-formed and doesn't contain repeated entries for the same device for the same
// recipient
validateNoDuplicateDevices(multiRecipientMessage);
if (MessageUtil.hasDuplicateDevices(multiRecipientMessage)) {
throw Status.INVALID_ARGUMENT.withDescription("Multi-recipient message contains duplicate recipient").asException();
}
return multiRecipientMessage;
}
private void validateNoDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage)
throws StatusException {
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1];
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
if (recipient.getDevices().length == 1) {
// A recipient can't have repeated devices if they only have one device
continue;
}
Arrays.fill(usedDeviceIds, false);
for (final byte deviceId : recipient.getDevices()) {
if (usedDeviceIds[deviceId]) {
throw Status.INVALID_ARGUMENT.withDescription("Request contains repeated device entries").asException();
}
usedDeviceIds[deviceId] = true;
}
}
}
private Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolveRecipients(final SealedSenderMultiRecipientMessage multiRecipientMessage) {
return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
.flatMap(serviceIdAndRecipient -> {
final ServiceIdentifier serviceIdentifier =
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
.flatMap(Mono::justOrEmpty)
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
}, MAX_FETCH_ACCOUNT_CONCURRENCY)
.collectMap(Tuple2::getT1, Tuple2::getT2)
.blockOptional()
.orElse(Collections.emptyMap());
}
private MismatchedDevices buildMismatchedDevices(final ServiceIdentifier serviceIdentifier,
org.whispersystems.textsecuregcm.controllers.MismatchedDevices mismatchedDevices) {

View File

@ -0,0 +1,128 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.push;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class MessageUtil {
public static final int DEFAULT_MAX_FETCH_ACCOUNT_CONCURRENCY = 8;
private MessageUtil() {
}
/**
* Finds account records for all recipients named in the given multi-recipient manager. Note that the returned map
* of recipients to account records will omit entries for recipients that could not be resolved to active accounts;
* callers that require full resolution should check for a missing entries and take appropriate action.
*
* @param accountsManager the {@code AccountsManager} instance to use to find account records
* @param multiRecipientMessage the message for which to resolve recipients
*
* @return a map of recipients to account records
*
* @see #getUnresolvedRecipients(SealedSenderMultiRecipientMessage, Map)
*/
public static Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolveRecipients(
final AccountsManager accountsManager,
final SealedSenderMultiRecipientMessage multiRecipientMessage) {
return resolveRecipients(accountsManager, multiRecipientMessage, DEFAULT_MAX_FETCH_ACCOUNT_CONCURRENCY);
}
/**
* Finds account records for all recipients named in the given multi-recipient manager. Note that the returned map
* of recipients to account records will omit entries for recipients that could not be resolved to active accounts;
* callers that require full resolution should check for a missing entries and take appropriate action.
*
* @param accountsManager the {@code AccountsManager} instance to use to find account records
* @param multiRecipientMessage the message for which to resolve recipients
* @param maxFetchAccountConcurrency the maximum number of concurrent account-retrieval operations
*
* @return a map of recipients to account records
*
* @see #getUnresolvedRecipients(SealedSenderMultiRecipientMessage, Map)
*/
public static Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolveRecipients(
final AccountsManager accountsManager,
final SealedSenderMultiRecipientMessage multiRecipientMessage,
final int maxFetchAccountConcurrency) {
return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
.flatMap(serviceIdAndRecipient -> {
final ServiceIdentifier serviceIdentifier =
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
.flatMap(Mono::justOrEmpty)
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
}, maxFetchAccountConcurrency)
.collectMap(Tuple2::getT1, Tuple2::getT2)
.blockOptional()
.orElse(Collections.emptyMap());
}
/**
* Returns a list of recipients missing from the map of resolved recipients for a multi-recipient message.
*
* @param multiRecipientMessage the multi-recipient message
* @param resolvedRecipients the map of resolved recipients to check for missing entries
*
* @return a list of {@code ServiceIdentifiers} belonging to multi-recipient message recipients that are not present
* in the given map of {@code resolvedRecipients}
*/
public static List<ServiceIdentifier> getUnresolvedRecipients(
final SealedSenderMultiRecipientMessage multiRecipientMessage,
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients) {
return multiRecipientMessage.getRecipients().entrySet().stream()
.filter(entry -> !resolvedRecipients.containsKey(entry.getValue()))
.map(entry -> ServiceIdentifier.fromLibsignal(entry.getKey()))
.toList();
}
/**
* Checks if a multi-recipient message contains duplicate recipients.
*
* @param multiRecipientMessage the message to check for duplicate recipients
*
* @return {@code true} if the message contains duplicate recipients or {@code false} otherwise
*/
public static boolean hasDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage) {
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1];
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
if (recipient.getDevices().length == 1) {
// A recipient can't have repeated devices if they only have one device
continue;
}
Arrays.fill(usedDeviceIds, false);
for (final byte deviceId : recipient.getDevices()) {
if (usedDeviceIds[deviceId]) {
return true;
}
usedDeviceIds[deviceId] = true;
}
}
return false;
}
}