diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationScheduler.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationScheduler.java index 10654059b..93fcd8fe7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationScheduler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationScheduler.java @@ -11,6 +11,7 @@ import com.google.common.annotations.VisibleForTesting; import io.dropwizard.lifecycle.Managed; import io.lettuce.core.Limit; import io.lettuce.core.Range; +import io.lettuce.core.RedisException; import io.lettuce.core.ScriptOutputType; import io.lettuce.core.SetArgs; import io.lettuce.core.cluster.SlotHash; @@ -24,8 +25,12 @@ import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +121,7 @@ public class ApnPushNotificationScheduler implements Managed { try { getAccountAndDeviceFromPairString(destination).ifPresentOrElse( accountAndDevice -> sendRecurringVoipNotification(accountAndDevice.first(), accountAndDevice.second()), - () -> removeRecurringVoipNotificationEntry(destination)); + () -> removeRecurringVoipNotificationEntrySync(destination)); } catch (final IllegalArgumentException e) { logger.warn("Failed to parse account/device pair: {}", destination, e); } @@ -189,31 +194,53 @@ public class ApnPushNotificationScheduler implements Managed { } } - void scheduleRecurringVoipNotification(Account account, Device device) { + /** + * Schedule a recurring VOIP notification until {@link this#cancelScheduledNotifications} is called or the device is + * removed + * + * @return A CompletionStage that completes when the recurring notification has successfully been scheduled + */ + public CompletionStage scheduleRecurringVoipNotification(Account account, Device device) { sent.increment(); - insertRecurringVoipNotificationEntry(account, device, clock.millis() + (15 * 1000), (15 * 1000)); + return insertRecurringVoipNotificationEntry(account, device, clock.millis() + (15 * 1000), (15 * 1000)); } - void scheduleBackgroundNotification(final Account account, final Device device) { + /** + * Schedule a background notification to be sent some time in the future + * + * @return A CompletionStage that completes when the notification has successfully been scheduled + */ + public CompletionStage scheduleBackgroundNotification(final Account account, final Device device) { backgroundNotificationScheduledCounter.increment(); - scheduleBackgroundNotificationScript.execute( + return scheduleBackgroundNotificationScript.executeAsync( List.of( getLastBackgroundNotificationTimestampKey(account, device), getPendingBackgroundNotificationQueueKey(account, device)), List.of( getPairString(account, device), String.valueOf(clock.millis()), - String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis()))); + String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis()))) + .thenAccept(dropValue()); } - public void cancelScheduledNotifications(Account account, Device device) { - if (removeRecurringVoipNotificationEntry(account, device)) { - delivered.increment(); - } - - pushSchedulingCluster.useCluster(connection -> - connection.sync().zrem(getPendingBackgroundNotificationQueueKey(account, device), getPairString(account, device))); + /** + * Cancel a scheduled recurring VOIP notification + * + * @return A CompletionStage that completes when the scheduled task has been cancelled. + */ + public CompletionStage cancelScheduledNotifications(Account account, Device device) { + return removeRecurringVoipNotificationEntry(account, device) + .thenCompose(removed -> { + if (removed) { + delivered.increment(); + } + return pushSchedulingCluster.withCluster(connection -> + connection.async().zrem( + getPendingBackgroundNotificationQueueKey(account, device), + getPairString(account, device))); + }) + .thenAccept(dropValue()); } @Override @@ -238,15 +265,14 @@ public class ApnPushNotificationScheduler implements Managed { String apnId = device.getVoipApnId(); if (apnId == null) { - removeRecurringVoipNotificationEntry(account, device); + removeRecurringVoipNotificationEntrySync(getEndpointKey(account, device)); return; } long deviceLastSeen = device.getLastSeen(); - if (deviceLastSeen < clock.millis() - TimeUnit.DAYS.toMillis(7)) { evicted.increment(); - removeRecurringVoipNotificationEntry(account, device); + removeRecurringVoipNotificationEntrySync(getEndpointKey(account, device)); return; } @@ -316,14 +342,28 @@ public class ApnPushNotificationScheduler implements Managed { } } - private boolean removeRecurringVoipNotificationEntry(Account account, Device device) { + private boolean removeRecurringVoipNotificationEntrySync(final String endpoint) { + try { + return removeRecurringVoipNotificationEntry(endpoint).toCompletableFuture().get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof RedisException re) { + throw re; + } + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private CompletionStage removeRecurringVoipNotificationEntry(Account account, Device device) { return removeRecurringVoipNotificationEntry(getEndpointKey(account, device)); } - private boolean removeRecurringVoipNotificationEntry(final String endpoint) { - return (long) removePendingVoipDestinationScript.execute( + private CompletionStage removeRecurringVoipNotificationEntry(final String endpoint) { + return removePendingVoipDestinationScript.executeAsync( List.of(getPendingRecurringVoipNotificationQueueKey(endpoint), endpoint), - Collections.emptyList()) > 0; + Collections.emptyList()) + .thenApply(result -> ((long) result) > 0); } @SuppressWarnings("unchecked") @@ -334,15 +374,16 @@ public class ApnPushNotificationScheduler implements Managed { List.of(String.valueOf(clock.millis()), String.valueOf(limit))); } - private void insertRecurringVoipNotificationEntry(final Account account, final Device device, final long timestamp, final long interval) { + private CompletionStage insertRecurringVoipNotificationEntry(final Account account, final Device device, final long timestamp, final long interval) { final String endpoint = getEndpointKey(account, device); - insertPendingVoipDestinationScript.execute( + return insertPendingVoipDestinationScript.executeAsync( List.of(getPendingRecurringVoipNotificationQueueKey(endpoint), endpoint), List.of(String.valueOf(timestamp), String.valueOf(interval), account.getUuid().toString(), - String.valueOf(device.getId()))); + String.valueOf(device.getId()))) + .thenAccept(dropValue()); } @VisibleForTesting @@ -387,4 +428,8 @@ public class ApnPushNotificationScheduler implements Managed { getPairString(account, device)))) .map(timestamp -> Instant.ofEpochMilli(timestamp.longValue())); } + + private static Consumer dropValue() { + return ignored -> {}; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java index 89341babb..c6c4d4816 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java @@ -10,6 +10,7 @@ import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import com.google.common.annotations.VisibleForTesting; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; +import java.util.function.BiConsumer; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +35,7 @@ public class PushNotificationManager { private static final String SENT_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "sentPushNotification"); private static final String FAILED_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "failedPushNotification"); - private final Logger logger = LoggerFactory.getLogger(PushNotificationManager.class); + private static final Logger logger = LoggerFactory.getLogger(PushNotificationManager.class); public PushNotificationManager(final AccountsManager accountsManager, final APNSender apnSender, @@ -79,7 +80,7 @@ public class PushNotificationManager { public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) { RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(account.getUuid(), device.getId(), userAgent)); - RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelScheduledNotifications(account, device)); + apnPushNotificationScheduler.cancelScheduledNotifications(account, device).whenComplete(logErrors()); } @VisibleForTesting @@ -104,8 +105,10 @@ public class PushNotificationManager { if (pushNotification.tokenType() == PushNotification.TokenType.APN && !pushNotification.urgent()) { // APNs imposes a per-device limit on background push notifications; schedule a notification for some time in the // future (possibly even now!) rather than sending a notification directly - apnPushNotificationScheduler.scheduleBackgroundNotification(pushNotification.destination(), - pushNotification.destinationDevice()); + apnPushNotificationScheduler + .scheduleBackgroundNotification(pushNotification.destination(), pushNotification.destinationDevice()) + .whenComplete(logErrors()); + } else { final PushNotificationSender sender = switch (pushNotification.tokenType()) { case FCM -> fcmSender; @@ -137,9 +140,10 @@ public class PushNotificationManager { pushNotification.destination() != null && pushNotification.destinationDevice() != null) { - RedisOperation.unchecked( - () -> apnPushNotificationScheduler.scheduleRecurringVoipNotification(pushNotification.destination(), - pushNotification.destinationDevice())); + apnPushNotificationScheduler.scheduleRecurringVoipNotification( + pushNotification.destination(), + pushNotification.destinationDevice()) + .whenComplete(logErrors()); } } else { logger.debug("Failed to deliver {} push notification to {} ({})", @@ -152,6 +156,14 @@ public class PushNotificationManager { } } + private static BiConsumer logErrors() { + return (ignored, throwable) -> { + if (throwable != null) { + logger.warn("Failed push scheduling operation", throwable); + } + }; + } + private void handleDeviceUnregistered(final Account account, final Device device) { if (StringUtils.isNotBlank(device.getGcmId())) { if (device.getUninstalledFeedbackTimestamp() == 0) { @@ -159,7 +171,7 @@ public class PushNotificationManager { d.setUninstalledFeedbackTimestamp(Util.todayInMillis())); } } else { - RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelScheduledNotifications(account, device)); + apnPushNotificationScheduler.cancelScheduledNotifications(account, device).whenComplete(logErrors()); } } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java index 1e1fc5e87..906bbed3d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java @@ -21,6 +21,7 @@ import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -77,7 +78,7 @@ class ApnPushNotificationSchedulerTest { } @Test - void testClusterInsert() { + void testClusterInsert() throws ExecutionException, InterruptedException { final String endpoint = ApnPushNotificationScheduler.getEndpointKey(account, device); final long currentTimeMillis = System.currentTimeMillis(); @@ -85,7 +86,7 @@ class ApnPushNotificationSchedulerTest { apnPushNotificationScheduler.getPendingDestinationsForRecurringVoipNotifications(SlotHash.getSlot(endpoint), 1).isEmpty()); clock.pin(Instant.ofEpochMilli(currentTimeMillis - 30_000)); - apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device); + apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device).toCompletableFuture().get(); clock.pin(Instant.ofEpochMilli(currentTimeMillis)); final List pendingDestinations = apnPushNotificationScheduler.getPendingDestinationsForRecurringVoipNotifications(SlotHash.getSlot(endpoint), 2); @@ -103,12 +104,12 @@ class ApnPushNotificationSchedulerTest { } @Test - void testProcessRecurringVoipNotifications() { + void testProcessRecurringVoipNotifications() throws ExecutionException, InterruptedException { final ApnPushNotificationScheduler.NotificationWorker worker = apnPushNotificationScheduler.new NotificationWorker(); final long currentTimeMillis = System.currentTimeMillis(); clock.pin(Instant.ofEpochMilli(currentTimeMillis - 30_000)); - apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device); + apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device).toCompletableFuture().get(); clock.pin(Instant.ofEpochMilli(currentTimeMillis)); @@ -129,7 +130,7 @@ class ApnPushNotificationSchedulerTest { } @Test - void testScheduleBackgroundNotificationWithNoRecentNotification() { + void testScheduleBackgroundNotificationWithNoRecentNotification() throws ExecutionException, InterruptedException { final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); clock.pin(now); @@ -139,14 +140,14 @@ class ApnPushNotificationSchedulerTest { assertEquals(Optional.empty(), apnPushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(account, device)); - apnPushNotificationScheduler.scheduleBackgroundNotification(account, device); + apnPushNotificationScheduler.scheduleBackgroundNotification(account, device).toCompletableFuture().get(); assertEquals(Optional.of(now), apnPushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(account, device)); } @Test - void testScheduleBackgroundNotificationWithRecentNotification() { + void testScheduleBackgroundNotificationWithRecentNotification() throws ExecutionException, InterruptedException { final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); final Instant recentNotificationTimestamp = now.minus(ApnPushNotificationScheduler.BACKGROUND_NOTIFICATION_PERIOD.dividedBy(2)); @@ -156,7 +157,7 @@ class ApnPushNotificationSchedulerTest { apnPushNotificationScheduler.sendBackgroundNotification(account, device); clock.pin(now); - apnPushNotificationScheduler.scheduleBackgroundNotification(account, device); + apnPushNotificationScheduler.scheduleBackgroundNotification(account, device).toCompletableFuture().get(); final Instant expectedScheduledTimestamp = recentNotificationTimestamp.plus(ApnPushNotificationScheduler.BACKGROUND_NOTIFICATION_PERIOD); @@ -166,13 +167,13 @@ class ApnPushNotificationSchedulerTest { } @Test - void testProcessScheduledBackgroundNotifications() { + void testProcessScheduledBackgroundNotifications() throws ExecutionException, InterruptedException { final ApnPushNotificationScheduler.NotificationWorker worker = apnPushNotificationScheduler.new NotificationWorker(); final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); clock.pin(Instant.ofEpochMilli(now.toEpochMilli())); - apnPushNotificationScheduler.scheduleBackgroundNotification(account, device); + apnPushNotificationScheduler.scheduleBackgroundNotification(account, device).toCompletableFuture().get(); final int slot = SlotHash.getSlot(ApnPushNotificationScheduler.getPendingBackgroundNotificationQueueKey(account, device)); @@ -199,14 +200,14 @@ class ApnPushNotificationSchedulerTest { } @Test - void testProcessScheduledBackgroundNotificationsCancelled() { + void testProcessScheduledBackgroundNotificationsCancelled() throws ExecutionException, InterruptedException { final ApnPushNotificationScheduler.NotificationWorker worker = apnPushNotificationScheduler.new NotificationWorker(); final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); clock.pin(now); - apnPushNotificationScheduler.scheduleBackgroundNotification(account, device); - apnPushNotificationScheduler.cancelScheduledNotifications(account, device); + apnPushNotificationScheduler.scheduleBackgroundNotification(account, device).toCompletableFuture().get(); + apnPushNotificationScheduler.cancelScheduledNotifications(account, device).toCompletableFuture().get(); final int slot = SlotHash.getSlot(ApnPushNotificationScheduler.getPendingBackgroundNotificationQueueKey(account, device)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java index 858643c55..ae489d1fc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java @@ -176,6 +176,11 @@ class PushNotificationManagerTest { when(apnSender.sendNotification(pushNotification)) .thenReturn(CompletableFuture.completedFuture(new SendPushNotificationResult(true, null, false))); + if (!urgent) { + when(apnPushNotificationScheduler.scheduleBackgroundNotification(account, device)) + .thenReturn(CompletableFuture.completedFuture(null)); + } + pushNotificationManager.sendNotification(pushNotification); verifyNoInteractions(fcmSender); @@ -252,6 +257,9 @@ class PushNotificationManagerTest { when(apnSender.sendNotification(pushNotification)) .thenReturn(CompletableFuture.completedFuture(new SendPushNotificationResult(false, null, true))); + when(apnPushNotificationScheduler.cancelScheduledNotifications(account, device)) + .thenReturn(CompletableFuture.completedFuture(null)); + pushNotificationManager.sendNotification(pushNotification); verifyNoInteractions(fcmSender); @@ -270,6 +278,9 @@ class PushNotificationManagerTest { when(account.getUuid()).thenReturn(accountIdentifier); when(device.getId()).thenReturn(Device.MASTER_ID); + when(apnPushNotificationScheduler.cancelScheduledNotifications(account, device)) + .thenReturn(CompletableFuture.completedFuture(null)); + pushNotificationManager.handleMessagesRetrieved(account, device, userAgent); verify(pushLatencyManager).recordQueueRead(accountIdentifier, Device.MASTER_ID, userAgent);