diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ReceiptSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ReceiptSender.java index 01684b624..e34cf9498 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ReceiptSender.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ReceiptSender.java @@ -8,14 +8,11 @@ package org.whispersystems.textsecuregcm.push; import com.codahale.metrics.InstrumentedExecutorService; import com.codahale.metrics.SharedMetricRegistries; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.controllers.NoSuchUserException; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; -import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.util.Constants; @@ -37,34 +34,41 @@ public class ReceiptSender { MetricsUtil.name(ReceiptSender.class, "executor")); } - public CompletableFuture sendReceipt(UUID sourceUuid, long sourceDeviceId, UUID destinationUuid, long messageId) - throws NoSuchUserException { + public void sendReceipt(UUID sourceUuid, long sourceDeviceId, UUID destinationUuid, long messageId) { if (sourceUuid.equals(destinationUuid)) { - return CompletableFuture.completedFuture(null); + return; } - final Account destinationAccount = accountManager.getByAccountIdentifier(destinationUuid) - .orElseThrow(() -> new NoSuchUserException(destinationUuid)); + executor.submit(() -> { + try { + accountManager.getByAccountIdentifier(destinationUuid).ifPresentOrElse( + destinationAccount -> { + final Envelope.Builder message = Envelope.newBuilder() + .setServerTimestamp(System.currentTimeMillis()) + .setSourceUuid(sourceUuid.toString()) + .setSourceDevice((int) sourceDeviceId) + .setDestinationUuid(destinationUuid.toString()) + .setTimestamp(messageId) + .setType(Envelope.Type.SERVER_DELIVERY_RECEIPT) + .setUrgent(false); - final Envelope.Builder message = Envelope.newBuilder() - .setServerTimestamp(System.currentTimeMillis()) - .setSourceUuid(sourceUuid.toString()) - .setSourceDevice((int) sourceDeviceId) - .setDestinationUuid(destinationUuid.toString()) - .setTimestamp(messageId) - .setType(Envelope.Type.SERVER_DELIVERY_RECEIPT) - .setUrgent(false); + for (final Device destinationDevice : destinationAccount.getDevices()) { + try { + messageSender.sendMessage(destinationAccount, destinationDevice, message.build(), false); + } catch (final NotPushRegisteredException e) { + logger.debug("User no longer push registered for delivery receipt: {}", e.getMessage()); + } catch (final Exception e) { + logger.warn("Could not send delivery receipt", e); + } + } + }, + () -> logger.info("No longer registered: {}", destinationUuid) + ); - return CompletableFuture.runAsync(() -> { - for (final Device destinationDevice : destinationAccount.getDevices()) { - try { - messageSender.sendMessage(destinationAccount, destinationDevice, message.build(), false); - } catch (final NotPushRegisteredException e) { - logger.info("User no longer push registered for delivery receipt: " + e.getMessage()); - } catch (final Exception e) { - logger.warn("Could not send delivery receipt", e); - } + } catch (final Exception e) { + // this exception is most likely a Dynamo timeout or a Redis timeout/circuit breaker + logger.warn("Could not send delivery receipt", e); } - }, executor); + }); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 9f347f68d..af36618ed 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -31,14 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; -import javax.ws.rs.WebApplicationException; import org.apache.commons.lang3.StringUtils; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.controllers.MessageController; -import org.whispersystems.textsecuregcm.controllers.NoSuchUserException; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; @@ -291,12 +289,10 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac receiptSender.sendReceipt(UUID.fromString(message.getDestinationUuid()), auth.getAuthenticatedDevice().getId(), UUID.fromString(message.getSourceUuid()), message.getTimestamp()); - } catch (NoSuchUserException e) { - logger.info("No longer registered: {}", e.getMessage()); - } catch (WebApplicationException e) { - logger.warn("Bad federated response for receipt: {}", e.getResponse().getStatus()); } catch (IllegalArgumentException e) { logger.error("Could not parse UUID: {}", message.getSourceUuid()); + } catch (Exception e) { + logger.warn("Failed to send receipt", e); } }