Retire the explicit "handle new message available" system in favor of implicit presence-on-insert values

This commit is contained in:
Jon Chambers 2024-11-07 16:21:44 -05:00 committed by Jon Chambers
parent eeeb565313
commit 084607f359
7 changed files with 28 additions and 126 deletions

View File

@ -654,8 +654,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager); final AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
final MessageSender messageSender = final MessageSender messageSender = new MessageSender(messagesManager, pushNotificationManager);
new MessageSender(pubSubClientEventManager, messagesManager, pushNotificationManager);
final ReceiptSender receiptSender = new ReceiptSender(accountsManager, messageSender, receiptSenderExecutor); final ReceiptSender receiptSender = new ReceiptSender(accountsManager, messageSender, receiptSenderExecutor);
final TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(dynamicConfigurationManager, final TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(dynamicConfigurationManager,
config.getTurnConfiguration().secret().value()); config.getTurnConfiguration().secret().value());

View File

@ -8,15 +8,10 @@ import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Util;
import java.util.concurrent.CompletableFuture;
/** /**
* A MessageSender sends Signal messages to destination devices. Messages may be "normal" user-to-user messages, * A MessageSender sends Signal messages to destination devices. Messages may be "normal" user-to-user messages,
@ -27,13 +22,10 @@ import java.util.concurrent.CompletableFuture;
* for "online" delivery only and will not be delivered (and clients will not be notified) if the destination device * for "online" delivery only and will not be delivered (and clients will not be notified) if the destination device
* isn't actively connected to a Signal server. * isn't actively connected to a Signal server.
* *
* @see ClientPresenceManager
* @see org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener
* @see ReceiptSender * @see ReceiptSender
*/ */
public class MessageSender { public class MessageSender {
private final PubSubClientEventManager pubSubClientEventManager;
private final MessagesManager messagesManager; private final MessagesManager messagesManager;
private final PushNotificationManager pushNotificationManager; private final PushNotificationManager pushNotificationManager;
@ -45,52 +37,31 @@ public class MessageSender {
private static final String STORY_TAG_NAME = "story"; private static final String STORY_TAG_NAME = "story";
private static final String SEALED_SENDER_TAG_NAME = "sealedSender"; private static final String SEALED_SENDER_TAG_NAME = "sealedSender";
private static final Counter CLIENT_PRESENCE_ERROR = public MessageSender(final MessagesManager messagesManager, final PushNotificationManager pushNotificationManager) {
Metrics.counter(MetricsUtil.name(MessageSender.class, "clientPresenceError"));
public MessageSender(final PubSubClientEventManager pubSubClientEventManager,
final MessagesManager messagesManager,
final PushNotificationManager pushNotificationManager) {
this.pubSubClientEventManager = pubSubClientEventManager;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.pushNotificationManager = pushNotificationManager; this.pushNotificationManager = pushNotificationManager;
} }
public CompletableFuture<Void> sendMessage(final Account account, final Device device, final Envelope message, final boolean online) { public void sendMessage(final Account account, final Device device, final Envelope message, final boolean online) {
messagesManager.insert(account.getUuid(), final boolean destinationPresent = messagesManager.insert(account.getUuid(),
device.getId(), device.getId(),
online ? message.toBuilder().setEphemeral(true).build() : message); online ? message.toBuilder().setEphemeral(true).build() : message);
return pubSubClientEventManager.handleNewMessageAvailable(account.getIdentifier(IdentityType.ACI), device.getId()) if (!destinationPresent && !online) {
.exceptionally(throwable -> {
// It's unlikely that the message insert (synchronous) would succeed and sending a "new message available"
// event would fail since both things happen in the same cluster, but just in case, we should "fail open" and
// act as if the client wasn't present if this happens. This is a conservative measure that biases toward
// sending more push notifications, though again, it shouldn't happen often.
CLIENT_PRESENCE_ERROR.increment();
return false;
})
.thenApply(clientPresent -> {
if (!clientPresent && !online) {
try { try {
pushNotificationManager.sendNewMessageNotification(account, device.getId(), message.getUrgent()); pushNotificationManager.sendNewMessageNotification(account, device.getId(), message.getUrgent());
} catch (final NotPushRegisteredException ignored) { } catch (final NotPushRegisteredException ignored) {
} }
} }
return clientPresent; Metrics.counter(SEND_COUNTER_NAME,
})
.whenComplete((clientPresent, throwable) -> Metrics.counter(SEND_COUNTER_NAME,
CHANNEL_TAG_NAME, getDeliveryChannelName(device), CHANNEL_TAG_NAME, getDeliveryChannelName(device),
EPHEMERAL_TAG_NAME, String.valueOf(online), EPHEMERAL_TAG_NAME, String.valueOf(online),
CLIENT_ONLINE_TAG_NAME, String.valueOf(clientPresent), CLIENT_ONLINE_TAG_NAME, String.valueOf(destinationPresent),
URGENT_TAG_NAME, String.valueOf(message.getUrgent()), URGENT_TAG_NAME, String.valueOf(message.getUrgent()),
STORY_TAG_NAME, String.valueOf(message.getStory()), STORY_TAG_NAME, String.valueOf(message.getStory()),
SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceServiceId())) SEALED_SENDER_TAG_NAME, String.valueOf(!message.hasSourceServiceId()))
.increment()) .increment();
.thenRun(Util.NOOP)
.toCompletableFuture();
} }
@VisibleForTesting @VisibleForTesting

View File

@ -68,11 +68,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
private final Map<AccountAndDeviceIdentifier, ClientEventListener> listenersByAccountAndDeviceIdentifier; private final Map<AccountAndDeviceIdentifier, ClientEventListener> listenersByAccountAndDeviceIdentifier;
private static final byte[] NEW_MESSAGE_EVENT_BYTES = ClientEvent.newBuilder()
.setNewMessageAvailable(NewMessageAvailableEvent.getDefaultInstance())
.build()
.toByteArray();
private static final byte[] DISCONNECT_REQUESTED_EVENT_BYTES = ClientEvent.newBuilder() private static final byte[] DISCONNECT_REQUESTED_EVENT_BYTES = ClientEvent.newBuilder()
.setDisconnectRequested(DisconnectRequested.getDefaultInstance()) .setDisconnectRequested(DisconnectRequested.getDefaultInstance())
.build() .build()
@ -229,26 +224,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
}); });
} }
/**
* Publishes an event notifying a specific device that a new message is available for retrieval. This method indicates
* whether the target device is "present" (i.e. has an active listener). Callers may choose to take follow-up action
* (like sending a push notification) if the target device is not present.
*
* @param accountIdentifier the account identifier of the receiving device
* @param deviceId the ID of the receiving device within the target account
*
* @return a future that yields {@code true} if the target device had an active listener or {@code false} otherwise
*/
public CompletionStage<Boolean> handleNewMessageAvailable(final UUID accountIdentifier, final byte deviceId) {
if (pubSubConnection == null) {
throw new IllegalStateException("Presence manager not started");
}
return pubSubConnection.withPubSubConnection(connection ->
connection.async().spublish(getClientEventChannel(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES))
.thenApply(listeners -> listeners > 0);
}
/** /**
* Publishes an event notifying a specific device that messages have been persisted from short-term to long-term * Publishes an event notifying a specific device that messages have been persisted from short-term to long-term
* storage. * storage.

View File

@ -211,13 +211,13 @@ public class MessagesCache {
this.removeRecipientViewFromMrmDataScript = removeRecipientViewFromMrmDataScript; this.removeRecipientViewFromMrmDataScript = removeRecipientViewFromMrmDataScript;
} }
public void insert(final UUID messageGuid, public boolean insert(final UUID messageGuid,
final UUID destinationAccountIdentifier, final UUID destinationAccountIdentifier,
final byte destinationDeviceId, final byte destinationDeviceId,
final MessageProtos.Envelope message) { final MessageProtos.Envelope message) {
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(messageGuid.toString()).build(); final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(messageGuid.toString()).build();
insertTimer.record(() -> insertScript.execute(destinationAccountIdentifier, destinationDeviceId, messageWithGuid)); return insertTimer.record(() -> insertScript.execute(destinationAccountIdentifier, destinationDeviceId, messageWithGuid));
} }
public byte[] insertSharedMultiRecipientMessagePayload( public byte[] insertSharedMultiRecipientMessagePayload(

View File

@ -59,14 +59,16 @@ public class MessagesManager {
this.messageDeletionExecutor = messageDeletionExecutor; this.messageDeletionExecutor = messageDeletionExecutor;
} }
public void insert(UUID destinationUuid, byte destinationDevice, Envelope message) { public boolean insert(UUID destinationUuid, byte destinationDevice, Envelope message) {
final UUID messageGuid = UUID.randomUUID(); final UUID messageGuid = UUID.randomUUID();
messagesCache.insert(messageGuid, destinationUuid, destinationDevice, message); final boolean destinationPresent = messagesCache.insert(messageGuid, destinationUuid, destinationDevice, message);
if (message.hasSourceServiceId() && !destinationUuid.toString().equals(message.getSourceServiceId())) { if (message.hasSourceServiceId() && !destinationUuid.toString().equals(message.getSourceServiceId())) {
reportMessageManager.store(message.getSourceServiceId(), messageGuid); reportMessageManager.store(message.getSourceServiceId(), messageGuid);
} }
return destinationPresent;
} }
public CompletableFuture<Boolean> mayHavePersistedMessages(final UUID destinationUuid, final Device destinationDevice) { public CompletableFuture<Boolean> mayHavePersistedMessages(final UUID destinationUuid, final Device destinationDevice) {

View File

@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyByte; import static org.mockito.ArgumentMatchers.anyByte;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -20,7 +21,6 @@ import com.google.protobuf.ByteString;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
@ -35,21 +35,16 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager;
class MessageSenderTest { class MessageSenderTest {
private PubSubClientEventManager pubSubClientEventManager;
private MessagesManager messagesManager; private MessagesManager messagesManager;
private PushNotificationManager pushNotificationManager; private PushNotificationManager pushNotificationManager;
private MessageSender messageSender; private MessageSender messageSender;
@BeforeEach @BeforeEach
void setUp() { void setUp() {
pubSubClientEventManager = mock(PubSubClientEventManager.class);
messagesManager = mock(MessagesManager.class); messagesManager = mock(MessagesManager.class);
pushNotificationManager = mock(PushNotificationManager.class); pushNotificationManager = mock(PushNotificationManager.class);
when(pubSubClientEventManager.handleNewMessageAvailable(any(), anyByte())) messageSender = new MessageSender(messagesManager, pushNotificationManager);
.thenReturn(CompletableFuture.completedFuture(true));
messageSender = new MessageSender(pubSubClientEventManager, messagesManager, pushNotificationManager);
} }
@CartesianTest @CartesianTest
@ -77,10 +72,9 @@ class MessageSenderTest {
.when(pushNotificationManager).sendNewMessageNotification(any(), anyByte(), anyBoolean()); .when(pushNotificationManager).sendNewMessageNotification(any(), anyByte(), anyBoolean());
} }
when(pubSubClientEventManager.handleNewMessageAvailable(accountIdentifier, deviceId)) when(messagesManager.insert(eq(accountIdentifier), eq(deviceId), any())).thenReturn(clientPresent);
.thenReturn(CompletableFuture.completedFuture(clientPresent));
assertDoesNotThrow(() -> messageSender.sendMessage(account, device, message, onlineMessage).join()); assertDoesNotThrow(() -> messageSender.sendMessage(account, device, message, onlineMessage));
final MessageProtos.Envelope expectedMessage = onlineMessage final MessageProtos.Envelope expectedMessage = onlineMessage
? message.toBuilder().setEphemeral(true).build() ? message.toBuilder().setEphemeral(true).build()

View File

@ -140,30 +140,6 @@ class PubSubClientEventManagerTest {
assertTrue(firstListenerConnectedElsewhere.get()); assertTrue(firstListenerConnectedElsewhere.get());
} }
@ParameterizedTest
@ValueSource(booleans = {true, false})
void handleNewMessageAvailable(final boolean messageAvailableRemotely) throws InterruptedException {
final UUID accountIdentifier = UUID.randomUUID();
final byte deviceId = Device.PRIMARY_ID;
final CountDownLatch messageReceivedLatch = new CountDownLatch(1);
localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() {
@Override
public void handleNewMessageAvailable() {
messageReceivedLatch.countDown();
}
}).toCompletableFuture().join();
final PubSubClientEventManager messagePresenceManager =
messageAvailableRemotely ? remotePresenceManager : localPresenceManager;
assertTrue(messagePresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join());
assertTrue(messageReceivedLatch.await(2, TimeUnit.SECONDS),
"Message not received within time limit");
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
void handleMessagesPersisted(final boolean messagesPersistedRemotely) throws InterruptedException { void handleMessagesPersisted(final boolean messagesPersistedRemotely) throws InterruptedException {
@ -188,21 +164,6 @@ class PubSubClientEventManagerTest {
"Message persistence event not received within time limit"); "Message persistence event not received within time limit");
} }
@Test
void handleClientDisconnected() {
final UUID accountIdentifier = UUID.randomUUID();
final byte deviceId = Device.PRIMARY_ID;
localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter())
.toCompletableFuture().join();
assertTrue(localPresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join());
localPresenceManager.handleClientDisconnected(accountIdentifier, deviceId).toCompletableFuture().join();
assertFalse(localPresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join());
}
@Test @Test
void isLocallyPresent() { void isLocallyPresent() {
final UUID accountIdentifier = UUID.randomUUID(); final UUID accountIdentifier = UUID.randomUUID();