Return futures from "send push notification" operations
This commit is contained in:
parent
2e36673702
commit
6d166fdfc5
|
@ -12,6 +12,7 @@ import io.micrometer.core.instrument.Metrics;
|
||||||
import io.micrometer.core.instrument.Tags;
|
import io.micrometer.core.instrument.Tags;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -49,35 +50,38 @@ public class PushNotificationManager {
|
||||||
this.pushLatencyManager = pushLatencyManager;
|
this.pushLatencyManager = pushLatencyManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendNewMessageNotification(final Account destination, final byte destinationDeviceId, final boolean urgent) throws NotPushRegisteredException {
|
public CompletableFuture<Optional<SendPushNotificationResult>> sendNewMessageNotification(final Account destination, final byte destinationDeviceId, final boolean urgent) throws NotPushRegisteredException {
|
||||||
final Device device = destination.getDevice(destinationDeviceId).orElseThrow(NotPushRegisteredException::new);
|
final Device device = destination.getDevice(destinationDeviceId).orElseThrow(NotPushRegisteredException::new);
|
||||||
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
||||||
|
|
||||||
sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
||||||
PushNotification.NotificationType.NOTIFICATION, null, destination, device, urgent));
|
PushNotification.NotificationType.NOTIFICATION, null, destination, device, urgent));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendRegistrationChallengeNotification(final String deviceToken, final PushNotification.TokenType tokenType, final String challengeToken) {
|
public CompletableFuture<SendPushNotificationResult> sendRegistrationChallengeNotification(final String deviceToken, final PushNotification.TokenType tokenType, final String challengeToken) {
|
||||||
sendNotification(new PushNotification(deviceToken, tokenType, PushNotification.NotificationType.CHALLENGE, challengeToken, null, null, true));
|
return sendNotification(new PushNotification(deviceToken, tokenType, PushNotification.NotificationType.CHALLENGE, challengeToken, null, null, true))
|
||||||
|
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendRateLimitChallengeNotification(final Account destination, final String challengeToken)
|
public CompletableFuture<SendPushNotificationResult> sendRateLimitChallengeNotification(final Account destination, final String challengeToken)
|
||||||
throws NotPushRegisteredException {
|
throws NotPushRegisteredException {
|
||||||
|
|
||||||
final Device device = destination.getPrimaryDevice();
|
final Device device = destination.getPrimaryDevice();
|
||||||
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
||||||
|
|
||||||
sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
||||||
PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device, true));
|
PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device, true))
|
||||||
|
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendAttemptLoginNotification(final Account destination, final String context) throws NotPushRegisteredException {
|
public CompletableFuture<SendPushNotificationResult> sendAttemptLoginNotification(final Account destination, final String context) throws NotPushRegisteredException {
|
||||||
final Device device = destination.getDevice(Device.PRIMARY_ID).orElseThrow(NotPushRegisteredException::new);
|
final Device device = destination.getDevice(Device.PRIMARY_ID).orElseThrow(NotPushRegisteredException::new);
|
||||||
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
||||||
|
|
||||||
sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
||||||
PushNotification.NotificationType.ATTEMPT_LOGIN_NOTIFICATION_HIGH_PRIORITY,
|
PushNotification.NotificationType.ATTEMPT_LOGIN_NOTIFICATION_HIGH_PRIORITY,
|
||||||
context, destination, device, true));
|
context, destination, device, true))
|
||||||
|
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) {
|
public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) {
|
||||||
|
@ -103,64 +107,66 @@ public class PushNotificationManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void sendNotification(final PushNotification pushNotification) {
|
CompletableFuture<Optional<SendPushNotificationResult>> sendNotification(final PushNotification pushNotification) {
|
||||||
if (pushNotification.tokenType() == PushNotification.TokenType.APN && !pushNotification.urgent()) {
|
if (pushNotification.tokenType() == PushNotification.TokenType.APN && !pushNotification.urgent()) {
|
||||||
// APNs imposes a per-device limit on background push notifications; schedule a notification for some time in the
|
// APNs imposes a per-device limit on background push notifications; schedule a notification for some time in the
|
||||||
// future (possibly even now!) rather than sending a notification directly
|
// future (possibly even now!) rather than sending a notification directly
|
||||||
apnPushNotificationScheduler
|
return apnPushNotificationScheduler
|
||||||
.scheduleBackgroundNotification(pushNotification.destination(), pushNotification.destinationDevice())
|
.scheduleBackgroundNotification(pushNotification.destination(), pushNotification.destinationDevice())
|
||||||
.whenComplete(logErrors());
|
.whenComplete(logErrors())
|
||||||
|
.thenApply(ignored -> Optional.<SendPushNotificationResult>empty())
|
||||||
} else {
|
.toCompletableFuture();
|
||||||
final PushNotificationSender sender = switch (pushNotification.tokenType()) {
|
|
||||||
case FCM -> fcmSender;
|
|
||||||
case APN, APN_VOIP -> apnSender;
|
|
||||||
};
|
|
||||||
|
|
||||||
sender.sendNotification(pushNotification).whenComplete((result, throwable) -> {
|
|
||||||
if (throwable == null) {
|
|
||||||
Tags tags = Tags.of("tokenType", pushNotification.tokenType().name(),
|
|
||||||
"notificationType", pushNotification.notificationType().name(),
|
|
||||||
"urgent", String.valueOf(pushNotification.urgent()),
|
|
||||||
"accepted", String.valueOf(result.accepted()),
|
|
||||||
"unregistered", String.valueOf(result.unregistered()));
|
|
||||||
|
|
||||||
if (result.errorCode().isPresent()) {
|
|
||||||
tags = tags.and("errorCode", result.errorCode().get());
|
|
||||||
}
|
|
||||||
|
|
||||||
Metrics.counter(SENT_NOTIFICATION_COUNTER_NAME, tags).increment();
|
|
||||||
|
|
||||||
if (result.unregistered() && pushNotification.destination() != null
|
|
||||||
&& pushNotification.destinationDevice() != null) {
|
|
||||||
|
|
||||||
handleDeviceUnregistered(pushNotification.destination(),
|
|
||||||
pushNotification.destinationDevice(),
|
|
||||||
pushNotification.tokenType(),
|
|
||||||
result.errorCode(),
|
|
||||||
result.unregisteredTimestamp());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (result.accepted() &&
|
|
||||||
pushNotification.tokenType() == PushNotification.TokenType.APN_VOIP &&
|
|
||||||
pushNotification.notificationType() == PushNotification.NotificationType.NOTIFICATION &&
|
|
||||||
pushNotification.destination() != null &&
|
|
||||||
pushNotification.destinationDevice() != null) {
|
|
||||||
|
|
||||||
apnPushNotificationScheduler.scheduleRecurringVoipNotification(
|
|
||||||
pushNotification.destination(),
|
|
||||||
pushNotification.destinationDevice())
|
|
||||||
.whenComplete(logErrors());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.debug("Failed to deliver {} push notification to {} ({})",
|
|
||||||
pushNotification.notificationType(), pushNotification.deviceToken(), pushNotification.tokenType(),
|
|
||||||
throwable);
|
|
||||||
|
|
||||||
Metrics.counter(FAILED_NOTIFICATION_COUNTER_NAME, "cause", throwable.getClass().getSimpleName()).increment();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final PushNotificationSender sender = switch (pushNotification.tokenType()) {
|
||||||
|
case FCM -> fcmSender;
|
||||||
|
case APN, APN_VOIP -> apnSender;
|
||||||
|
};
|
||||||
|
|
||||||
|
return sender.sendNotification(pushNotification).whenComplete((result, throwable) -> {
|
||||||
|
if (throwable == null) {
|
||||||
|
Tags tags = Tags.of("tokenType", pushNotification.tokenType().name(),
|
||||||
|
"notificationType", pushNotification.notificationType().name(),
|
||||||
|
"urgent", String.valueOf(pushNotification.urgent()),
|
||||||
|
"accepted", String.valueOf(result.accepted()),
|
||||||
|
"unregistered", String.valueOf(result.unregistered()));
|
||||||
|
|
||||||
|
if (result.errorCode().isPresent()) {
|
||||||
|
tags = tags.and("errorCode", result.errorCode().get());
|
||||||
|
}
|
||||||
|
|
||||||
|
Metrics.counter(SENT_NOTIFICATION_COUNTER_NAME, tags).increment();
|
||||||
|
|
||||||
|
if (result.unregistered() && pushNotification.destination() != null
|
||||||
|
&& pushNotification.destinationDevice() != null) {
|
||||||
|
|
||||||
|
handleDeviceUnregistered(pushNotification.destination(),
|
||||||
|
pushNotification.destinationDevice(),
|
||||||
|
pushNotification.tokenType(),
|
||||||
|
result.errorCode(),
|
||||||
|
result.unregisteredTimestamp());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.accepted() &&
|
||||||
|
pushNotification.tokenType() == PushNotification.TokenType.APN_VOIP &&
|
||||||
|
pushNotification.notificationType() == PushNotification.NotificationType.NOTIFICATION &&
|
||||||
|
pushNotification.destination() != null &&
|
||||||
|
pushNotification.destinationDevice() != null) {
|
||||||
|
|
||||||
|
apnPushNotificationScheduler.scheduleRecurringVoipNotification(
|
||||||
|
pushNotification.destination(),
|
||||||
|
pushNotification.destinationDevice())
|
||||||
|
.whenComplete(logErrors());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.debug("Failed to deliver {} push notification to {} ({})",
|
||||||
|
pushNotification.notificationType(), pushNotification.deviceToken(), pushNotification.tokenType(),
|
||||||
|
throwable);
|
||||||
|
|
||||||
|
Metrics.counter(FAILED_NOTIFICATION_COUNTER_NAME, "cause", throwable.getClass().getSimpleName()).increment();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.thenApply(Optional::of);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> BiConsumer<T, Throwable> logErrors() {
|
private static <T> BiConsumer<T, Throwable> logErrors() {
|
||||||
|
|
Loading…
Reference in New Issue