Move "push notifications on close" logic to `WebSocketConnection`

This commit is contained in:
Jon Chambers 2024-08-14 12:24:49 -04:00 committed by GitHub
parent 84c329e911
commit 3b405a53d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 64 additions and 38 deletions

View File

@ -12,7 +12,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
import io.dropwizard.lifecycle.Managed; import io.dropwizard.lifecycle.Managed;
import io.lettuce.core.ScoredValue; import io.lettuce.core.ScoredValue;
import io.lettuce.core.ScriptOutputType; import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.ZAddArgs; import io.lettuce.core.ZAddArgs;
import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

View File

@ -23,12 +23,10 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
@ -139,10 +137,12 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
if (authenticated) { if (authenticated) {
final AuthenticatedAccount auth = context.getAuthenticated(AuthenticatedAccount.class); final AuthenticatedAccount auth = context.getAuthenticated(AuthenticatedAccount.class);
final Device device = auth.getAuthenticatedDevice();
final Timer.Sample sample = Timer.start(); final Timer.Sample sample = Timer.start();
final WebSocketConnection connection = new WebSocketConnection(receiptSender, final WebSocketConnection connection = new WebSocketConnection(receiptSender,
messagesManager, messageMetrics, auth, device, messagesManager,
messageMetrics,
pushNotificationManager,
auth,
context.getClient(), context.getClient(),
scheduledExecutorService, scheduledExecutorService,
messageDeliveryScheduler, messageDeliveryScheduler,
@ -150,8 +150,6 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
openWebsocketAtomicInteger.incrementAndGet(); openWebsocketAtomicInteger.incrementAndGet();
pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), device, userAgent);
final AtomicReference<ScheduledFuture<?>> renewPresenceFutureReference = new AtomicReference<>(); final AtomicReference<ScheduledFuture<?>> renewPresenceFutureReference = new AtomicReference<>();
context.addWebsocketClosedListener((closingContext, statusCode, reason) -> { context.addWebsocketClosedListener((closingContext, statusCode, reason) -> {
@ -164,29 +162,41 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
renewPresenceFuture.cancel(false); renewPresenceFuture.cancel(false);
} }
// We begin the shutdown process by removing this client's "presence," which means it will again begin to
// receive push notifications for inbound messages. We should do this first because, at this point, the
// connection has already closed and attempts to actually deliver a message via the connection will not succeed.
// It's preferable to start sending push notifications as soon as possible.
RedisOperation.unchecked(() -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection));
// Next, we stop listening for inbound messages. If a message arrives after this call, the websocket connection
// will not be notified and will not change its state, but that's okay because it has already closed and
// attempts to deliver mesages via this connection will not succeed.
RedisOperation.unchecked(() -> messagesManager.removeMessageAvailabilityListener(connection));
// Finally, stop trying to deliver messages and send a push notification if the connection is aware of any
// undelivered messages.
connection.stop(); connection.stop();
RedisOperation.unchecked(
() -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), device.getId(), connection));
RedisOperation.unchecked(() -> {
messagesManager.removeMessageAvailabilityListener(connection);
if (messagesManager.hasCachedMessages(auth.getAccount().getUuid(), device.getId())) {
try {
pushNotificationManager.sendNewMessageNotification(auth.getAccount(), device.getId(), true);
} catch (NotPushRegisteredException ignored) {
}
}
});
}); });
try { try {
// Once we add this connection as a message availability listener, it will be notified any time a new message
// arrives in the message cache. This updates the connection's "may have messages" state. It's important that
// we do this first because we want to make sure we're accurately tracking message availability in the
// connection's internal state.
messagesManager.addMessageAvailabilityListener(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection);
// Once we "start" the websocket connection, we'll cancel any scheduled "you may have new messages" push
// notifications and begin delivering any stored messages for the connected device. We have not yet declared the
// client as "present" yet. If a message arrives at this point, we will update the message availability state
// correctly, but we may also send a spurious push notification.
connection.start(); connection.start();
clientPresenceManager.setPresent(auth.getAccount().getUuid(), device.getId(), connection);
messagesManager.addMessageAvailabilityListener(auth.getAccount().getUuid(), device.getId(), connection); // Finally, we register this client's presence, which suppresses push notifications. We do this last because
// receiving extra push notifications is generally preferable to missing out on a push notification.
clientPresenceManager.setPresent(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection);
renewPresenceFutureReference.set(scheduledExecutorService.scheduleAtFixedRate(() -> RedisOperation.unchecked(() -> renewPresenceFutureReference.set(scheduledExecutorService.scheduleAtFixedRate(() -> RedisOperation.unchecked(() ->
clientPresenceManager.renewPresence(auth.getAccount().getUuid(), device.getId())), clientPresenceManager.renewPresence(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId())),
RENEW_PRESENCE_INTERVAL_MINUTES, RENEW_PRESENCE_INTERVAL_MINUTES,
RENEW_PRESENCE_INTERVAL_MINUTES, RENEW_PRESENCE_INTERVAL_MINUTES,
TimeUnit.MINUTES)); TimeUnit.MINUTES));

View File

@ -43,6 +43,8 @@ import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener; import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
@ -86,6 +88,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
"sendMessages"); "sendMessages");
private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class, private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class,
"sendMessageError"); "sendMessageError");
private static final String PUSH_NOTIFICATION_ON_CLOSE_COUNTER_NAME =
MetricsUtil.name(WebSocketConnection.class, "pushNotificationOnClose");
private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_CODE_TAG = "status";
private static final String STATUS_MESSAGE_TAG = "message"; private static final String STATUS_MESSAGE_TAG = "message";
private static final String ERROR_TYPE_TAG = "errorType"; private static final String ERROR_TYPE_TAG = "errorType";
@ -110,9 +114,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final ReceiptSender receiptSender; private final ReceiptSender receiptSender;
private final MessagesManager messagesManager; private final MessagesManager messagesManager;
private final MessageMetrics messageMetrics; private final MessageMetrics messageMetrics;
private final PushNotificationManager pushNotificationManager;
private final AuthenticatedAccount auth; private final AuthenticatedAccount auth;
private final Device device;
private final WebSocketClient client; private final WebSocketClient client;
private final int sendFuturesTimeoutMillis; private final int sendFuturesTimeoutMillis;
@ -143,8 +147,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
public WebSocketConnection(ReceiptSender receiptSender, public WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager, MessagesManager messagesManager,
MessageMetrics messageMetrics, MessageMetrics messageMetrics,
PushNotificationManager pushNotificationManager,
AuthenticatedAccount auth, AuthenticatedAccount auth,
Device device,
WebSocketClient client, WebSocketClient client,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler, Scheduler messageDeliveryScheduler,
@ -153,8 +157,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
this(receiptSender, this(receiptSender,
messagesManager, messagesManager,
messageMetrics, messageMetrics,
pushNotificationManager,
auth, auth,
device,
client, client,
DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS,
scheduledExecutorService, scheduledExecutorService,
@ -166,8 +170,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
WebSocketConnection(ReceiptSender receiptSender, WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager, MessagesManager messagesManager,
MessageMetrics messageMetrics, MessageMetrics messageMetrics,
PushNotificationManager pushNotificationManager,
AuthenticatedAccount auth, AuthenticatedAccount auth,
Device device,
WebSocketClient client, WebSocketClient client,
int sendFuturesTimeoutMillis, int sendFuturesTimeoutMillis,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
@ -177,8 +181,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.messageMetrics = messageMetrics; this.messageMetrics = messageMetrics;
this.pushNotificationManager = pushNotificationManager;
this.auth = auth; this.auth = auth;
this.device = device;
this.client = client; this.client = client;
this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
@ -187,6 +191,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
} }
public void start() { public void start() {
pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), auth.getAuthenticatedDevice(), client.getUserAgent());
queueDrainStartTime.set(System.currentTimeMillis()); queueDrainStartTime.set(System.currentTimeMillis());
processStoredMessages(); processStoredMessages();
} }
@ -204,6 +209,17 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
} }
client.close(1000, "OK"); client.close(1000, "OK");
if (storedMessageState.get() != StoredMessageState.EMPTY) {
try {
pushNotificationManager.sendNewMessageNotification(auth.getAccount(), auth.getAuthenticatedDevice().getId(), true);
Metrics.counter(PUSH_NOTIFICATION_ON_CLOSE_COUNTER_NAME,
Tags.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent())))
.increment();
} catch (NotPushRegisteredException ignored) {
}
}
} }
private CompletableFuture<Void> sendMessage(final Envelope message, StoredMessageInfo storedMessageInfo) { private CompletableFuture<Void> sendMessage(final Envelope message, StoredMessageInfo storedMessageInfo) {
@ -224,7 +240,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
} else { } else {
messageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), messageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(),
"websocket", "websocket",
device.isPrimary(), auth.getAuthenticatedDevice().isPrimary(),
client.getUserAgent(), client.getUserAgent(),
clientReleaseManager); clientReleaseManager);
} }
@ -232,12 +248,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final CompletableFuture<Void> result; final CompletableFuture<Void> result;
if (isSuccessResponse(response)) { if (isSuccessResponse(response)) {
result = messagesManager.delete(auth.getAccount().getUuid(), device, result = messagesManager.delete(auth.getAccount().getUuid(), auth.getAuthenticatedDevice(),
storedMessageInfo.guid(), storedMessageInfo.serverTimestamp()) storedMessageInfo.guid(), storedMessageInfo.serverTimestamp())
.thenApply(ignored -> null); .thenApply(ignored -> null);
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) { if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
recordMessageDeliveryDuration(message.getServerTimestamp(), device); recordMessageDeliveryDuration(message.getServerTimestamp(), auth.getAuthenticatedDevice());
sendDeliveryReceiptFor(message); sendDeliveryReceiptFor(message);
} }
} else { } else {
@ -359,7 +375,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private void sendMessages(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueCleared) { private void sendMessages(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueCleared) {
final Publisher<Envelope> messages = final Publisher<Envelope> messages =
messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device, cachedMessagesOnly); messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), auth.getAuthenticatedDevice(), cachedMessagesOnly);
final AtomicBoolean hasErrored = new AtomicBoolean(); final AtomicBoolean hasErrored = new AtomicBoolean();
@ -418,7 +434,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final UUID messageGuid = UUID.fromString(envelope.getServerGuid()); final UUID messageGuid = UUID.fromString(envelope.getServerGuid());
if (envelope.getStory() && !client.shouldDeliverStories()) { if (envelope.getStory() && !client.shouldDeliverStories()) {
messagesManager.delete(auth.getAccount().getUuid(), device, messageGuid, envelope.getServerTimestamp()); messagesManager.delete(auth.getAccount().getUuid(), auth.getAuthenticatedDevice(), messageGuid, envelope.getServerTimestamp());
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} else { } else {

View File

@ -47,12 +47,12 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
import org.whispersystems.textsecuregcm.storage.MessagesCache; import org.whispersystems.textsecuregcm.storage.MessagesCache;
@ -126,8 +126,8 @@ class WebSocketConnectionIntegrationTest {
mock(ReceiptSender.class), mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new MessageMetrics(), new MessageMetrics(),
mock(PushNotificationManager.class),
new AuthenticatedAccount(account, device), new AuthenticatedAccount(account, device),
device,
webSocketClient, webSocketClient,
scheduledExecutorService, scheduledExecutorService,
messageDeliveryScheduler, messageDeliveryScheduler,
@ -212,8 +212,8 @@ class WebSocketConnectionIntegrationTest {
mock(ReceiptSender.class), mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new MessageMetrics(), new MessageMetrics(),
mock(PushNotificationManager.class),
new AuthenticatedAccount(account, device), new AuthenticatedAccount(account, device),
device,
webSocketClient, webSocketClient,
scheduledExecutorService, scheduledExecutorService,
messageDeliveryScheduler, messageDeliveryScheduler,
@ -279,8 +279,8 @@ class WebSocketConnectionIntegrationTest {
mock(ReceiptSender.class), mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService), new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new MessageMetrics(), new MessageMetrics(),
mock(PushNotificationManager.class),
new AuthenticatedAccount(account, device), new AuthenticatedAccount(account, device),
device,
webSocketClient, webSocketClient,
100, // use a very short timeout, so that this test completes quickly 100, // use a very short timeout, so that this test completes quickly
scheduledExecutorService, scheduledExecutorService,

View File

@ -626,7 +626,8 @@ class WebSocketConnectionTest {
} }
private @NotNull WebSocketConnection webSocketConnection(final WebSocketClient client) { private @NotNull WebSocketConnection webSocketConnection(final WebSocketClient client) {
return new WebSocketConnection(receiptSender, messagesManager, new MessageMetrics(), auth, device, client, return new WebSocketConnection(receiptSender, messagesManager, new MessageMetrics(),
mock(PushNotificationManager.class), auth, client,
retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager); retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager);
} }