From a53a85d78801574a3880a065466aabfd1d070ed6 Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Fri, 12 Aug 2022 10:47:49 -0400 Subject: [PATCH] Refactor scheduled APNs notifications in preparation for future development --- .../textsecuregcm/WhisperServerService.java | 17 +- .../controllers/MessageController.java | 15 +- ...java => ApnPushNotificationScheduler.java} | 161 ++++++++---------- .../textsecuregcm/push/MessageSender.java | 1 - .../{metrics => push}/PushLatencyManager.java | 9 +- .../push/PushNotificationManager.java | 22 ++- .../storage/MessagesManager.java | 9 +- .../AuthenticatedConnectListener.java | 6 +- .../websocket/WebSocketConnection.java | 4 +- .../workers/AssignUsernameCommand.java | 4 +- .../workers/DeleteUserCommand.java | 6 +- .../SetUserDiscoverabilityCommand.java | 6 +- .../controllers/MessageControllerTest.java | 17 +- ... => ApnPushNotificationSchedulerTest.java} | 28 +-- .../textsecuregcm/push/MessageSenderTest.java | 1 - .../PushLatencyManagerTest.java | 9 +- .../push/PushNotificationManagerTest.java | 34 +++- .../MessagePersisterIntegrationTest.java | 5 +- .../storage/MessagesManagerTest.java | 4 +- .../WebSocketConnectionIntegrationTest.java | 3 +- .../websocket/WebSocketConnectionTest.java | 52 +++--- 21 files changed, 200 insertions(+), 213 deletions(-) rename service/src/main/java/org/whispersystems/textsecuregcm/push/{ApnFallbackManager.java => ApnPushNotificationScheduler.java} (52%) rename service/src/main/java/org/whispersystems/textsecuregcm/{metrics => push}/PushLatencyManager.java (94%) rename service/src/test/java/org/whispersystems/textsecuregcm/push/{ApnFallbackManagerTest.java => ApnPushNotificationSchedulerTest.java} (70%) rename service/src/test/java/org/whispersystems/textsecuregcm/{metrics => push}/PushLatencyManagerTest.java (90%) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 7ae185a02..f3b9e09ca 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -136,14 +136,14 @@ import org.whispersystems.textsecuregcm.metrics.MicrometerRegistryManager; import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge; import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge; import org.whispersystems.textsecuregcm.metrics.OperatingSystemMemoryGauge; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; +import org.whispersystems.textsecuregcm.push.PushLatencyManager; import org.whispersystems.textsecuregcm.metrics.ReportedMessageMetricsListener; import org.whispersystems.textsecuregcm.metrics.TrafficSource; import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider; import org.whispersystems.textsecuregcm.providers.RedisClientFactory; import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck; import org.whispersystems.textsecuregcm.push.APNSender; -import org.whispersystems.textsecuregcm.push.ApnFallbackManager; +import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.FcmSender; import org.whispersystems.textsecuregcm.push.MessageSender; @@ -443,7 +443,7 @@ public class WhisperServerService extends Application apnFallbackManager.cancel(auth.getAccount(), auth.getAuthenticatedDevice())); - } + pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), auth.getAuthenticatedDevice(), userAgent); final OutgoingMessageEntityList outgoingMessages; { final Pair, Boolean> messagesAndHasMore = messagesManager.getMessagesForDevice( auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), - userAgent, false); outgoingMessages = new OutgoingMessageEntityList(messagesAndHasMore.first().stream() diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationScheduler.java similarity index 52% rename from service/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java rename to service/src/main/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationScheduler.java index 9b6c291e1..b84bdcf30 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationScheduler.java @@ -5,26 +5,12 @@ package org.whispersystems.textsecuregcm.push; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.RatioGauge; -import com.codahale.metrics.SharedMetricRegistries; import com.google.common.annotations.VisibleForTesting; import io.dropwizard.lifecycle.Managed; import io.lettuce.core.ScriptOutputType; import io.lettuce.core.cluster.SlotHash; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.Pair; -import org.whispersystems.textsecuregcm.util.RedisClusterUtil; -import org.whispersystems.textsecuregcm.util.Util; - +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -32,33 +18,40 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.util.Pair; +import org.whispersystems.textsecuregcm.util.RedisClusterUtil; +import org.whispersystems.textsecuregcm.util.Util; -import static com.codahale.metrics.MetricRegistry.name; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; -public class ApnFallbackManager implements Managed { +public class ApnPushNotificationScheduler implements Managed { - private static final Logger logger = LoggerFactory.getLogger(ApnFallbackManager.class); + private static final Logger logger = LoggerFactory.getLogger(ApnPushNotificationScheduler.class); private static final String PENDING_NOTIFICATIONS_KEY = "PENDING_APN"; - static final String NEXT_SLOT_TO_PERSIST_KEY = "pending_notification_next_slot"; - private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Meter delivered = metricRegistry.meter(name(ApnFallbackManager.class, "voip_delivered")); - private static final Meter sent = metricRegistry.meter(name(ApnFallbackManager.class, "voip_sent" )); - private static final Meter retry = metricRegistry.meter(name(ApnFallbackManager.class, "voip_retry")); - private static final Meter evicted = metricRegistry.meter(name(ApnFallbackManager.class, "voip_evicted")); + @VisibleForTesting + static final String NEXT_SLOT_TO_PERSIST_KEY = "pending_notification_next_slot"; - static { - metricRegistry.register(name(ApnFallbackManager.class, "voip_ratio"), new VoipRatioGauge(delivered, sent)); - } + private static final Counter delivered = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_delivered")); + private static final Counter sent = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_sent")); + private static final Counter retry = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_retry")); + private static final Counter evicted = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_evicted")); - private final APNSender apnSender; - private final AccountsManager accountsManager; - private final FaultTolerantRedisCluster cluster; + private final APNSender apnSender; + private final AccountsManager accountsManager; + private final FaultTolerantRedisCluster pushSchedulingCluster; - private final ClusterLuaScript getScript; - private final ClusterLuaScript insertScript; - private final ClusterLuaScript removeScript; + private final ClusterLuaScript getPendingVoipDestinationsScript; + private final ClusterLuaScript insertPendingVoipDestinationScript; + private final ClusterLuaScript removePendingVoipDestinationScript; private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT]; @@ -90,7 +83,7 @@ public class ApnFallbackManager implements Managed { long entriesProcessed = 0; do { - pendingDestinations = getPendingDestinations(slot, 100); + pendingDestinations = getPendingDestinationsForRecurringVoipNotifications(slot, 100); entriesProcessed += pendingDestinations.size(); for (final String uuidAndDevice : pendingDestinations) { @@ -104,9 +97,9 @@ public class ApnFallbackManager implements Managed { .flatMap(deviceId -> maybeAccount.flatMap(account -> account.getDevice(deviceId))); if (maybeAccount.isPresent() && maybeDevice.isPresent()) { - sendNotification(maybeAccount.get(), maybeDevice.get()); + sendRecurringVoipNotification(maybeAccount.get(), maybeDevice.get()); } else { - remove(uuidAndDevice); + removeRecurringVoipNotificationEntry(uuidAndDevice); } } } while (!pendingDestinations.isEmpty()); @@ -115,37 +108,37 @@ public class ApnFallbackManager implements Managed { } } - public ApnFallbackManager(FaultTolerantRedisCluster cluster, + public ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster, APNSender apnSender, AccountsManager accountsManager) throws IOException { - this.apnSender = apnSender; + this.apnSender = apnSender; this.accountsManager = accountsManager; - this.cluster = cluster; + this.pushSchedulingCluster = pushSchedulingCluster; - this.getScript = ClusterLuaScript.fromResource(cluster, "lua/apn/get.lua", ScriptOutputType.MULTI); - this.insertScript = ClusterLuaScript.fromResource(cluster, "lua/apn/insert.lua", ScriptOutputType.VALUE); - this.removeScript = ClusterLuaScript.fromResource(cluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER); + this.getPendingVoipDestinationsScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/get.lua", ScriptOutputType.MULTI); + this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua", ScriptOutputType.VALUE); + this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER); for (int i = 0; i < this.workerThreads.length; i++) { this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i); } } - public void schedule(Account account, Device device) { - schedule(account, device, System.currentTimeMillis()); + public void scheduleRecurringVoipNotification(Account account, Device device) { + scheduleRecurringVoipNotification(account, device, System.currentTimeMillis()); } @VisibleForTesting - void schedule(Account account, Device device, long timestamp) { - sent.mark(); - insert(account, device, timestamp + (15 * 1000), (15 * 1000)); + void scheduleRecurringVoipNotification(Account account, Device device, long timestamp) { + sent.increment(); + insertRecurringVoipNotificationEntry(account, device, timestamp + (15 * 1000), (15 * 1000)); } - public void cancel(Account account, Device device) { - if (remove(account, device)) { - delivered.mark(); + public void cancelRecurringVoipNotification(Account account, Device device) { + if (removeRecurringVoipNotificationEntry(account, device)) { + delivered.increment(); } } @@ -167,24 +160,24 @@ public class ApnFallbackManager implements Managed { } } - private void sendNotification(final Account account, final Device device) { + private void sendRecurringVoipNotification(final Account account, final Device device) { String apnId = device.getVoipApnId(); if (apnId == null) { - remove(account, device); + removeRecurringVoipNotificationEntry(account, device); return; } long deviceLastSeen = device.getLastSeen(); if (deviceLastSeen < System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7)) { - evicted.mark(); - remove(account, device); + evicted.increment(); + removeRecurringVoipNotificationEntry(account, device); return; } apnSender.sendNotification(new PushNotification(apnId, PushNotification.TokenType.APN_VOIP, PushNotification.NotificationType.NOTIFICATION, null, account, device)); - retry.mark(); + retry.increment(); } @VisibleForTesting @@ -206,30 +199,33 @@ public class ApnFallbackManager implements Managed { } } - private boolean remove(Account account, Device device) { - return remove(getEndpointKey(account, device)); + private boolean removeRecurringVoipNotificationEntry(Account account, Device device) { + return removeRecurringVoipNotificationEntry(getEndpointKey(account, device)); } - private boolean remove(final String endpoint) { - return (long)removeScript.execute(List.of(getPendingNotificationQueueKey(endpoint), endpoint), - Collections.emptyList()) > 0; + private boolean removeRecurringVoipNotificationEntry(final String endpoint) { + return (long) removePendingVoipDestinationScript.execute( + List.of(getPendingRecurringVoipNotificationQueueKey(endpoint), endpoint), + Collections.emptyList()) > 0; } @SuppressWarnings("unchecked") @VisibleForTesting - List getPendingDestinations(final int slot, final int limit) { - return (List)getScript.execute(List.of(getPendingNotificationQueueKey(slot)), - List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit))); + List getPendingDestinationsForRecurringVoipNotifications(final int slot, final int limit) { + return (List) getPendingVoipDestinationsScript.execute( + List.of(getPendingRecurringVoipNotificationQueueKey(slot)), + List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit))); } - private void insert(final Account account, final Device device, final long timestamp, final long interval) { + private void insertRecurringVoipNotificationEntry(final Account account, final Device device, final long timestamp, final long interval) { final String endpoint = getEndpointKey(account, device); - insertScript.execute(List.of(getPendingNotificationQueueKey(endpoint), endpoint), - List.of(String.valueOf(timestamp), - String.valueOf(interval), - account.getUuid().toString(), - String.valueOf(device.getId()))); + insertPendingVoipDestinationScript.execute( + List.of(getPendingRecurringVoipNotificationQueueKey(endpoint), endpoint), + List.of(String.valueOf(timestamp), + String.valueOf(interval), + account.getUuid().toString(), + String.valueOf(device.getId()))); } @VisibleForTesting @@ -237,32 +233,15 @@ public class ApnFallbackManager implements Managed { return "apn_device::{" + account.getUuid() + "::" + device.getId() + "}"; } - private String getPendingNotificationQueueKey(final String endpoint) { - return getPendingNotificationQueueKey(SlotHash.getSlot(endpoint)); + private String getPendingRecurringVoipNotificationQueueKey(final String endpoint) { + return getPendingRecurringVoipNotificationQueueKey(SlotHash.getSlot(endpoint)); } - private String getPendingNotificationQueueKey(final int slot) { + private String getPendingRecurringVoipNotificationQueueKey(final int slot) { return PENDING_NOTIFICATIONS_KEY + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}"; } private int getNextSlot() { - return (int)(cluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); + return (int)(pushSchedulingCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); } - - private static class VoipRatioGauge extends RatioGauge { - - private final Meter success; - private final Meter attempts; - - private VoipRatioGauge(Meter success, Meter attempts) { - this.success = success; - this.attempts = attempts; - } - - @Override - protected Ratio getRatio() { - return RatioGauge.Ratio.of(success.getFiveMinuteRate(), attempts.getFiveMinuteRate()); - } - } - } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java index 1904b3eed..0d83babd1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageSender.java @@ -9,7 +9,6 @@ import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import io.micrometer.core.instrument.Metrics; import org.apache.commons.lang3.StringUtils; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushLatencyManager.java similarity index 94% rename from service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java rename to service/src/main/java/org/whispersystems/textsecuregcm/push/PushLatencyManager.java index e0d741e5e..6b7809ce4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushLatencyManager.java @@ -1,9 +1,9 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2022 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ -package org.whispersystems.textsecuregcm.metrics; +package org.whispersystems.textsecuregcm.push; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.annotation.JsonCreator; @@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.util.SystemMapper; @@ -103,7 +104,7 @@ public class PushLatencyManager { this.clock = clock; } - public void recordPushSent(final UUID accountUuid, final long deviceId, final boolean isVoip) { + void recordPushSent(final UUID accountUuid, final long deviceId, final boolean isVoip) { try { final String recordJson = SystemMapper.getMapper().writeValueAsString( new PushRecord(Instant.now(clock), isVoip ? PushType.VOIP : PushType.STANDARD)); @@ -118,7 +119,7 @@ public class PushLatencyManager { } } - public void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgentString) { + void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgentString) { takePushRecord(accountUuid, deviceId).thenAccept(pushRecord -> { if (pushRecord != null) { final Duration latency = Duration.between(pushRecord.getTimestamp(), Instant.now()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java index 1864f9f7e..b769a4bb0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java @@ -5,6 +5,8 @@ package org.whispersystems.textsecuregcm.push; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + import com.google.common.annotations.VisibleForTesting; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; @@ -18,14 +20,13 @@ import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Util; -import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; - public class PushNotificationManager { private final AccountsManager accountsManager; private final APNSender apnSender; private final FcmSender fcmSender; - private final ApnFallbackManager fallbackManager; + private final ApnPushNotificationScheduler apnPushNotificationScheduler; + private final PushLatencyManager pushLatencyManager; private static final String SENT_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "sentPushNotification"); private static final String FAILED_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "failedPushNotification"); @@ -35,12 +36,14 @@ public class PushNotificationManager { public PushNotificationManager(final AccountsManager accountsManager, final APNSender apnSender, final FcmSender fcmSender, - final ApnFallbackManager fallbackManager) { + final ApnPushNotificationScheduler apnPushNotificationScheduler, + final PushLatencyManager pushLatencyManager) { this.accountsManager = accountsManager; this.apnSender = apnSender; this.fcmSender = fcmSender; - this.fallbackManager = fallbackManager; + this.apnPushNotificationScheduler = apnPushNotificationScheduler; + this.pushLatencyManager = pushLatencyManager; } public void sendNewMessageNotification(final Account destination, final long destinationDeviceId) throws NotPushRegisteredException { @@ -65,6 +68,11 @@ public class PushNotificationManager { PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device)); } + public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) { + RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(account.getUuid(), device.getId(), userAgent)); + RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelRecurringVoipNotification(account, device)); + } + @VisibleForTesting Pair getToken(final Device device) throws NotPushRegisteredException { final Pair tokenAndType; @@ -112,7 +120,7 @@ public class PushNotificationManager { pushNotification.destination() != null && pushNotification.destinationDevice() != null) { - RedisOperation.unchecked(() -> fallbackManager.schedule(pushNotification.destination(), + RedisOperation.unchecked(() -> apnPushNotificationScheduler.scheduleRecurringVoipNotification(pushNotification.destination(), pushNotification.destinationDevice())); } } else { @@ -131,7 +139,7 @@ public class PushNotificationManager { d.setUninstalledFeedbackTimestamp(Util.todayInMillis())); } } else { - RedisOperation.unchecked(() -> fallbackManager.cancel(account, device)); + RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelRecurringVoipNotification(account, device)); } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 587c519b6..70bfb45aa 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -15,8 +15,6 @@ import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; -import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; @@ -32,17 +30,14 @@ public class MessagesManager { private final MessagesDynamoDb messagesDynamoDb; private final MessagesCache messagesCache; - private final PushLatencyManager pushLatencyManager; private final ReportMessageManager reportMessageManager; public MessagesManager( final MessagesDynamoDb messagesDynamoDb, final MessagesCache messagesCache, - final PushLatencyManager pushLatencyManager, final ReportMessageManager reportMessageManager) { this.messagesDynamoDb = messagesDynamoDb; this.messagesCache = messagesCache; - this.pushLatencyManager = pushLatencyManager; this.reportMessageManager = reportMessageManager; } @@ -60,9 +55,7 @@ public class MessagesManager { return messagesCache.hasMessages(destinationUuid, destinationDevice); } - public Pair, Boolean> getMessagesForDevice(UUID destinationUuid, long destinationDevice, final String userAgent, final boolean cachedMessagesOnly) { - RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent)); - + public Pair, Boolean> getMessagesForDevice(UUID destinationUuid, long destinationDevice, final boolean cachedMessagesOnly) { List messageList = new ArrayList<>(); if (!cachedMessagesOnly) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index 0bcf56435..ec67a87e5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -18,7 +18,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; -import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; import org.whispersystems.textsecuregcm.push.PushNotificationManager; @@ -44,21 +43,18 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final ReceiptSender receiptSender; private final MessagesManager messagesManager; private final PushNotificationManager pushNotificationManager; - private final ApnFallbackManager apnFallbackManager; private final ClientPresenceManager clientPresenceManager; private final ScheduledExecutorService scheduledExecutorService; public AuthenticatedConnectListener(ReceiptSender receiptSender, MessagesManager messagesManager, PushNotificationManager pushNotificationManager, - ApnFallbackManager apnFallbackManager, ClientPresenceManager clientPresenceManager, ScheduledExecutorService scheduledExecutorService) { this.receiptSender = receiptSender; this.messagesManager = messagesManager; this.pushNotificationManager = pushNotificationManager; - this.apnFallbackManager = apnFallbackManager; this.clientPresenceManager = clientPresenceManager; this.scheduledExecutorService = scheduledExecutorService; } @@ -75,7 +71,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { scheduledExecutorService); openWebsocketCounter.inc(); - RedisOperation.unchecked(() -> apnFallbackManager.cancel(auth.getAccount(), device)); + pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), device, context.getClient().getUserAgent()); final AtomicReference> renewPresenceFutureReference = new AtomicReference<>(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index ccfbbe242..f0da9987b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -320,8 +320,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture queueClearedFuture) { try { - final Pair, Boolean> messagesAndHasMore = messagesManager - .getMessagesForDevice(auth.getAccount().getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly); + final Pair, Boolean> messagesAndHasMore = messagesManager.getMessagesForDevice( + auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly); final List messages = messagesAndHasMore.first(); final boolean hasMore = messagesAndHasMore.second(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index d1d2524ba..c7ea9f713 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -25,7 +25,7 @@ import net.sourceforge.argparse4j.inf.Subparser; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; +import org.whispersystems.textsecuregcm.push.PushLatencyManager; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient; @@ -180,7 +180,7 @@ public class AssignUsernameCommand extends EnvironmentCommand(messages, false)); + final String userAgent = "Test-UA"; + OutgoingMessageEntityList response = resources.getJerseyTest().target("/v1/messages/") .request() .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) + .header("USer-Agent", userAgent) .accept(MediaType.APPLICATION_JSON_TYPE) .get(OutgoingMessageEntityList.class); @@ -458,6 +461,8 @@ class MessageControllerTest { assertEquals(updatedPniOne, response.messages().get(0).updatedPni()); assertNull(response.messages().get(1).updatedPni()); + + verify(pushNotificationManager).handleMessagesRetrieved(AuthHelper.VALID_ACCOUNT, AuthHelper.VALID_DEVICE, userAgent); } @Test @@ -472,7 +477,7 @@ class MessageControllerTest { UUID.randomUUID(), 2, AuthHelper.VALID_UUID, null, null, 0) ); - when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_UUID), eq(1L), anyString(), anyBoolean())) + when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_UUID), eq(1L), anyBoolean())) .thenReturn(new Pair<>(messages, false)); Response response = diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java similarity index 70% rename from service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java index 464484947..c79fb3c08 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnPushNotificationSchedulerTest.java @@ -26,7 +26,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.util.Pair; -class ApnFallbackManagerTest { +class ApnPushNotificationSchedulerTest { @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); @@ -36,7 +36,7 @@ class ApnFallbackManagerTest { private APNSender apnSender; - private ApnFallbackManager apnFallbackManager; + private ApnPushNotificationScheduler apnPushNotificationScheduler; private static final UUID ACCOUNT_UUID = UUID.randomUUID(); private static final String ACCOUNT_NUMBER = "+18005551234"; @@ -62,41 +62,43 @@ class ApnFallbackManagerTest { apnSender = mock(APNSender.class); - apnFallbackManager = new ApnFallbackManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), apnSender, accountsManager); + apnPushNotificationScheduler = new ApnPushNotificationScheduler(REDIS_CLUSTER_EXTENSION.getRedisCluster(), apnSender, accountsManager); } @Test void testClusterInsert() { - final String endpoint = apnFallbackManager.getEndpointKey(account, device); + final String endpoint = apnPushNotificationScheduler.getEndpointKey(account, device); - assertTrue(apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 1).isEmpty()); + assertTrue( + apnPushNotificationScheduler.getPendingDestinationsForRecurringVoipNotifications(SlotHash.getSlot(endpoint), 1).isEmpty()); - apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000); + apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device, System.currentTimeMillis() - 30_000); - final List pendingDestinations = apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 2); + final List pendingDestinations = apnPushNotificationScheduler.getPendingDestinationsForRecurringVoipNotifications(SlotHash.getSlot(endpoint), 2); assertEquals(1, pendingDestinations.size()); - final Optional> maybeUuidAndDeviceId = ApnFallbackManager.getSeparated( + final Optional> maybeUuidAndDeviceId = ApnPushNotificationScheduler.getSeparated( pendingDestinations.get(0)); assertTrue(maybeUuidAndDeviceId.isPresent()); assertEquals(ACCOUNT_UUID.toString(), maybeUuidAndDeviceId.get().first()); assertEquals(DEVICE_ID, (long) maybeUuidAndDeviceId.get().second()); - assertTrue(apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 1).isEmpty()); + assertTrue( + apnPushNotificationScheduler.getPendingDestinationsForRecurringVoipNotifications(SlotHash.getSlot(endpoint), 1).isEmpty()); } @Test void testProcessNextSlot() { - final ApnFallbackManager.NotificationWorker worker = apnFallbackManager.new NotificationWorker(); + final ApnPushNotificationScheduler.NotificationWorker worker = apnPushNotificationScheduler.new NotificationWorker(); - apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000); + apnPushNotificationScheduler.scheduleRecurringVoipNotification(account, device, System.currentTimeMillis() - 30_000); - final int slot = SlotHash.getSlot(apnFallbackManager.getEndpointKey(account, device)); + final int slot = SlotHash.getSlot(apnPushNotificationScheduler.getEndpointKey(account, device)); final int previousSlot = (slot + SlotHash.SLOT_COUNT - 1) % SlotHash.SLOT_COUNT; REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster(connection -> connection.sync() - .set(ApnFallbackManager.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(previousSlot))); + .set(ApnPushNotificationScheduler.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(previousSlot))); assertEquals(1, worker.processNextSlot()); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java index 4bfc454d1..9100b4144 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/MessageSenderTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushLatencyManagerTest.java similarity index 90% rename from service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/push/PushLatencyManagerTest.java index 900d4cc65..bbf82f8c7 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/metrics/PushLatencyManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushLatencyManagerTest.java @@ -1,9 +1,9 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2022 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ -package org.whispersystems.textsecuregcm.metrics; +package org.whispersystems.textsecuregcm.push; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -23,8 +23,9 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicPushLatencyConfiguration; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager.PushRecord; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager.PushType; +import org.whispersystems.textsecuregcm.push.PushLatencyManager; +import org.whispersystems.textsecuregcm.push.PushLatencyManager.PushRecord; +import org.whispersystems.textsecuregcm.push.PushLatencyManager.PushType; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java index 1d6ed2550..8abf90fe8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PushNotificationManagerTest.java @@ -14,6 +14,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,7 +29,8 @@ class PushNotificationManagerTest { private AccountsManager accountsManager; private APNSender apnSender; private FcmSender fcmSender; - private ApnFallbackManager apnFallbackManager; + private ApnPushNotificationScheduler apnPushNotificationScheduler; + private PushLatencyManager pushLatencyManager; private PushNotificationManager pushNotificationManager; @@ -37,11 +39,13 @@ class PushNotificationManagerTest { accountsManager = mock(AccountsManager.class); apnSender = mock(APNSender.class); fcmSender = mock(FcmSender.class); - apnFallbackManager = mock(ApnFallbackManager.class); + apnPushNotificationScheduler = mock(ApnPushNotificationScheduler.class); + pushLatencyManager = mock(PushLatencyManager.class); AccountsHelper.setupMockUpdate(accountsManager); - pushNotificationManager = new PushNotificationManager(accountsManager, apnSender, fcmSender, apnFallbackManager); + pushNotificationManager = new PushNotificationManager(accountsManager, apnSender, fcmSender, + apnPushNotificationScheduler, pushLatencyManager); } @Test @@ -113,7 +117,7 @@ class PushNotificationManagerTest { verifyNoInteractions(apnSender); verify(accountsManager, never()).updateDevice(eq(account), eq(Device.MASTER_ID), any()); verify(device, never()).setUninstalledFeedbackTimestamp(Util.todayInMillis()); - verifyNoInteractions(apnFallbackManager); + verifyNoInteractions(apnPushNotificationScheduler); } @Test @@ -136,7 +140,7 @@ class PushNotificationManagerTest { verifyNoInteractions(fcmSender); verify(accountsManager, never()).updateDevice(eq(account), eq(Device.MASTER_ID), any()); verify(device, never()).setUninstalledFeedbackTimestamp(Util.todayInMillis()); - verify(apnFallbackManager).schedule(account, device); + verify(apnPushNotificationScheduler).scheduleRecurringVoipNotification(account, device); } @Test @@ -159,7 +163,7 @@ class PushNotificationManagerTest { verify(accountsManager).updateDevice(eq(account), eq(Device.MASTER_ID), any()); verify(device).setUninstalledFeedbackTimestamp(Util.todayInMillis()); verifyNoInteractions(apnSender); - verifyNoInteractions(apnFallbackManager); + verifyNoInteractions(apnPushNotificationScheduler); } @Test @@ -181,6 +185,22 @@ class PushNotificationManagerTest { verifyNoInteractions(fcmSender); verify(accountsManager, never()).updateDevice(eq(account), eq(Device.MASTER_ID), any()); verify(device, never()).setUninstalledFeedbackTimestamp(Util.todayInMillis()); - verify(apnFallbackManager).cancel(account, device); + verify(apnPushNotificationScheduler).cancelRecurringVoipNotification(account, device); + } + + @Test + void testHandleMessagesRetrieved() { + final UUID accountIdentifier = UUID.randomUUID(); + final Account account = mock(Account.class); + final Device device = mock(Device.class); + final String userAgent = "User-Agent"; + + when(account.getUuid()).thenReturn(accountIdentifier); + when(device.getId()).thenReturn(Device.MASTER_ID); + + pushNotificationManager.handleMessagesRetrieved(account, device, userAgent); + + verify(pushLatencyManager).recordQueueRead(accountIdentifier, Device.MASTER_ID, userAgent); + verify(apnPushNotificationScheduler).cancelRecurringVoipNotification(account, device); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index bfbe204cb..793bac9ba 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -32,7 +32,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; +import org.whispersystems.textsecuregcm.push.PushLatencyManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbExtension; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -73,8 +73,7 @@ class MessagePersisterIntegrationTest { notificationExecutorService = Executors.newSingleThreadExecutor(); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService); - messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class), - mock(ReportMessageManager.class)); + messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class)); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, PERSIST_DELAY); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java index 653a1f900..55d35fb84 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java @@ -14,7 +14,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import java.util.UUID; import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; +import org.whispersystems.textsecuregcm.push.PushLatencyManager; class MessagesManagerTest { @@ -24,7 +24,7 @@ class MessagesManagerTest { private final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class); private final MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, - pushLatencyManager, reportMessageManager); + reportMessageManager); @Test void insert() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 08b5de92a..59a85f38a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -43,7 +43,6 @@ import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; -import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; @@ -100,7 +99,7 @@ class WebSocketConnectionIntegrationTest { webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class), reportMessageManager), + new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager), new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 08f93290b..5f4faa293 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -52,7 +52,7 @@ import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; -import org.whispersystems.textsecuregcm.push.ApnFallbackManager; +import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -83,7 +83,7 @@ class WebSocketConnectionTest { private AuthenticatedAccount auth; private UpgradeRequest upgradeRequest; private ReceiptSender receiptSender; - private ApnFallbackManager apnFallbackManager; + private PushNotificationManager pushNotificationManager; private ScheduledExecutorService retrySchedulingExecutor; @BeforeEach @@ -95,7 +95,7 @@ class WebSocketConnectionTest { auth = new AuthenticatedAccount(() -> new Pair<>(account, device)); upgradeRequest = mock(UpgradeRequest.class); receiptSender = mock(ReceiptSender.class); - apnFallbackManager = mock(ApnFallbackManager.class); + pushNotificationManager = mock(PushNotificationManager.class); retrySchedulingExecutor = mock(ScheduledExecutorService.class); } @@ -104,7 +104,7 @@ class WebSocketConnectionTest { MessagesManager storedMessages = mock(MessagesManager.class); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, storedMessages, - mock(PushNotificationManager.class), apnFallbackManager, mock(ClientPresenceManager.class), + mock(PushNotificationManager.class), mock(ClientPresenceManager.class), retrySchedulingExecutor); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); @@ -166,7 +166,7 @@ class WebSocketConnectionTest { String userAgent = "user-agent"; - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false)) + when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) .thenReturn(new Pair<>(outgoingMessages, false)); final List> futures = new LinkedList<>(); @@ -221,9 +221,8 @@ class WebSocketConnectionTest { when(account.getUuid()).thenReturn(accountUuid); when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - when(client.getUserAgent()).thenReturn("Test-UA"); - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), eq("Test-UA"), anyBoolean())) + when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(new Pair<>(Collections.emptyList(), false)) .thenReturn(new Pair<>(List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first")), false)) .thenReturn(new Pair<>(List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 2222, "second")), false)); @@ -316,7 +315,7 @@ class WebSocketConnectionTest { String userAgent = "user-agent"; - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false)) + when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) .thenReturn(new Pair<>(pendingMessages, false)); final List> futures = new LinkedList<>(); @@ -362,12 +361,11 @@ class WebSocketConnectionTest { when(account.getUuid()).thenReturn(UUID.randomUUID()); when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - when(client.getUserAgent()).thenReturn("Test-UA"); final AtomicBoolean threadWaiting = new AtomicBoolean(false); final AtomicBoolean returnMessageList = new AtomicBoolean(false); - when(messagesManager.getMessagesForDevice(account.getUuid(), 1L, client.getUserAgent(), false)).thenAnswer( + when(messagesManager.getMessagesForDevice(account.getUuid(), 1L, false)).thenAnswer( (Answer) invocation -> { synchronized (threadWaiting) { threadWaiting.set(true); @@ -415,7 +413,7 @@ class WebSocketConnectionTest { } }); - verify(messagesManager).getMessagesForDevice(any(UUID.class), anyLong(), anyString(), eq(false)); + verify(messagesManager).getMessagesForDevice(any(UUID.class), anyLong(), eq(false)); } @Test @@ -429,7 +427,6 @@ class WebSocketConnectionTest { when(account.getUuid()).thenReturn(UUID.randomUUID()); when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - when(client.getUserAgent()).thenReturn("Test-UA"); final List firstPageMessages = List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first"), @@ -438,7 +435,7 @@ class WebSocketConnectionTest { final List secondPageMessages = List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 3333, "third")); - when(messagesManager.getMessagesForDevice(account.getUuid(), 1L, client.getUserAgent(), false)) + when(messagesManager.getMessagesForDevice(account.getUuid(), 1L, false)) .thenReturn(new Pair<>(firstPageMessages, true)) .thenReturn(new Pair<>(secondPageMessages, false)); @@ -473,13 +470,12 @@ class WebSocketConnectionTest { when(account.getUuid()).thenReturn(UUID.randomUUID()); when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - when(client.getUserAgent()).thenReturn("Test-UA"); final UUID senderUuid = UUID.randomUUID(); final List messages = List.of( createMessage(senderUuid, UUID.randomUUID(), 1111L, "message the first")); - when(messagesManager.getMessagesForDevice(account.getUuid(), 1L, client.getUserAgent(), false)) + when(messagesManager.getMessagesForDevice(account.getUuid(), 1L, false)) .thenReturn(new Pair<>(messages, false)); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); @@ -530,9 +526,8 @@ class WebSocketConnectionTest { when(account.getUuid()).thenReturn(accountUuid); when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - when(client.getUserAgent()).thenReturn("Test-UA"); - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), eq("Test-UA"), anyBoolean())) + when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(new Pair<>(Collections.emptyList(), false)); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); @@ -560,7 +555,6 @@ class WebSocketConnectionTest { when(account.getUuid()).thenReturn(accountUuid); when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - when(client.getUserAgent()).thenReturn("Test-UA"); final List firstPageMessages = List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 1111, "first"), @@ -569,7 +563,7 @@ class WebSocketConnectionTest { final List secondPageMessages = List.of(createMessage(UUID.randomUUID(), UUID.randomUUID(), 3333, "third")); - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), eq("Test-UA"), anyBoolean())) + when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(new Pair<>(firstPageMessages, false)) .thenReturn(new Pair<>(secondPageMessages, false)) .thenReturn(new Pair<>(Collections.emptyList(), false)); @@ -609,9 +603,8 @@ class WebSocketConnectionTest { when(account.getUuid()).thenReturn(accountUuid); when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - when(client.getUserAgent()).thenReturn("Test-UA"); - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), eq("Test-UA"), anyBoolean())) + when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(new Pair<>(Collections.emptyList(), false)); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); @@ -623,11 +616,11 @@ class WebSocketConnectionTest { // anything. connection.processStoredMessages(); - verify(messagesManager).getMessagesForDevice(account.getUuid(), device.getId(), client.getUserAgent(), false); + verify(messagesManager).getMessagesForDevice(account.getUuid(), device.getId(), false); connection.handleNewMessagesAvailable(); - verify(messagesManager).getMessagesForDevice(account.getUuid(), device.getId(), client.getUserAgent(), true); + verify(messagesManager).getMessagesForDevice(account.getUuid(), device.getId(), true); } @Test @@ -643,9 +636,8 @@ class WebSocketConnectionTest { when(account.getUuid()).thenReturn(accountUuid); when(device.getId()).thenReturn(1L); when(client.isOpen()).thenReturn(true); - when(client.getUserAgent()).thenReturn("Test-UA"); - when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), eq("Test-UA"), anyBoolean())) + when(messagesManager.getMessagesForDevice(eq(accountUuid), eq(1L), anyBoolean())) .thenReturn(new Pair<>(Collections.emptyList(), false)); final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); @@ -658,7 +650,7 @@ class WebSocketConnectionTest { connection.processStoredMessages(); connection.handleMessagesPersisted(); - verify(messagesManager, times(2)).getMessagesForDevice(account.getUuid(), device.getId(), client.getUserAgent(), false); + verify(messagesManager, times(2)).getMessagesForDevice(account.getUuid(), device.getId(), false); } @Test @@ -692,7 +684,7 @@ class WebSocketConnectionTest { String userAgent = "Signal-Desktop/1.2.3"; - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false)) + when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) .thenReturn(new Pair<>(outgoingMessages, false)); final List> futures = new LinkedList<>(); @@ -762,7 +754,7 @@ class WebSocketConnectionTest { String userAgent = "Signal-Android/4.68.3"; - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false)) + when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) .thenReturn(new Pair<>(outgoingMessages, false)); final List> futures = new LinkedList<>(); @@ -814,7 +806,7 @@ class WebSocketConnectionTest { String userAgent = "Signal-Android/4.68.3"; - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false)) + when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) .thenThrow(new RedisException("OH NO")); when(retrySchedulingExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer( @@ -848,7 +840,7 @@ class WebSocketConnectionTest { String userAgent = "Signal-Android/4.68.3"; - when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false)) + when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), false)) .thenThrow(new RedisException("OH NO")); final WebSocketClient client = mock(WebSocketClient.class);