diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java index 16ff36887..340a67822 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java @@ -141,18 +141,6 @@ public class PushNotificationManager { result.errorCode(), result.unregisteredTimestamp()); } - - if (result.accepted() && - pushNotification.tokenType() == PushNotification.TokenType.APN_VOIP && - pushNotification.notificationType() == PushNotification.NotificationType.NOTIFICATION && - pushNotification.destination() != null && - pushNotification.destinationDevice() != null) { - - pushNotificationScheduler.scheduleRecurringApnsVoipNotification( - pushNotification.destination(), - pushNotification.destinationDevice()) - .whenComplete(logErrors()); - } } else { logger.debug("Failed to deliver {} push notification to {} ({})", pushNotification.notificationType(), pushNotification.deviceToken(), pushNotification.tokenType(), diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java index b24f8008d..7c2fb8f37 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationScheduler.java @@ -19,20 +19,17 @@ import java.io.IOException; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.storage.Account; @@ -41,14 +38,12 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; import org.whispersystems.textsecuregcm.util.Util; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class PushNotificationScheduler implements Managed { private static final Logger logger = LoggerFactory.getLogger(PushNotificationScheduler.class); - private static final String PENDING_RECURRING_VOIP_NOTIFICATIONS_KEY_PREFIX = "PENDING_APN"; private static final String PENDING_BACKGROUND_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_APN"; private static final String LAST_BACKGROUND_NOTIFICATION_TIMESTAMP_KEY_PREFIX = "LAST_BACKGROUND_NOTIFICATION"; private static final String PENDING_DELAYED_NOTIFICATIONS_KEY_PREFIX = "DELAYED"; @@ -56,11 +51,6 @@ public class PushNotificationScheduler implements Managed { @VisibleForTesting static final String NEXT_SLOT_TO_PROCESS_KEY = "pending_notification_next_slot"; - private static final Counter delivered = Metrics.counter("chat.ApnPushNotificationScheduler.voip_delivered"); - private static final Counter sent = Metrics.counter("chat.ApnPushNotificationScheduler.voip_sent"); - private static final Counter retry = Metrics.counter("chat.ApnPushNotificationScheduler.voip_retry"); - private static final Counter evicted = Metrics.counter("chat.ApnPushNotificationScheduler.voip_evicted"); - private static final Counter BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER = Metrics.counter(name(PushNotificationScheduler.class, "backgroundNotification", "scheduled")); private static final String BACKGROUND_NOTIFICATION_SENT_COUNTER_NAME = name(PushNotificationScheduler.class, "backgroundNotification", "sent"); @@ -75,10 +65,6 @@ public class PushNotificationScheduler implements Managed { private final FaultTolerantRedisCluster pushSchedulingCluster; private final Clock clock; - private final ClusterLuaScript getPendingVoipDestinationsScript; - private final ClusterLuaScript insertPendingVoipDestinationScript; - private final ClusterLuaScript removePendingVoipDestinationScript; - private final ClusterLuaScript scheduleBackgroundApnsNotificationScript; private final Thread[] workerThreads; @@ -117,37 +103,7 @@ public class PushNotificationScheduler implements Managed { final int slot = (int) (pushSchedulingCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT); - return processRecurringApnsVoipNotifications(slot) + - processScheduledBackgroundApnsNotifications(slot) + - processScheduledDelayedNotifications(slot); - } - - @VisibleForTesting - long processRecurringApnsVoipNotifications(final int slot) { - List pendingDestinations; - long entriesProcessed = 0; - - do { - pendingDestinations = getPendingDestinationsForRecurringApnsVoipNotifications(slot, PAGE_SIZE); - entriesProcessed += pendingDestinations.size(); - - Flux.fromIterable(pendingDestinations) - .flatMap(destination -> Mono.fromFuture(() -> getAccountAndDeviceFromPairString(destination)) - .flatMap(maybeAccountAndDevice -> { - if (maybeAccountAndDevice.isPresent()) { - final Pair accountAndDevice = maybeAccountAndDevice.get(); - return Mono.fromFuture(() -> sendRecurringApnsVoipNotification(accountAndDevice.first(), accountAndDevice.second())); - } else { - final Pair aciAndDeviceId = decodeAciAndDeviceId(destination); - return Mono.fromFuture(() -> removeRecurringApnsVoipNotificationEntry(aciAndDeviceId.first(), aciAndDeviceId.second())) - .then(); - } - }), maxConcurrency) - .then() - .block(); - } while (!pendingDestinations.isEmpty()); - - return entriesProcessed; + return processScheduledBackgroundApnsNotifications(slot) + processScheduledDelayedNotifications(slot); } @VisibleForTesting @@ -216,13 +172,6 @@ public class PushNotificationScheduler implements Managed { this.pushSchedulingCluster = pushSchedulingCluster; this.clock = clock; - this.getPendingVoipDestinationsScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/get.lua", - ScriptOutputType.MULTI); - this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua", - ScriptOutputType.VALUE); - this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua", - ScriptOutputType.INTEGER); - this.scheduleBackgroundApnsNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE); @@ -233,17 +182,6 @@ public class PushNotificationScheduler implements Managed { } } - /** - * Schedule a recurring VOIP notification until {@link this#cancelScheduledNotifications} is called or the device is - * removed - * - * @return A CompletionStage that completes when the recurring notification has successfully been scheduled - */ - public CompletionStage scheduleRecurringApnsVoipNotification(Account account, Device device) { - sent.increment(); - return insertRecurringApnsVoipNotificationEntry(account.getIdentifier(IdentityType.ACI), device.getId(), clock.millis() + (15 * 1000), (15 * 1000)); - } - /** * Schedule a background APNs notification to be sent some time in the future. * @@ -292,32 +230,16 @@ public class PushNotificationScheduler implements Managed { } /** - * Cancel a scheduled recurring VOIP notification + * Cancel scheduled notifications for the given account and device. * - * @return A CompletionStage that completes when the scheduled task has been cancelled. + * @return A CompletionStage that completes when the scheduled notification has been cancelled. */ public CompletionStage cancelScheduledNotifications(Account account, Device device) { return CompletableFuture.allOf( - cancelRecurringApnsVoipNotifications(account, device), cancelBackgroundApnsNotifications(account, device), cancelDelayedNotifications(account, device)); } - private CompletableFuture cancelRecurringApnsVoipNotifications(final Account account, final Device device) { - return removeRecurringApnsVoipNotificationEntry(account.getIdentifier(IdentityType.ACI), device.getId()) - .thenCompose(removed -> { - if (removed) { - delivered.increment(); - } - return pushSchedulingCluster.withCluster(connection -> - connection.async().zrem( - getPendingBackgroundApnsNotificationQueueKey(account, device), - encodeAciAndDeviceId(account, device))); - }) - .thenRun(Util.NOOP) - .toCompletableFuture(); - } - @VisibleForTesting CompletableFuture cancelBackgroundApnsNotifications(final Account account, final Device device) { return pushSchedulingCluster.withCluster(connection -> connection.async() @@ -353,21 +275,6 @@ public class PushNotificationScheduler implements Managed { } } - private CompletableFuture sendRecurringApnsVoipNotification(final Account account, final Device device) { - if (StringUtils.isBlank(device.getVoipApnId())) { - return removeRecurringApnsVoipNotificationEntry(account.getIdentifier(IdentityType.ACI), device.getId()) - .thenRun(Util.NOOP); - } - - if (device.getLastSeen() < clock.millis() - TimeUnit.DAYS.toMillis(7)) { - return removeRecurringApnsVoipNotificationEntry(account.getIdentifier(IdentityType.ACI), device.getId()) - .thenRun(evicted::increment); - } - - return apnSender.sendNotification(new PushNotification(device.getVoipApnId(), PushNotification.TokenType.APN_VOIP, PushNotification.NotificationType.NOTIFICATION, null, account, device, true)) - .thenRun(retry::increment); - } - @VisibleForTesting CompletableFuture sendBackgroundApnsNotification(final Account account, final Device device) { if (StringUtils.isBlank(device.getApnId())) { @@ -443,48 +350,6 @@ public class PushNotificationScheduler implements Managed { .flatMap(account -> account.getDevice(aciAndDeviceId.second()).map(device -> new Pair<>(account, device)))); } - private CompletableFuture removeRecurringApnsVoipNotificationEntry(final UUID aci, final byte deviceId) { - final String endpoint = getVoipEndpointKey(aci, deviceId); - - return removePendingVoipDestinationScript.executeAsync( - List.of(getPendingRecurringApnsVoipNotificationQueueKey(endpoint), endpoint), Collections.emptyList()) - .thenApply(result -> ((long) result) > 0); - } - - @SuppressWarnings("unchecked") - @VisibleForTesting - List getPendingDestinationsForRecurringApnsVoipNotifications(final int slot, final int limit) { - return (List) getPendingVoipDestinationsScript.execute( - List.of(getPendingRecurringApnsVoipNotificationQueueKey(slot)), - List.of(String.valueOf(clock.millis()), String.valueOf(limit))); - } - - @SuppressWarnings("SameParameterValue") - private CompletionStage insertRecurringApnsVoipNotificationEntry(final UUID aci, final byte deviceId, final long timestamp, final long interval) { - final String endpoint = getVoipEndpointKey(aci, deviceId); - - return insertPendingVoipDestinationScript.executeAsync( - List.of(getPendingRecurringApnsVoipNotificationQueueKey(endpoint), endpoint), - List.of(String.valueOf(timestamp), - String.valueOf(interval), - aci.toString(), - String.valueOf(deviceId))) - .thenRun(Util.NOOP); - } - - @VisibleForTesting - static String getVoipEndpointKey(final UUID aci, final byte deviceId) { - return "apn_device::{" + aci + "::" + deviceId + "}"; - } - - private static String getPendingRecurringApnsVoipNotificationQueueKey(final String endpoint) { - return getPendingRecurringApnsVoipNotificationQueueKey(SlotHash.getSlot(endpoint)); - } - - private static String getPendingRecurringApnsVoipNotificationQueueKey(final int slot) { - return PENDING_RECURRING_VOIP_NOTIFICATIONS_KEY_PREFIX + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}"; - } - @VisibleForTesting static String getPendingBackgroundApnsNotificationQueueKey(final Account account, final Device device) { return getPendingBackgroundApnsNotificationQueueKey(SlotHash.getSlot(encodeAciAndDeviceId(account, device))); diff --git a/service/src/main/resources/lua/apn/get.lua b/service/src/main/resources/lua/apn/get.lua deleted file mode 100644 index 45d984cec..000000000 --- a/service/src/main/resources/lua/apn/get.lua +++ /dev/null @@ -1,70 +0,0 @@ -local pendingNotificationQueue = KEYS[1] - -local maxTime = ARGV[1] -local limit = ARGV[2] - -local hgetall = function (key) - local bulk = redis.call('HGETALL', key) - local result = {} - local nextkey - for i, v in ipairs(bulk) do - if i % 2 == 1 then - nextkey = v - else - result[nextkey] = v - end - end - return result -end - -local getNextInterval = function(interval) - if interval < 20000 then - return 20000 - end - - if interval < 40000 then - return 40000 - end - - if interval < 80000 then - return 80000 - end - - if interval < 160000 then - return 160000 - end - - if interval < 600000 then - return 600000 - end - - if interval < 1800000 then - return 1800000 - end - - return 3600000 -end - - -local results = redis.call("ZRANGE", pendingNotificationQueue, 0, maxTime, "BYSCORE", "LIMIT", 0, limit) -local collated = {} - -if results and next(results) then - for i, name in ipairs(results) do - local pending = hgetall(name) - local lastInterval = pending["interval"] - - if lastInterval == nil then - lastInterval = 0 - end - - local nextInterval = getNextInterval(tonumber(lastInterval)) - - redis.call("HSET", name, "interval", nextInterval) - redis.call("ZADD", pendingNotificationQueue, tonumber(maxTime) + nextInterval, name) - - collated[i] = pending["account"] .. ":" .. pending["device"] - end -end - -return collated diff --git a/service/src/main/resources/lua/apn/insert.lua b/service/src/main/resources/lua/apn/insert.lua deleted file mode 100644 index 3512f22a3..000000000 --- a/service/src/main/resources/lua/apn/insert.lua +++ /dev/null @@ -1,14 +0,0 @@ -local pendingNotificationQueue = KEYS[1] -local endpoint = KEYS[2] - -local timestamp = ARGV[1] -local interval = ARGV[2] -local account = ARGV[3] -local deviceId = ARGV[4] - -redis.call("HSET", endpoint, "created", timestamp) -redis.call("HSET", endpoint, "interval", interval) -redis.call("HSET", endpoint, "account", account) -redis.call("HSET", endpoint, "device", deviceId) - -redis.call("ZADD", pendingNotificationQueue, timestamp, endpoint) diff --git a/service/src/main/resources/lua/apn/remove.lua b/service/src/main/resources/lua/apn/remove.lua deleted file mode 100644 index c2fb84e42..000000000 --- a/service/src/main/resources/lua/apn/remove.lua +++ /dev/null @@ -1,5 +0,0 @@ -local pendingNotificationQueue = KEYS[1] -local endpoint = KEYS[2] - -redis.call("DEL", endpoint) -return redis.call("ZREM", pendingNotificationQueue, endpoint) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java index 039f5cee6..3c319789d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java @@ -188,32 +188,6 @@ class PushNotificationManagerTest { } } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testSendNotificationApnVoip(final boolean urgent) { - final Account account = mock(Account.class); - final Device device = mock(Device.class); - - when(device.getId()).thenReturn(Device.PRIMARY_ID); - when(account.getDevice(Device.PRIMARY_ID)).thenReturn(Optional.of(device)); - - final PushNotification pushNotification = new PushNotification( - "token", PushNotification.TokenType.APN_VOIP, PushNotification.NotificationType.NOTIFICATION, null, account, device, urgent); - - when(apnSender.sendNotification(pushNotification)) - .thenReturn(CompletableFuture.completedFuture(new SendPushNotificationResult(true, Optional.empty(), false, Optional.empty()))); - - pushNotificationManager.sendNotification(pushNotification); - - verify(apnSender).sendNotification(pushNotification); - - verifyNoInteractions(fcmSender); - verify(accountsManager, never()).updateDevice(eq(account), eq(Device.PRIMARY_ID), any()); - verify(device, never()).setGcmId(any()); - verify(pushNotificationScheduler).scheduleRecurringApnsVoipNotification(account, device); - verify(pushNotificationScheduler, never()).scheduleBackgroundApnsNotification(any(), any()); - } - @Test void testSendNotificationUnregisteredFcm() { final Account account = mock(Account.class); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java index ead73819e..236275cfc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationSchedulerTest.java @@ -7,7 +7,6 @@ package org.whispersystems.textsecuregcm.push; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; @@ -20,7 +19,6 @@ import io.lettuce.core.cluster.SlotHash; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -38,7 +36,6 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.TestClock; class PushNotificationSchedulerTest { @@ -95,57 +92,6 @@ class PushNotificationSchedulerTest { apnSender, fcmSender, accountsManager, clock, 1, 1); } - @Test - void testClusterInsert() throws ExecutionException, InterruptedException { - final String endpoint = PushNotificationScheduler.getVoipEndpointKey(ACCOUNT_UUID, DEVICE_ID); - final long currentTimeMillis = System.currentTimeMillis(); - - assertTrue( - pushNotificationScheduler.getPendingDestinationsForRecurringApnsVoipNotifications(SlotHash.getSlot(endpoint), 1).isEmpty()); - - clock.pin(Instant.ofEpochMilli(currentTimeMillis - 30_000)); - pushNotificationScheduler.scheduleRecurringApnsVoipNotification(account, device).toCompletableFuture().get(); - - clock.pin(Instant.ofEpochMilli(currentTimeMillis)); - final List pendingDestinations = pushNotificationScheduler.getPendingDestinationsForRecurringApnsVoipNotifications(SlotHash.getSlot(endpoint), 2); - assertEquals(1, pendingDestinations.size()); - - final Pair aciAndDeviceId = - PushNotificationScheduler.decodeAciAndDeviceId(pendingDestinations.getFirst()); - - assertEquals(ACCOUNT_UUID, aciAndDeviceId.first()); - assertEquals(DEVICE_ID, aciAndDeviceId.second()); - - assertTrue( - pushNotificationScheduler.getPendingDestinationsForRecurringApnsVoipNotifications(SlotHash.getSlot(endpoint), 1).isEmpty()); - } - - @Test - void testProcessRecurringVoipNotifications() throws ExecutionException, InterruptedException { - final PushNotificationScheduler.NotificationWorker worker = pushNotificationScheduler.new NotificationWorker(1); - final long currentTimeMillis = System.currentTimeMillis(); - - clock.pin(Instant.ofEpochMilli(currentTimeMillis - 30_000)); - pushNotificationScheduler.scheduleRecurringApnsVoipNotification(account, device).toCompletableFuture().get(); - - clock.pin(Instant.ofEpochMilli(currentTimeMillis)); - - final int slot = SlotHash.getSlot(PushNotificationScheduler.getVoipEndpointKey(ACCOUNT_UUID, DEVICE_ID)); - - assertEquals(1, worker.processRecurringApnsVoipNotifications(slot)); - - final ArgumentCaptor notificationCaptor = ArgumentCaptor.forClass(PushNotification.class); - verify(apnSender).sendNotification(notificationCaptor.capture()); - - final PushNotification pushNotification = notificationCaptor.getValue(); - - assertEquals(VOIP_APN_ID, pushNotification.deviceToken()); - assertEquals(account, pushNotification.destination()); - assertEquals(device, pushNotification.destinationDevice()); - - assertEquals(0, worker.processRecurringApnsVoipNotifications(slot)); - } - @Test void testScheduleBackgroundNotificationWithNoRecentApnsNotification() throws ExecutionException, InterruptedException { final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); @@ -228,8 +174,6 @@ class PushNotificationSchedulerTest { assertEquals(PushNotification.NotificationType.NOTIFICATION, pushNotification.notificationType()); assertFalse(pushNotification.urgent()); - assertEquals(0, worker.processRecurringApnsVoipNotifications(slot)); - assertEquals(Optional.empty(), pushNotificationScheduler.getNextScheduledBackgroundApnsNotificationTimestamp(account, device)); }