Don't block when scheduling background apns pushes

This commit is contained in:
ravi-signal 2023-04-10 13:51:36 -05:00 committed by GitHub
parent 5242514874
commit 8847cb92ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 44 deletions

View File

@ -11,6 +11,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.RedisException;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.cluster.SlotHash;
@ -24,8 +25,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -116,7 +121,7 @@ public class ApnPushNotificationScheduler implements Managed {
try {
getAccountAndDeviceFromPairString(destination).ifPresentOrElse(
accountAndDevice -> sendRecurringVoipNotification(accountAndDevice.first(), accountAndDevice.second()),
() -> removeRecurringVoipNotificationEntry(destination));
() -> removeRecurringVoipNotificationEntrySync(destination));
} catch (final IllegalArgumentException e) {
logger.warn("Failed to parse account/device pair: {}", destination, e);
}
@ -189,31 +194,53 @@ public class ApnPushNotificationScheduler implements Managed {
}
}
void scheduleRecurringVoipNotification(Account account, Device device) {
/**
* Schedule a recurring VOIP notification until {@link this#cancelScheduledNotifications} is called or the device is
* removed
*
* @return A CompletionStage that completes when the recurring notification has successfully been scheduled
*/
public CompletionStage<Void> scheduleRecurringVoipNotification(Account account, Device device) {
sent.increment();
insertRecurringVoipNotificationEntry(account, device, clock.millis() + (15 * 1000), (15 * 1000));
return insertRecurringVoipNotificationEntry(account, device, clock.millis() + (15 * 1000), (15 * 1000));
}
void scheduleBackgroundNotification(final Account account, final Device device) {
/**
* Schedule a background notification to be sent some time in the future
*
* @return A CompletionStage that completes when the notification has successfully been scheduled
*/
public CompletionStage<Void> scheduleBackgroundNotification(final Account account, final Device device) {
backgroundNotificationScheduledCounter.increment();
scheduleBackgroundNotificationScript.execute(
return scheduleBackgroundNotificationScript.executeAsync(
List.of(
getLastBackgroundNotificationTimestampKey(account, device),
getPendingBackgroundNotificationQueueKey(account, device)),
List.of(
getPairString(account, device),
String.valueOf(clock.millis()),
String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis())));
String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis())))
.thenAccept(dropValue());
}
public void cancelScheduledNotifications(Account account, Device device) {
if (removeRecurringVoipNotificationEntry(account, device)) {
delivered.increment();
}
pushSchedulingCluster.useCluster(connection ->
connection.sync().zrem(getPendingBackgroundNotificationQueueKey(account, device), getPairString(account, device)));
/**
* Cancel a scheduled recurring VOIP notification
*
* @return A CompletionStage that completes when the scheduled task has been cancelled.
*/
public CompletionStage<Void> cancelScheduledNotifications(Account account, Device device) {
return removeRecurringVoipNotificationEntry(account, device)
.thenCompose(removed -> {
if (removed) {
delivered.increment();
}
return pushSchedulingCluster.withCluster(connection ->
connection.async().zrem(
getPendingBackgroundNotificationQueueKey(account, device),
getPairString(account, device)));
})
.thenAccept(dropValue());
}
@Override
@ -238,15 +265,14 @@ public class ApnPushNotificationScheduler implements Managed {
String apnId = device.getVoipApnId();
if (apnId == null) {
removeRecurringVoipNotificationEntry(account, device);
removeRecurringVoipNotificationEntrySync(getEndpointKey(account, device));
return;
}
long deviceLastSeen = device.getLastSeen();
if (deviceLastSeen < clock.millis() - TimeUnit.DAYS.toMillis(7)) {
evicted.increment();
removeRecurringVoipNotificationEntry(account, device);
removeRecurringVoipNotificationEntrySync(getEndpointKey(account, device));
return;
}
@ -316,14 +342,28 @@ public class ApnPushNotificationScheduler implements Managed {
}
}
private boolean removeRecurringVoipNotificationEntry(Account account, Device device) {
private boolean removeRecurringVoipNotificationEntrySync(final String endpoint) {
try {
return removeRecurringVoipNotificationEntry(endpoint).toCompletableFuture().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof RedisException re) {
throw re;
}
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private CompletionStage<Boolean> removeRecurringVoipNotificationEntry(Account account, Device device) {
return removeRecurringVoipNotificationEntry(getEndpointKey(account, device));
}
private boolean removeRecurringVoipNotificationEntry(final String endpoint) {
return (long) removePendingVoipDestinationScript.execute(
private CompletionStage<Boolean> removeRecurringVoipNotificationEntry(final String endpoint) {
return removePendingVoipDestinationScript.executeAsync(
List.of(getPendingRecurringVoipNotificationQueueKey(endpoint), endpoint),
Collections.emptyList()) > 0;
Collections.emptyList())
.thenApply(result -> ((long) result) > 0);
}
@SuppressWarnings("unchecked")
@ -334,15 +374,16 @@ public class ApnPushNotificationScheduler implements Managed {
List.of(String.valueOf(clock.millis()), String.valueOf(limit)));
}
private void insertRecurringVoipNotificationEntry(final Account account, final Device device, final long timestamp, final long interval) {
private CompletionStage<Void> insertRecurringVoipNotificationEntry(final Account account, final Device device, final long timestamp, final long interval) {
final String endpoint = getEndpointKey(account, device);
insertPendingVoipDestinationScript.execute(
return insertPendingVoipDestinationScript.executeAsync(
List.of(getPendingRecurringVoipNotificationQueueKey(endpoint), endpoint),
List.of(String.valueOf(timestamp),
String.valueOf(interval),
account.getUuid().toString(),
String.valueOf(device.getId())));
String.valueOf(device.getId())))
.thenAccept(dropValue());
}
@VisibleForTesting
@ -387,4 +428,8 @@ public class ApnPushNotificationScheduler implements Managed {
getPairString(account, device))))
.map(timestamp -> Instant.ofEpochMilli(timestamp.longValue()));
}
private static <T> Consumer<T> dropValue() {
return ignored -> {};
}
}

View File

@ -10,6 +10,7 @@ import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,7 +35,7 @@ public class PushNotificationManager {
private static final String SENT_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "sentPushNotification");
private static final String FAILED_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "failedPushNotification");
private final Logger logger = LoggerFactory.getLogger(PushNotificationManager.class);
private static final Logger logger = LoggerFactory.getLogger(PushNotificationManager.class);
public PushNotificationManager(final AccountsManager accountsManager,
final APNSender apnSender,
@ -79,7 +80,7 @@ public class PushNotificationManager {
public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) {
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(account.getUuid(), device.getId(), userAgent));
RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelScheduledNotifications(account, device));
apnPushNotificationScheduler.cancelScheduledNotifications(account, device).whenComplete(logErrors());
}
@VisibleForTesting
@ -104,8 +105,10 @@ public class PushNotificationManager {
if (pushNotification.tokenType() == PushNotification.TokenType.APN && !pushNotification.urgent()) {
// APNs imposes a per-device limit on background push notifications; schedule a notification for some time in the
// future (possibly even now!) rather than sending a notification directly
apnPushNotificationScheduler.scheduleBackgroundNotification(pushNotification.destination(),
pushNotification.destinationDevice());
apnPushNotificationScheduler
.scheduleBackgroundNotification(pushNotification.destination(), pushNotification.destinationDevice())
.whenComplete(logErrors());
} else {
final PushNotificationSender sender = switch (pushNotification.tokenType()) {
case FCM -> fcmSender;
@ -137,9 +140,10 @@ public class PushNotificationManager {
pushNotification.destination() != null &&
pushNotification.destinationDevice() != null) {
RedisOperation.unchecked(
() -> apnPushNotificationScheduler.scheduleRecurringVoipNotification(pushNotification.destination(),
pushNotification.destinationDevice()));
apnPushNotificationScheduler.scheduleRecurringVoipNotification(
pushNotification.destination(),
pushNotification.destinationDevice())
.whenComplete(logErrors());
}
} else {
logger.debug("Failed to deliver {} push notification to {} ({})",
@ -152,6 +156,14 @@ public class PushNotificationManager {
}
}
private static <T> BiConsumer<T, Throwable> logErrors() {
return (ignored, throwable) -> {
if (throwable != null) {
logger.warn("Failed push scheduling operation", throwable);
}
};
}
private void handleDeviceUnregistered(final Account account, final Device device) {
if (StringUtils.isNotBlank(device.getGcmId())) {
if (device.getUninstalledFeedbackTimestamp() == 0) {
@ -159,7 +171,7 @@ public class PushNotificationManager {
d.setUninstalledFeedbackTimestamp(Util.todayInMillis()));
}
} else {
RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelScheduledNotifications(account, device));
apnPushNotificationScheduler.cancelScheduledNotifications(account, device).whenComplete(logErrors());
}
}
}

View File

@ -21,6 +21,7 @@ import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -77,7 +78,7 @@ class ApnPushNotificationSchedulerTest {
}
@Test
void testClusterInsert() {
void testClusterInsert() throws ExecutionException, InterruptedException {
final String endpoint = ApnPushNotificationScheduler.getEndpointKey(account, device);
final long currentTimeMillis = System.currentTimeMillis();
@ -85,7 +86,7 @@ class ApnPushNotificationSchedulerTest {
apnPushNotificationScheduler.getPendingDestinationsForRecurringVoipNotifications(SlotHash.getSlot(endpoint), 1).isEmpty());
clock.pin(Instant.ofEpochMilli(currentTimeMillis - 30_000));
apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device);
apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device).toCompletableFuture().get();
clock.pin(Instant.ofEpochMilli(currentTimeMillis));
final List<String> pendingDestinations = apnPushNotificationScheduler.getPendingDestinationsForRecurringVoipNotifications(SlotHash.getSlot(endpoint), 2);
@ -103,12 +104,12 @@ class ApnPushNotificationSchedulerTest {
}
@Test
void testProcessRecurringVoipNotifications() {
void testProcessRecurringVoipNotifications() throws ExecutionException, InterruptedException {
final ApnPushNotificationScheduler.NotificationWorker worker = apnPushNotificationScheduler.new NotificationWorker();
final long currentTimeMillis = System.currentTimeMillis();
clock.pin(Instant.ofEpochMilli(currentTimeMillis - 30_000));
apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device);
apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device).toCompletableFuture().get();
clock.pin(Instant.ofEpochMilli(currentTimeMillis));
@ -129,7 +130,7 @@ class ApnPushNotificationSchedulerTest {
}
@Test
void testScheduleBackgroundNotificationWithNoRecentNotification() {
void testScheduleBackgroundNotificationWithNoRecentNotification() throws ExecutionException, InterruptedException {
final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
clock.pin(now);
@ -139,14 +140,14 @@ class ApnPushNotificationSchedulerTest {
assertEquals(Optional.empty(),
apnPushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(account, device));
apnPushNotificationScheduler.scheduleBackgroundNotification(account, device);
apnPushNotificationScheduler.scheduleBackgroundNotification(account, device).toCompletableFuture().get();
assertEquals(Optional.of(now),
apnPushNotificationScheduler.getNextScheduledBackgroundNotificationTimestamp(account, device));
}
@Test
void testScheduleBackgroundNotificationWithRecentNotification() {
void testScheduleBackgroundNotificationWithRecentNotification() throws ExecutionException, InterruptedException {
final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
final Instant recentNotificationTimestamp =
now.minus(ApnPushNotificationScheduler.BACKGROUND_NOTIFICATION_PERIOD.dividedBy(2));
@ -156,7 +157,7 @@ class ApnPushNotificationSchedulerTest {
apnPushNotificationScheduler.sendBackgroundNotification(account, device);
clock.pin(now);
apnPushNotificationScheduler.scheduleBackgroundNotification(account, device);
apnPushNotificationScheduler.scheduleBackgroundNotification(account, device).toCompletableFuture().get();
final Instant expectedScheduledTimestamp =
recentNotificationTimestamp.plus(ApnPushNotificationScheduler.BACKGROUND_NOTIFICATION_PERIOD);
@ -166,13 +167,13 @@ class ApnPushNotificationSchedulerTest {
}
@Test
void testProcessScheduledBackgroundNotifications() {
void testProcessScheduledBackgroundNotifications() throws ExecutionException, InterruptedException {
final ApnPushNotificationScheduler.NotificationWorker worker = apnPushNotificationScheduler.new NotificationWorker();
final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
clock.pin(Instant.ofEpochMilli(now.toEpochMilli()));
apnPushNotificationScheduler.scheduleBackgroundNotification(account, device);
apnPushNotificationScheduler.scheduleBackgroundNotification(account, device).toCompletableFuture().get();
final int slot =
SlotHash.getSlot(ApnPushNotificationScheduler.getPendingBackgroundNotificationQueueKey(account, device));
@ -199,14 +200,14 @@ class ApnPushNotificationSchedulerTest {
}
@Test
void testProcessScheduledBackgroundNotificationsCancelled() {
void testProcessScheduledBackgroundNotificationsCancelled() throws ExecutionException, InterruptedException {
final ApnPushNotificationScheduler.NotificationWorker worker = apnPushNotificationScheduler.new NotificationWorker();
final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
clock.pin(now);
apnPushNotificationScheduler.scheduleBackgroundNotification(account, device);
apnPushNotificationScheduler.cancelScheduledNotifications(account, device);
apnPushNotificationScheduler.scheduleBackgroundNotification(account, device).toCompletableFuture().get();
apnPushNotificationScheduler.cancelScheduledNotifications(account, device).toCompletableFuture().get();
final int slot =
SlotHash.getSlot(ApnPushNotificationScheduler.getPendingBackgroundNotificationQueueKey(account, device));

View File

@ -176,6 +176,11 @@ class PushNotificationManagerTest {
when(apnSender.sendNotification(pushNotification))
.thenReturn(CompletableFuture.completedFuture(new SendPushNotificationResult(true, null, false)));
if (!urgent) {
when(apnPushNotificationScheduler.scheduleBackgroundNotification(account, device))
.thenReturn(CompletableFuture.completedFuture(null));
}
pushNotificationManager.sendNotification(pushNotification);
verifyNoInteractions(fcmSender);
@ -252,6 +257,9 @@ class PushNotificationManagerTest {
when(apnSender.sendNotification(pushNotification))
.thenReturn(CompletableFuture.completedFuture(new SendPushNotificationResult(false, null, true)));
when(apnPushNotificationScheduler.cancelScheduledNotifications(account, device))
.thenReturn(CompletableFuture.completedFuture(null));
pushNotificationManager.sendNotification(pushNotification);
verifyNoInteractions(fcmSender);
@ -270,6 +278,9 @@ class PushNotificationManagerTest {
when(account.getUuid()).thenReturn(accountIdentifier);
when(device.getId()).thenReturn(Device.MASTER_ID);
when(apnPushNotificationScheduler.cancelScheduledNotifications(account, device))
.thenReturn(CompletableFuture.completedFuture(null));
pushNotificationManager.handleMessagesRetrieved(account, device, userAgent);
verify(pushLatencyManager).recordQueueRead(accountIdentifier, Device.MASTER_ID, userAgent);