Move all receipt sending work to executor
This commit is contained in:
parent
bbbab4b8a4
commit
d186245c5c
|
@ -8,14 +8,11 @@ package org.whispersystems.textsecuregcm.push;
|
||||||
import com.codahale.metrics.InstrumentedExecutorService;
|
import com.codahale.metrics.InstrumentedExecutorService;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
|
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
@ -37,34 +34,41 @@ public class ReceiptSender {
|
||||||
MetricsUtil.name(ReceiptSender.class, "executor"));
|
MetricsUtil.name(ReceiptSender.class, "executor"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Void> sendReceipt(UUID sourceUuid, long sourceDeviceId, UUID destinationUuid, long messageId)
|
public void sendReceipt(UUID sourceUuid, long sourceDeviceId, UUID destinationUuid, long messageId) {
|
||||||
throws NoSuchUserException {
|
|
||||||
if (sourceUuid.equals(destinationUuid)) {
|
if (sourceUuid.equals(destinationUuid)) {
|
||||||
return CompletableFuture.completedFuture(null);
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Account destinationAccount = accountManager.getByAccountIdentifier(destinationUuid)
|
executor.submit(() -> {
|
||||||
.orElseThrow(() -> new NoSuchUserException(destinationUuid));
|
try {
|
||||||
|
accountManager.getByAccountIdentifier(destinationUuid).ifPresentOrElse(
|
||||||
|
destinationAccount -> {
|
||||||
|
final Envelope.Builder message = Envelope.newBuilder()
|
||||||
|
.setServerTimestamp(System.currentTimeMillis())
|
||||||
|
.setSourceUuid(sourceUuid.toString())
|
||||||
|
.setSourceDevice((int) sourceDeviceId)
|
||||||
|
.setDestinationUuid(destinationUuid.toString())
|
||||||
|
.setTimestamp(messageId)
|
||||||
|
.setType(Envelope.Type.SERVER_DELIVERY_RECEIPT)
|
||||||
|
.setUrgent(false);
|
||||||
|
|
||||||
final Envelope.Builder message = Envelope.newBuilder()
|
for (final Device destinationDevice : destinationAccount.getDevices()) {
|
||||||
.setServerTimestamp(System.currentTimeMillis())
|
try {
|
||||||
.setSourceUuid(sourceUuid.toString())
|
messageSender.sendMessage(destinationAccount, destinationDevice, message.build(), false);
|
||||||
.setSourceDevice((int) sourceDeviceId)
|
} catch (final NotPushRegisteredException e) {
|
||||||
.setDestinationUuid(destinationUuid.toString())
|
logger.debug("User no longer push registered for delivery receipt: {}", e.getMessage());
|
||||||
.setTimestamp(messageId)
|
} catch (final Exception e) {
|
||||||
.setType(Envelope.Type.SERVER_DELIVERY_RECEIPT)
|
logger.warn("Could not send delivery receipt", e);
|
||||||
.setUrgent(false);
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
() -> logger.info("No longer registered: {}", destinationUuid)
|
||||||
|
);
|
||||||
|
|
||||||
return CompletableFuture.runAsync(() -> {
|
} catch (final Exception e) {
|
||||||
for (final Device destinationDevice : destinationAccount.getDevices()) {
|
// this exception is most likely a Dynamo timeout or a Redis timeout/circuit breaker
|
||||||
try {
|
logger.warn("Could not send delivery receipt", e);
|
||||||
messageSender.sendMessage(destinationAccount, destinationDevice, message.build(), false);
|
|
||||||
} catch (final NotPushRegisteredException e) {
|
|
||||||
logger.info("User no longer push registered for delivery receipt: " + e.getMessage());
|
|
||||||
} catch (final Exception e) {
|
|
||||||
logger.warn("Could not send delivery receipt", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}, executor);
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,14 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import javax.ws.rs.WebApplicationException;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
||||||
import org.whispersystems.textsecuregcm.controllers.MessageController;
|
import org.whispersystems.textsecuregcm.controllers.MessageController;
|
||||||
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
|
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
|
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
|
||||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||||
|
@ -291,12 +289,10 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||||
receiptSender.sendReceipt(UUID.fromString(message.getDestinationUuid()),
|
receiptSender.sendReceipt(UUID.fromString(message.getDestinationUuid()),
|
||||||
auth.getAuthenticatedDevice().getId(), UUID.fromString(message.getSourceUuid()),
|
auth.getAuthenticatedDevice().getId(), UUID.fromString(message.getSourceUuid()),
|
||||||
message.getTimestamp());
|
message.getTimestamp());
|
||||||
} catch (NoSuchUserException e) {
|
|
||||||
logger.info("No longer registered: {}", e.getMessage());
|
|
||||||
} catch (WebApplicationException e) {
|
|
||||||
logger.warn("Bad federated response for receipt: {}", e.getResponse().getStatus());
|
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
logger.error("Could not parse UUID: {}", message.getSourceUuid());
|
logger.error("Could not parse UUID: {}", message.getSourceUuid());
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("Failed to send receipt", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue