diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java index 2780d78a0..1277c3c07 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java @@ -105,7 +105,7 @@ public class PushNotificationManager { // APNs imposes a per-device limit on background push notifications; schedule a notification for some time in the // future (possibly even now!) rather than sending a notification directly return pushNotificationScheduler - .scheduleBackgroundApnsNotification(pushNotification.destination(), pushNotification.destinationDevice()) + .scheduleBackgroundNotification(pushNotification.tokenType(), pushNotification.destination(), pushNotification.destinationDevice()) .whenComplete(logErrors()) .thenApply(ignored -> Optional.empty()) .toCompletableFuture(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java index 5596cba81..fe3e3c399 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java @@ -13,7 +13,6 @@ import io.lettuce.core.Range; import io.lettuce.core.ScriptOutputType; import io.lettuce.core.SetArgs; import io.lettuce.core.cluster.SlotHash; -import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import java.io.IOException; import java.time.Clock; @@ -44,14 +43,15 @@ public class PushNotificationScheduler implements Managed { private static final Logger logger = LoggerFactory.getLogger(PushNotificationScheduler.class); - private static final String PENDING_BACKGROUND_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_APN"; + private static final String PENDING_BACKGROUND_APN_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_APN"; + private static final String PENDING_BACKGROUND_FCM_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_FCM"; private static final String LAST_BACKGROUND_NOTIFICATION_TIMESTAMP_KEY_PREFIX = "LAST_BACKGROUND_NOTIFICATION"; private static final String PENDING_DELAYED_NOTIFICATIONS_KEY_PREFIX = "DELAYED"; @VisibleForTesting static final String NEXT_SLOT_TO_PROCESS_KEY = "pending_notification_next_slot"; - private static final Counter BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER = Metrics.counter(name(PushNotificationScheduler.class, "backgroundNotification", "scheduled")); + private static final String BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER_NAME = name(PushNotificationScheduler.class, "backgroundNotification", "scheduled"); private static final String BACKGROUND_NOTIFICATION_SENT_COUNTER_NAME = name(PushNotificationScheduler.class, "backgroundNotification", "sent"); private static final String DELAYED_NOTIFICATION_SCHEDULED_COUNTER_NAME = name(PushNotificationScheduler.class, "delayedNotificationScheduled"); @@ -65,7 +65,7 @@ public class PushNotificationScheduler implements Managed { private final FaultTolerantRedisClusterClient pushSchedulingCluster; private final Clock clock; - private final ClusterLuaScript scheduleBackgroundApnsNotificationScript; + private final ClusterLuaScript scheduleBackgroundNotificationScript; private final Thread[] workerThreads; @@ -103,15 +103,18 @@ public class PushNotificationScheduler implements Managed { final int slot = (int) (pushSchedulingCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT); - return processScheduledBackgroundApnsNotifications(slot) + processScheduledDelayedNotifications(slot); + return processScheduledBackgroundNotifications(PushNotification.TokenType.APN, slot) + + processScheduledBackgroundNotifications(PushNotification.TokenType.FCM, slot) + + processScheduledDelayedNotifications(slot); } @VisibleForTesting - long processScheduledBackgroundApnsNotifications(final int slot) { - return processScheduledNotifications(getPendingBackgroundApnsNotificationQueueKey(slot), - PushNotificationScheduler.this::sendBackgroundApnsNotification); + long processScheduledBackgroundNotifications(PushNotification.TokenType tokenType, final int slot) { + return processScheduledNotifications(getPendingBackgroundNotificationQueueKey(tokenType, slot), + (account, device) -> sendBackgroundNotification(tokenType, account, device)); } + @VisibleForTesting long processScheduledDelayedNotifications(final int slot) { return processScheduledNotifications(getDelayedNotificationQueueKey(slot), @@ -172,7 +175,7 @@ public class PushNotificationScheduler implements Managed { this.pushSchedulingCluster = pushSchedulingCluster; this.clock = clock; - this.scheduleBackgroundApnsNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, + this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE); this.workerThreads = new Thread[dedicatedProcessThreadCount]; @@ -183,27 +186,25 @@ public class PushNotificationScheduler implements Managed { } /** - * Schedule a background APNs notification to be sent some time in the future. + * Schedule a background push notification to be sent some time in the future. * * @return A CompletionStage that completes when the notification has successfully been scheduled * - * @throws IllegalArgumentException if the given device does not have an APNs token + * @throws IllegalArgumentException if the given device does not have a push token */ - public CompletionStage scheduleBackgroundApnsNotification(final Account account, final Device device) { - if (StringUtils.isBlank(device.getApnId())) { - throw new IllegalArgumentException("Device must have an APNs token"); + public CompletionStage scheduleBackgroundNotification(final PushNotification.TokenType tokenType, final Account account, final Device device) { + if (StringUtils.isBlank(getPushToken(tokenType, device))) { + throw new IllegalArgumentException("Device must have an " + tokenType + " token"); } - - BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER.increment(); - - return scheduleBackgroundApnsNotificationScript.executeAsync( - List.of( - getLastBackgroundApnsNotificationTimestampKey(account, device), - getPendingBackgroundApnsNotificationQueueKey(account, device)), - List.of( - encodeAciAndDeviceId(account, device), - String.valueOf(clock.millis()), - String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis()))) + Metrics.counter(BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER_NAME, "type", tokenType.name()).increment(); + return scheduleBackgroundNotificationScript.executeAsync( + List.of( + getLastBackgroundNotificationTimestampKey(account, device), + getPendingBackgroundNotificationQueueKey(tokenType, account, device)), + List.of( + encodeAciAndDeviceId(account, device), + String.valueOf(clock.millis()), + String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis()))) .thenRun(Util.NOOP); } @@ -236,14 +237,15 @@ public class PushNotificationScheduler implements Managed { */ public CompletionStage cancelScheduledNotifications(Account account, Device device) { return CompletableFuture.allOf( - cancelBackgroundApnsNotifications(account, device), + cancelBackgroundNotifications(PushNotification.TokenType.FCM, account, device), + cancelBackgroundNotifications(PushNotification.TokenType.APN, account, device), cancelDelayedNotifications(account, device)); } @VisibleForTesting - CompletableFuture cancelBackgroundApnsNotifications(final Account account, final Device device) { + CompletableFuture cancelBackgroundNotifications(final PushNotification.TokenType tokenType, final Account account, final Device device) { return pushSchedulingCluster.withCluster(connection -> connection.async() - .zrem(getPendingBackgroundApnsNotificationQueueKey(account, device), encodeAciAndDeviceId(account, device))) + .zrem(getPendingBackgroundNotificationQueueKey(tokenType, account, device), encodeAciAndDeviceId(account, device))) .thenRun(Util.NOOP) .toCompletableFuture(); } @@ -276,17 +278,23 @@ public class PushNotificationScheduler implements Managed { } @VisibleForTesting - CompletableFuture sendBackgroundApnsNotification(final Account account, final Device device) { - if (StringUtils.isBlank(device.getApnId())) { + CompletableFuture sendBackgroundNotification(PushNotification.TokenType tokenType, final Account account, final Device device) { + final String pushToken = getPushToken(tokenType, device); + if (StringUtils.isBlank(pushToken)) { return CompletableFuture.completedFuture(null); } + final PushNotificationSender sender = switch (tokenType) { + case FCM -> fcmSender; + case APN -> apnSender; + }; + // It's okay for the "last notification" timestamp to expire after the "cooldown" period has elapsed; a missing // timestamp and a timestamp older than the period are functionally equivalent. return pushSchedulingCluster.withCluster(connection -> connection.async().set( - getLastBackgroundApnsNotificationTimestampKey(account, device), + getLastBackgroundNotificationTimestampKey(account, device), String.valueOf(clock.millis()), new SetArgs().ex(BACKGROUND_NOTIFICATION_PERIOD))) - .thenCompose(ignored -> apnSender.sendNotification(new PushNotification(device.getApnId(), PushNotification.TokenType.APN, PushNotification.NotificationType.NOTIFICATION, null, account, device, false))) + .thenCompose(ignored -> sender.sendNotification(new PushNotification(pushToken, tokenType, PushNotification.NotificationType.NOTIFICATION, null, account, device, false))) .thenAccept(response -> Metrics.counter(BACKGROUND_NOTIFICATION_SENT_COUNTER_NAME, ACCEPTED_TAG, String.valueOf(response.accepted())) .increment()) @@ -321,6 +329,10 @@ public class PushNotificationScheduler implements Managed { @VisibleForTesting static String encodeAciAndDeviceId(final Account account, final Device device) { + // Note: This does not include a device registration id. If a device is unlinked and a new device is linked with + // the original device's id, the new device might get the old device's scheduled push, or the new device might + // delay its own push because the old device had a recent push. An extra or delayed background push is harmless, + // so this is okay. return account.getUuid() + ":" + device.getId(); } @@ -351,15 +363,19 @@ public class PushNotificationScheduler implements Managed { } @VisibleForTesting - static String getPendingBackgroundApnsNotificationQueueKey(final Account account, final Device device) { - return getPendingBackgroundApnsNotificationQueueKey(SlotHash.getSlot(encodeAciAndDeviceId(account, device))); + static String getPendingBackgroundNotificationQueueKey(final PushNotification.TokenType tokenType, final Account account, final Device device) { + return getPendingBackgroundNotificationQueueKey(tokenType, SlotHash.getSlot(encodeAciAndDeviceId(account, device))); } - private static String getPendingBackgroundApnsNotificationQueueKey(final int slot) { - return PENDING_BACKGROUND_NOTIFICATIONS_KEY_PREFIX + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}"; + private static String getPendingBackgroundNotificationQueueKey(final PushNotification.TokenType tokenType, final int slot) { + final String prefix = switch (tokenType) { + case APN -> PENDING_BACKGROUND_APN_NOTIFICATIONS_KEY_PREFIX; + case FCM -> PENDING_BACKGROUND_FCM_NOTIFICATIONS_KEY_PREFIX; + }; + return prefix + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}"; } - private static String getLastBackgroundApnsNotificationTimestampKey(final Account account, final Device device) { + private static String getLastBackgroundNotificationTimestampKey(final Account account, final Device device) { return LAST_BACKGROUND_NOTIFICATION_TIMESTAMP_KEY_PREFIX + "::{" + encodeAciAndDeviceId(account, device) + "}"; } @@ -376,15 +392,15 @@ public class PushNotificationScheduler implements Managed { Optional getLastBackgroundApnsNotificationTimestamp(final Account account, final Device device) { return Optional.ofNullable( pushSchedulingCluster.withCluster(connection -> - connection.sync().get(getLastBackgroundApnsNotificationTimestampKey(account, device)))) + connection.sync().get(getLastBackgroundNotificationTimestampKey(account, device)))) .map(timestampString -> Instant.ofEpochMilli(Long.parseLong(timestampString))); } @VisibleForTesting - Optional getNextScheduledBackgroundApnsNotificationTimestamp(final Account account, final Device device) { + Optional getNextScheduledBackgroundNotificationTimestamp(PushNotification.TokenType tokenType, final Account account, final Device device) { return Optional.ofNullable( pushSchedulingCluster.withCluster(connection -> - connection.sync().zscore(getPendingBackgroundApnsNotificationQueueKey(account, device), + connection.sync().zscore(getPendingBackgroundNotificationQueueKey(tokenType, account, device), encodeAciAndDeviceId(account, device)))) .map(timestamp -> Instant.ofEpochMilli(timestamp.longValue())); } @@ -407,4 +423,11 @@ public class PushNotificationScheduler implements Managed { return "unknown"; } } + + private static String getPushToken(final PushNotification.TokenType tokenType, final Device device) { + return switch (tokenType) { + case FCM -> device.getGcmId(); + case APN -> device.getApnId(); + }; + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java index c1153d6e4..cc7469aa2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java @@ -171,7 +171,7 @@ class PushNotificationManagerTest { .thenReturn(CompletableFuture.completedFuture(new SendPushNotificationResult(true, Optional.empty(), false, Optional.empty()))); if (!urgent) { - when(pushNotificationScheduler.scheduleBackgroundApnsNotification(account, device)) + when(pushNotificationScheduler.scheduleBackgroundNotification(PushNotification.TokenType.APN, account, device)) .thenReturn(CompletableFuture.completedFuture(null)); } @@ -184,7 +184,7 @@ class PushNotificationManagerTest { verifyNoInteractions(pushNotificationScheduler); } else { verifyNoInteractions(apnSender); - verify(pushNotificationScheduler).scheduleBackgroundApnsNotification(account, device); + verify(pushNotificationScheduler).scheduleBackgroundNotification(PushNotification.TokenType.APN, account, device); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java index ebfb7f259..29d6f51b3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; @@ -56,6 +57,7 @@ class PushNotificationSchedulerTest { private static final String ACCOUNT_NUMBER = "+18005551234"; private static final byte DEVICE_ID = 1; private static final String APN_ID = RandomStringUtils.secure().nextAlphanumeric(32); + private static final String GCM_ID = RandomStringUtils.secure().nextAlphanumeric(32); @BeforeEach void setUp() throws Exception { @@ -63,6 +65,7 @@ class PushNotificationSchedulerTest { device = mock(Device.class); when(device.getId()).thenReturn(DEVICE_ID); when(device.getApnId()).thenReturn(APN_ID); + when(device.getGcmId()).thenReturn(GCM_ID); when(device.getLastSeen()).thenReturn(System.currentTimeMillis()); account = mock(Account.class); @@ -90,8 +93,9 @@ class PushNotificationSchedulerTest { apnSender, fcmSender, accountsManager, clock, 1, 1); } - @Test - void testScheduleBackgroundNotificationWithNoRecentApnsNotification() throws ExecutionException, InterruptedException { + @ParameterizedTest + @EnumSource(PushNotification.TokenType.class) + void testScheduleBackgroundNotificationWithNoRecentApnsNotification(PushNotification.TokenType tokenType) throws ExecutionException, InterruptedException { final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); clock.pin(now); @@ -99,97 +103,107 @@ class PushNotificationSchedulerTest { pushNotificationScheduler.getLastBackgroundApnsNotificationTimestamp(account, device)); assertEquals(Optional.empty(), - pushNotificationScheduler.getNextScheduledBackgroundApnsNotificationTimestamp(account, device)); + pushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(tokenType, account, device)); - pushNotificationScheduler.scheduleBackgroundApnsNotification(account, device).toCompletableFuture().get(); + pushNotificationScheduler.scheduleBackgroundNotification(tokenType, account, device).toCompletableFuture().get(); assertEquals(Optional.of(now), - pushNotificationScheduler.getNextScheduledBackgroundApnsNotificationTimestamp(account, device)); + pushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(tokenType, account, device)); } - @Test - void testScheduleBackgroundNotificationWithRecentApnsNotification() throws ExecutionException, InterruptedException { + @ParameterizedTest + @EnumSource(PushNotification.TokenType.class) + void testScheduleBackgroundNotificationWithRecentNotification(PushNotification.TokenType tokenType) throws ExecutionException, InterruptedException { final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); final Instant recentNotificationTimestamp = now.minus(PushNotificationScheduler.BACKGROUND_NOTIFICATION_PERIOD.dividedBy(2)); // Insert a timestamp for a recently-sent background push notification clock.pin(Instant.ofEpochMilli(recentNotificationTimestamp.toEpochMilli())); - pushNotificationScheduler.sendBackgroundApnsNotification(account, device); + pushNotificationScheduler.sendBackgroundNotification(tokenType, account, device); clock.pin(now); - pushNotificationScheduler.scheduleBackgroundApnsNotification(account, device).toCompletableFuture().get(); + pushNotificationScheduler.scheduleBackgroundNotification(tokenType, account, device).toCompletableFuture().get(); final Instant expectedScheduledTimestamp = recentNotificationTimestamp.plus(PushNotificationScheduler.BACKGROUND_NOTIFICATION_PERIOD); assertEquals(Optional.of(expectedScheduledTimestamp), - pushNotificationScheduler.getNextScheduledBackgroundApnsNotificationTimestamp(account, device)); + pushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(tokenType, account, device)); } - @Test - void testCancelBackgroundApnsNotifications() { + @ParameterizedTest + @EnumSource(PushNotification.TokenType.class) + void testCancelBackgroundApnsNotifications(PushNotification.TokenType tokenType) { final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); clock.pin(now); - pushNotificationScheduler.scheduleBackgroundApnsNotification(account, device).toCompletableFuture().join(); - pushNotificationScheduler.cancelBackgroundApnsNotifications(account, device).join(); + pushNotificationScheduler.scheduleBackgroundNotification(tokenType, account, device).toCompletableFuture().join(); + pushNotificationScheduler.cancelBackgroundNotifications(tokenType, account, device).join(); assertEquals(Optional.empty(), pushNotificationScheduler.getLastBackgroundApnsNotificationTimestamp(account, device)); assertEquals(Optional.empty(), - pushNotificationScheduler.getNextScheduledBackgroundApnsNotificationTimestamp(account, device)); + pushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(tokenType, account, device)); } - @Test - void testProcessScheduledBackgroundNotifications() { + @ParameterizedTest + @EnumSource(PushNotification.TokenType.class) + void testProcessScheduledBackgroundNotifications(PushNotification.TokenType tokenType) { final PushNotificationScheduler.NotificationWorker worker = pushNotificationScheduler.new NotificationWorker(1); final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); clock.pin(Instant.ofEpochMilli(now.toEpochMilli())); - pushNotificationScheduler.scheduleBackgroundApnsNotification(account, device).toCompletableFuture().join(); + pushNotificationScheduler.scheduleBackgroundNotification(tokenType, account, device).toCompletableFuture().join(); final int slot = - SlotHash.getSlot(PushNotificationScheduler.getPendingBackgroundApnsNotificationQueueKey(account, device)); + SlotHash.getSlot(PushNotificationScheduler.getPendingBackgroundNotificationQueueKey(tokenType, account, device)); clock.pin(Instant.ofEpochMilli(now.minusMillis(1).toEpochMilli())); - assertEquals(0, worker.processScheduledBackgroundApnsNotifications(slot)); + assertEquals(0, worker.processScheduledBackgroundNotifications(tokenType, slot)); clock.pin(now); - assertEquals(1, worker.processScheduledBackgroundApnsNotifications(slot)); + assertEquals(1, worker.processScheduledBackgroundNotifications(tokenType, slot)); final ArgumentCaptor notificationCaptor = ArgumentCaptor.forClass(PushNotification.class); - verify(apnSender).sendNotification(notificationCaptor.capture()); + verify(switch (tokenType) { + case FCM -> fcmSender; + case APN -> apnSender; + }).sendNotification(notificationCaptor.capture()); final PushNotification pushNotification = notificationCaptor.getValue(); - assertEquals(PushNotification.TokenType.APN, pushNotification.tokenType()); - assertEquals(APN_ID, pushNotification.deviceToken()); + assertEquals(tokenType, pushNotification.tokenType()); + assertEquals(switch (tokenType) { + case FCM -> GCM_ID; + case APN -> APN_ID; + }, pushNotification.deviceToken()); assertEquals(account, pushNotification.destination()); assertEquals(device, pushNotification.destinationDevice()); assertEquals(PushNotification.NotificationType.NOTIFICATION, pushNotification.notificationType()); assertFalse(pushNotification.urgent()); assertEquals(Optional.empty(), - pushNotificationScheduler.getNextScheduledBackgroundApnsNotificationTimestamp(account, device)); + pushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(tokenType, account, device)); } - @Test - void testProcessScheduledBackgroundNotificationsCancelled() throws ExecutionException, InterruptedException { + @ParameterizedTest + @EnumSource(PushNotification.TokenType.class) + void testProcessScheduledBackgroundNotificationsCancelled(PushNotification.TokenType tokenType) throws ExecutionException, InterruptedException { final PushNotificationScheduler.NotificationWorker worker = pushNotificationScheduler.new NotificationWorker(1); final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); clock.pin(now); - pushNotificationScheduler.scheduleBackgroundApnsNotification(account, device).toCompletableFuture().get(); + pushNotificationScheduler.scheduleBackgroundNotification(tokenType, account, device).toCompletableFuture().get(); pushNotificationScheduler.cancelScheduledNotifications(account, device).toCompletableFuture().get(); final int slot = - SlotHash.getSlot(PushNotificationScheduler.getPendingBackgroundApnsNotificationQueueKey(account, device)); + SlotHash.getSlot(PushNotificationScheduler.getPendingBackgroundNotificationQueueKey(tokenType, account, device)); - assertEquals(0, worker.processScheduledBackgroundApnsNotifications(slot)); + assertEquals(0, worker.processScheduledBackgroundNotifications(tokenType, slot)); verify(apnSender, never()).sendNotification(any()); }