Execute send multi-recipient message loop in parallel

This commit is contained in:
Ehren Kret 2021-08-12 10:25:09 -05:00
parent de59aa099d
commit f7f870fe62
1 changed files with 10 additions and 9 deletions

View File

@ -17,6 +17,7 @@ import com.google.protobuf.ByteString;
import io.dropwizard.auth.Auth; import io.dropwizard.auth.Auth;
import io.dropwizard.util.DataSize; import io.dropwizard.util.DataSize;
import io.lettuce.core.ScriptOutputType; import io.lettuce.core.ScriptOutputType;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Tags;
@ -361,7 +362,7 @@ public class MessageController {
Map<UUID, Account> uuidToAccountMap = Arrays.stream(multiRecipientMessage.getRecipients()) Map<UUID, Account> uuidToAccountMap = Arrays.stream(multiRecipientMessage.getRecipients())
.map(Recipient::getUuid) .map(Recipient::getUuid)
.distinct() .distinct()
.collect(Collectors.toMap(Function.identity(), uuid -> { .collect(Collectors.toUnmodifiableMap(Function.identity(), uuid -> {
Optional<Account> account = accountsManager.get(uuid); Optional<Account> account = accountsManager.get(uuid);
if (account.isEmpty()) { if (account.isEmpty()) {
throw new WebApplicationException(Status.NOT_FOUND); throw new WebApplicationException(Status.NOT_FOUND);
@ -417,21 +418,21 @@ public class MessageController {
UserAgentTagUtil.getPlatformTag(userAgent), UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(online)), Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(online)),
Tag.of(SENDER_TYPE_TAG_NAME, "unidentified")); Tag.of(SENDER_TYPE_TAG_NAME, "unidentified"));
List<UUID> uuids404 = new ArrayList<>(); List<UUID> uuids404 = Collections.synchronizedList(new ArrayList<>());
for (Recipient recipient : multiRecipientMessage.getRecipients()) { final Counter counter = Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags);
Arrays.stream(multiRecipientMessage.getRecipients()).parallel().forEach(recipient -> {
Account destinationAccount = uuidToAccountMap.get(recipient.getUuid()); Account destinationAccount = uuidToAccountMap.get(recipient.getUuid());
// we asserted this must be true in validateCompleteDeviceList
//noinspection OptionalGetWithoutIsPresent // we asserted this must exist in validateCompleteDeviceList
Device destinationDevice = destinationAccount.getDevice(recipient.getDeviceId()).get(); Device destinationDevice = destinationAccount.getDevice(recipient.getDeviceId()).orElseThrow();
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment(); counter.increment();
try { try {
sendMessage(destinationAccount, destinationDevice, timestamp, online, recipient, sendMessage(destinationAccount, destinationDevice, timestamp, online, recipient,
multiRecipientMessage.getCommonPayload()); multiRecipientMessage.getCommonPayload());
} catch (NoSuchUserException e) { } catch (NoSuchUserException e) {
uuids404.add(destinationAccount.getUuid()); uuids404.add(destinationAccount.getUuid());
} }
} });
return Response.ok(new SendMultiRecipientMessageResponse(uuids404)).build(); return Response.ok(new SendMultiRecipientMessageResponse(uuids404)).build();
} }