diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index fb337ae1c..2e397bbcc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -668,7 +668,7 @@ public class WhisperServerService extends Application { final Envelope message = messagesByDeviceId.get(deviceId); - if (!destinationPresent && !message.getEphemeral()) { + if (!destinationPresent && !message.getEphemeral() && !shouldSkipPush(destination, deviceId, message.getUrgent())) { try { pushNotificationManager.sendNewMessageNotification(destination, deviceId, message.getUrgent()); } catch (final NotPushRegisteredException ignored) { @@ -165,6 +175,13 @@ public class MessageSender { }); } + private boolean shouldSkipPush(final Account account, byte deviceId, boolean urgent) { + final boolean isAndroidFcm = account.getDevice(deviceId).map(Device::getGcmId).isPresent(); + return !urgent + && isAndroidFcm + && experimentEnrollmentManager.isEnrolled(account.getUuid(), ANDROID_SKIP_LOW_URGENCY_PUSH_EXPERIMENT); + } + /** * Sends messages to a group of recipients. If a destination device has a valid push notification token and does not * have an active connection to a Signal server, then this method will also send a push notification to that device to diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index 28913630e..e815ca9f9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -27,8 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil; +import org.whispersystems.textsecuregcm.push.MessageSender; import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -41,6 +43,7 @@ public class MessagePersister implements Managed { private final MessagesManager messagesManager; private final AccountsManager accountsManager; private final DynamicConfigurationManager dynamicConfigurationManager; + private final ExperimentEnrollmentManager experimentEnrollmentManager; private final Duration persistDelay; @@ -78,6 +81,7 @@ public class MessagePersister implements Managed { final MessagesManager messagesManager, final AccountsManager accountsManager, final DynamicConfigurationManager dynamicConfigurationManager, + final ExperimentEnrollmentManager experimentEnrollmentManager, final Duration persistDelay, final int dedicatedProcessWorkerThreadCount) { @@ -85,6 +89,7 @@ public class MessagePersister implements Managed { this.messagesManager = messagesManager; this.accountsManager = accountsManager; this.dynamicConfigurationManager = dynamicConfigurationManager; + this.experimentEnrollmentManager = experimentEnrollmentManager; this.persistDelay = persistDelay; this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount]; @@ -234,8 +239,11 @@ public class MessagePersister implements Managed { } while (!messages.isEmpty()); + final boolean inSkipExperiment = device.getGcmId() != null && experimentEnrollmentManager.isEnrolled( + accountUuid, + MessageSender.ANDROID_SKIP_LOW_URGENCY_PUSH_EXPERIMENT); DistributionSummary.builder(QUEUE_SIZE_DISTRIBUTION_SUMMARY_NAME) - .tags(Tags.of(platformTag)) + .tags(Tags.of(platformTag).and("lowUrgencySkip", Boolean.toString(inSkipExperiment))) .publishPercentileHistogram(true) .register(Metrics.globalRegistry) .record(messageCount); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index bfad3bb02..f448cbfdd 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -12,6 +12,7 @@ import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.OpenWebSocketCounter; @@ -45,6 +46,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final Scheduler messageDeliveryScheduler; private final ClientReleaseManager clientReleaseManager; private final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor; + private final ExperimentEnrollmentManager experimentEnrollmentManager; private final OpenWebSocketCounter openAuthenticatedWebSocketCounter; private final OpenWebSocketCounter openUnauthenticatedWebSocketCounter; @@ -58,7 +60,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { ScheduledExecutorService scheduledExecutorService, Scheduler messageDeliveryScheduler, ClientReleaseManager clientReleaseManager, - MessageDeliveryLoopMonitor messageDeliveryLoopMonitor) { + MessageDeliveryLoopMonitor messageDeliveryLoopMonitor, + final ExperimentEnrollmentManager experimentEnrollmentManager) { this.receiptSender = receiptSender; this.messagesManager = messagesManager; this.messageMetrics = messageMetrics; @@ -69,6 +72,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { this.messageDeliveryScheduler = messageDeliveryScheduler; this.clientReleaseManager = clientReleaseManager; this.messageDeliveryLoopMonitor = messageDeliveryLoopMonitor; + this.experimentEnrollmentManager = experimentEnrollmentManager; openAuthenticatedWebSocketCounter = new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, CONNECTED_DURATION_TIMER_NAME, Tags.of(AUTHENTICATED_TAG_NAME, "true")); @@ -98,7 +102,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { scheduledExecutorService, messageDeliveryScheduler, clientReleaseManager, - messageDeliveryLoopMonitor); + messageDeliveryLoopMonitor, + experimentEnrollmentManager); context.addWebsocketClosedListener((closingContext, statusCode, reason) -> { // We begin the shutdown process by removing this client's "presence," which means it will again begin to diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 4921ad45a..e0e2baa2e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; import org.whispersystems.textsecuregcm.controllers.MessageController; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; @@ -46,6 +47,7 @@ import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; +import org.whispersystems.textsecuregcm.push.MessageSender; import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventListener; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; @@ -120,6 +122,7 @@ public class WebSocketConnection implements WebSocketConnectionEventListener { private final PushNotificationManager pushNotificationManager; private final PushNotificationScheduler pushNotificationScheduler; private final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor; + private final ExperimentEnrollmentManager experimentEnrollmentManager; private final AuthenticatedDevice auth; private final WebSocketClient client; @@ -159,7 +162,8 @@ public class WebSocketConnection implements WebSocketConnectionEventListener { ScheduledExecutorService scheduledExecutorService, Scheduler messageDeliveryScheduler, ClientReleaseManager clientReleaseManager, - MessageDeliveryLoopMonitor messageDeliveryLoopMonitor) { + MessageDeliveryLoopMonitor messageDeliveryLoopMonitor, + ExperimentEnrollmentManager experimentEnrollmentManager) { this(receiptSender, messagesManager, @@ -172,7 +176,7 @@ public class WebSocketConnection implements WebSocketConnectionEventListener { scheduledExecutorService, messageDeliveryScheduler, clientReleaseManager, - messageDeliveryLoopMonitor); + messageDeliveryLoopMonitor, experimentEnrollmentManager); } @VisibleForTesting @@ -187,7 +191,8 @@ public class WebSocketConnection implements WebSocketConnectionEventListener { ScheduledExecutorService scheduledExecutorService, Scheduler messageDeliveryScheduler, ClientReleaseManager clientReleaseManager, - MessageDeliveryLoopMonitor messageDeliveryLoopMonitor) { + MessageDeliveryLoopMonitor messageDeliveryLoopMonitor, + ExperimentEnrollmentManager experimentEnrollmentManager) { this.receiptSender = receiptSender; this.messagesManager = messagesManager; @@ -201,6 +206,7 @@ public class WebSocketConnection implements WebSocketConnectionEventListener { this.messageDeliveryScheduler = messageDeliveryScheduler; this.clientReleaseManager = clientReleaseManager; this.messageDeliveryLoopMonitor = messageDeliveryLoopMonitor; + this.experimentEnrollmentManager = experimentEnrollmentManager; } public void start() { @@ -331,7 +337,13 @@ public class WebSocketConnection implements WebSocketConnectionEventListener { // Cleared the queue! Send a queue empty message if we need to consecutiveRetries.set(0); if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { - final Tags tags = Tags.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent())); + final boolean inSkipExperiment = auth.getAuthenticatedDevice().getGcmId() != null && experimentEnrollmentManager.isEnrolled( + auth.getAccount().getUuid(), + MessageSender.ANDROID_SKIP_LOW_URGENCY_PUSH_EXPERIMENT); + + final Tags tags = Tags + .of(UserAgentTagUtil.getPlatformTag(client.getUserAgent())) + .and("lowUrgencySkip", Boolean.toString(inSkipExperiment)); final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get(); Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java index a83bc5a36..7a4663567 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java @@ -14,6 +14,7 @@ import java.time.Duration; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.storage.MessagePersister; import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; @@ -64,6 +65,7 @@ public class MessagePersisterServiceCommand extends ServerCommand messageSender.sendMessages(account, + serviceIdentifier, + Map.of(device.getId(), message), + Map.of(device.getId(), registrationId), + Optional.empty(), + null)); + + if (shouldSkip) { + verifyNoInteractions(pushNotificationManager); + } else { + verify(pushNotificationManager).sendNewMessageNotification(account, deviceId, isUrgent); + } } @CartesianTest diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index f588d750f..2300f90fd 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventListener; import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; @@ -97,7 +98,7 @@ class MessagePersisterIntegrationTest { webSocketConnectionEventManager.start(); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, - dynamicConfigurationManager, PERSIST_DELAY, 1); + dynamicConfigurationManager, mock(ExperimentEnrollmentManager.class), PERSIST_DELAY, 1); account = mock(Account.class); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 3987c9dbc..87bf184a2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -54,6 +54,7 @@ import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; @@ -118,7 +119,7 @@ class MessagePersisterTest { messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, - dynamicConfigurationManager, PERSIST_DELAY, 1); + dynamicConfigurationManager, mock(ExperimentEnrollmentManager.class), PERSIST_DELAY, 1); when(messagesManager.clear(any(UUID.class), anyByte())).thenReturn(CompletableFuture.completedFuture(null)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index efd6c4d1c..35a704307 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -47,6 +47,7 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.push.PushNotificationManager; @@ -141,7 +142,8 @@ class WebSocketConnectionIntegrationTest { scheduledExecutorService, messageDeliveryScheduler, clientReleaseManager, - mock(MessageDeliveryLoopMonitor.class)); + mock(MessageDeliveryLoopMonitor.class), + mock(ExperimentEnrollmentManager.class)); final List expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); @@ -229,7 +231,8 @@ class WebSocketConnectionIntegrationTest { scheduledExecutorService, messageDeliveryScheduler, clientReleaseManager, - mock(MessageDeliveryLoopMonitor.class)); + mock(MessageDeliveryLoopMonitor.class), + mock(ExperimentEnrollmentManager.class)); final int persistedMessageCount = 207; final int cachedMessageCount = 173; @@ -299,7 +302,8 @@ class WebSocketConnectionIntegrationTest { scheduledExecutorService, messageDeliveryScheduler, clientReleaseManager, - mock(MessageDeliveryLoopMonitor.class)); + mock(MessageDeliveryLoopMonitor.class), + mock(ExperimentEnrollmentManager.class)); final int persistedMessageCount = 207; final int cachedMessageCount = 173; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index be7ff7b66..154a254b7 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -54,6 +54,7 @@ import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; +import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; @@ -125,7 +126,8 @@ class WebSocketConnectionTest { AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager, new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), mock(WebSocketConnectionEventManager.class), retrySchedulingExecutor, - messageDeliveryScheduler, clientReleaseManager, mock(MessageDeliveryLoopMonitor.class)); + messageDeliveryScheduler, clientReleaseManager, mock(MessageDeliveryLoopMonitor.class), + mock(ExperimentEnrollmentManager.class)); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) @@ -629,7 +631,8 @@ class WebSocketConnectionTest { private WebSocketConnection webSocketConnection(final WebSocketClient client) { return new WebSocketConnection(receiptSender, messagesManager, new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), auth, client, - retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager, mock(MessageDeliveryLoopMonitor.class)); + retrySchedulingExecutor, Schedulers.immediate(), clientReleaseManager, + mock(MessageDeliveryLoopMonitor.class), mock(ExperimentEnrollmentManager.class)); } @Test