diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 0bc4fa3bd..048450e00 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -193,7 +193,7 @@ import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.FcmSender; import org.whispersystems.textsecuregcm.push.MessageSender; import org.whispersystems.textsecuregcm.push.ProvisioningManager; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -597,7 +597,7 @@ public class WhisperServerService extends Application(AuthenticatedDevice.class)); - environment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, pubSubClientEventManager)); + environment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, + webSocketConnectionEventManager)); environment.jersey().register(new TimestampResponseFilter()); /// @@ -982,15 +983,15 @@ public class WhisperServerService extends Application spamFilters = ServiceLoader.load(SpamFilter.class) @@ -1127,10 +1128,11 @@ public class WhisperServerService extends Application provisioningEnvironment = new WebSocketEnvironment<>(environment, webSocketEnvironment.getRequestLog(), Duration.ofMillis(60000)); - provisioningEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, pubSubClientEventManager)); + provisioningEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, + webSocketConnectionEventManager)); provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(provisioningManager)); provisioningEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET, clientReleaseManager)); - provisioningEnvironment.jersey().register(new KeepAliveController(pubSubClientEventManager)); + provisioningEnvironment.jersey().register(new KeepAliveController(webSocketConnectionEventManager)); provisioningEnvironment.jersey().register(new TimestampResponseFilter()); registerExceptionMappers(environment, webSocketEnvironment, provisioningEnvironment); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/auth/RegistrationLockVerificationManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/auth/RegistrationLockVerificationManager.java index f4167eae6..d432d7aeb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/auth/RegistrationLockVerificationManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/auth/RegistrationLockVerificationManager.java @@ -26,7 +26,7 @@ import org.whispersystems.textsecuregcm.entities.Svr3Credentials; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; @@ -54,7 +54,7 @@ public class RegistrationLockVerificationManager { private static final String PHONE_VERIFICATION_TYPE_TAG_NAME = "phoneVerificationType"; private final AccountsManager accounts; - private final PubSubClientEventManager pubSubClientEventManager; + private final WebSocketConnectionEventManager webSocketConnectionEventManager; private final ExternalServiceCredentialsGenerator svr2CredentialGenerator; private final ExternalServiceCredentialsGenerator svr3CredentialGenerator; private final RateLimiters rateLimiters; @@ -63,14 +63,14 @@ public class RegistrationLockVerificationManager { public RegistrationLockVerificationManager( final AccountsManager accounts, - final PubSubClientEventManager pubSubClientEventManager, + final WebSocketConnectionEventManager webSocketConnectionEventManager, final ExternalServiceCredentialsGenerator svr2CredentialGenerator, final ExternalServiceCredentialsGenerator svr3CredentialGenerator, final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager, final PushNotificationManager pushNotificationManager, final RateLimiters rateLimiters) { this.accounts = accounts; - this.pubSubClientEventManager = pubSubClientEventManager; + this.webSocketConnectionEventManager = webSocketConnectionEventManager; this.svr2CredentialGenerator = svr2CredentialGenerator; this.svr3CredentialGenerator = svr3CredentialGenerator; this.registrationRecoveryPasswordsManager = registrationRecoveryPasswordsManager; @@ -161,7 +161,7 @@ public class RegistrationLockVerificationManager { } final List deviceIds = updatedAccount.getDevices().stream().map(Device::getId).toList(); - pubSubClientEventManager.requestDisconnection(updatedAccount.getUuid(), deviceIds); + webSocketConnectionEventManager.requestDisconnection(updatedAccount.getUuid(), deviceIds); try { // Send a push notification that prompts the client to attempt login and fail due to locked credentials diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshApplicationEventListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshApplicationEventListener.java index 0c91d141f..03ed0965f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshApplicationEventListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshApplicationEventListener.java @@ -9,7 +9,7 @@ import org.glassfish.jersey.server.monitoring.ApplicationEvent; import org.glassfish.jersey.server.monitoring.ApplicationEventListener; import org.glassfish.jersey.server.monitoring.RequestEvent; import org.glassfish.jersey.server.monitoring.RequestEventListener; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.storage.AccountsManager; /** @@ -20,9 +20,10 @@ public class WebsocketRefreshApplicationEventListener implements ApplicationEven private final WebsocketRefreshRequestEventListener websocketRefreshRequestEventListener; public WebsocketRefreshApplicationEventListener(final AccountsManager accountsManager, - final PubSubClientEventManager pubSubClientEventManager) { + final WebSocketConnectionEventManager webSocketConnectionEventManager) { - this.websocketRefreshRequestEventListener = new WebsocketRefreshRequestEventListener(pubSubClientEventManager, + this.websocketRefreshRequestEventListener = new WebsocketRefreshRequestEventListener( + webSocketConnectionEventManager, new LinkedDeviceRefreshRequirementProvider(accountsManager), new PhoneNumberChangeRefreshRequirementProvider(accountsManager)); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshRequestEventListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshRequestEventListener.java index a8f2e3aa8..41df06e7e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshRequestEventListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshRequestEventListener.java @@ -19,11 +19,11 @@ import org.glassfish.jersey.server.monitoring.RequestEvent.Type; import org.glassfish.jersey.server.monitoring.RequestEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; public class WebsocketRefreshRequestEventListener implements RequestEventListener { - private final PubSubClientEventManager pubSubClientEventManager; + private final WebSocketConnectionEventManager webSocketConnectionEventManager; private final WebsocketRefreshRequirementProvider[] providers; private static final Counter DISPLACED_ACCOUNTS = Metrics.counter( @@ -35,10 +35,10 @@ public class WebsocketRefreshRequestEventListener implements RequestEventListene private static final Logger logger = LoggerFactory.getLogger(WebsocketRefreshRequestEventListener.class); public WebsocketRefreshRequestEventListener( - final PubSubClientEventManager pubSubClientEventManager, + final WebSocketConnectionEventManager webSocketConnectionEventManager, final WebsocketRefreshRequirementProvider... providers) { - this.pubSubClientEventManager = pubSubClientEventManager; + this.webSocketConnectionEventManager = webSocketConnectionEventManager; this.providers = providers; } @@ -60,7 +60,7 @@ public class WebsocketRefreshRequestEventListener implements RequestEventListene .forEach(pair -> { try { displacedDevices.incrementAndGet(); - pubSubClientEventManager.requestDisconnection(pair.first(), List.of(pair.second())); + webSocketConnectionEventManager.requestDisconnection(pair.first(), List.of(pair.second())); } catch (final Exception e) { logger.error("Could not displace device presence", e); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java index 3f7af8c07..8813d9a37 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java @@ -22,7 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.websocket.auth.ReadOnly; import org.whispersystems.websocket.session.WebSocketSession; import org.whispersystems.websocket.session.WebSocketSessionContext; @@ -34,14 +34,14 @@ public class KeepAliveController { private final Logger logger = LoggerFactory.getLogger(KeepAliveController.class); - private final PubSubClientEventManager pubSubClientEventManager; + private final WebSocketConnectionEventManager webSocketConnectionEventManager; private static final String CLOSED_CONNECTION_AGE_DISTRIBUTION_NAME = name(KeepAliveController.class, "closedConnectionAge"); - public KeepAliveController(final PubSubClientEventManager pubSubClientEventManager) { - this.pubSubClientEventManager = pubSubClientEventManager; + public KeepAliveController(final WebSocketConnectionEventManager webSocketConnectionEventManager) { + this.webSocketConnectionEventManager = webSocketConnectionEventManager; } @GET @@ -49,7 +49,7 @@ public class KeepAliveController { @WebSocketSession WebSocketSessionContext context) { maybeAuth.ifPresent(auth -> { - if (!pubSubClientEventManager.isLocallyPresent(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId())) { + if (!webSocketConnectionEventManager.isLocallyPresent(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId())) { final Duration age = Duration.between(context.getClient().getCreated(), Instant.now()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventListener.java similarity index 74% rename from service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java rename to service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventListener.java index 405a59445..90078cfc0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventListener.java @@ -6,10 +6,10 @@ package org.whispersystems.textsecuregcm.push; /** - * A client event listener handles events related to a client's message-retrieval presence. Handler methods are run on - * dedicated threads and may safely perform blocking operations. + * A WebSocket connection event listener handles message availability and presence events related to a client's open + * WebSocket connection. Handler methods are run on dedicated threads and may safely perform blocking operations. */ -public interface ClientEventListener { +public interface WebSocketConnectionEventListener { /** * Indicates that a new message is available in the connected client's message queue. diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManager.java similarity index 70% rename from service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java rename to service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManager.java index f9485f328..c20df6bc3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManager.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; @@ -39,16 +40,31 @@ import org.whispersystems.textsecuregcm.util.UUIDUtil; import org.whispersystems.textsecuregcm.util.Util; /** - * The pub/sub-based client presence manager uses the Redis 7 sharded pub/sub system to notify connected clients that - * new messages are available for retrieval and report to senders whether a client was present to receive a message when - * sent. This system makes a best effort to ensure that a given client has only a single open connection across the - * fleet of servers, but cannot guarantee at-most-one behavior. + * The WebSocket connection event manager distributes events related to client presence and message availability to + * registered listeners. In the current Signal server implementation, clients generally interact with the service by + * opening a dual-purpose WebSocket. The WebSocket serves as both a delivery mechanism for messages and as a channel + * for the client to issue API requests to the server. Clients are considered "present" if they have an open WebSocket + * connection and are therefore likely to receive messages as soon as they're delivered to the server. WebSocket + * connection managers make a best effort to ensure that clients have at most one active message delivery channel at + * a time. + * + * @implNote The WebSocket connection event manager uses the Redis 7 sharded pub/sub system to distribute events. This + * system makes a best effort to ensure that a given client has only a single open connection across the fleet of + * servers, but cannot guarantee at-most-one behavior. + * + * @see WebSocketConnectionEventListener + * @see org.whispersystems.textsecuregcm.storage.MessagesManager#insert(UUID, byte, MessageProtos.Envelope) */ -public class PubSubClientEventManager extends RedisClusterPubSubAdapter implements Managed { +public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter implements Managed { private final FaultTolerantRedisClusterClient clusterClient; private final Executor listenerEventExecutor; + @Nullable + private FaultTolerantPubSubClusterConnection pubSubConnection; + + private final Map listenersByAccountAndDeviceIdentifier; + private final UUID serverId = UUID.randomUUID(); private final byte[] CLIENT_CONNECTED_EVENT_BYTES = ClientEvent.newBuilder() @@ -58,36 +74,31 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter pubSubConnection; - - private final Map listenersByAccountAndDeviceIdentifier; - private static final byte[] DISCONNECT_REQUESTED_EVENT_BYTES = ClientEvent.newBuilder() .setDisconnectRequested(DisconnectRequested.getDefaultInstance()) .build() .toByteArray(); private static final Counter PUBLISH_CLIENT_CONNECTION_EVENT_ERROR_COUNTER = - Metrics.counter(MetricsUtil.name(PubSubClientEventManager.class, "publishClientConnectionEventError")); + Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "publishClientConnectionEventError")); private static final Counter UNSUBSCRIBE_ERROR_COUNTER = - Metrics.counter(MetricsUtil.name(PubSubClientEventManager.class, "unsubscribeError")); + Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "unsubscribeError")); private static final Counter MESSAGE_WITHOUT_LISTENER_COUNTER = - Metrics.counter(MetricsUtil.name(PubSubClientEventManager.class, "messageWithoutListener")); + Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "messageWithoutListener")); private static final String LISTENER_GAUGE_NAME = - MetricsUtil.name(PubSubClientEventManager.class, "listeners"); + MetricsUtil.name(WebSocketConnectionEventManager.class, "listeners"); - private static final Logger logger = LoggerFactory.getLogger(PubSubClientEventManager.class); + private static final Logger logger = LoggerFactory.getLogger(WebSocketConnectionEventManager.class); @VisibleForTesting record AccountAndDeviceIdentifier(UUID accountIdentifier, byte deviceId) { } - public PubSubClientEventManager(final FaultTolerantRedisClusterClient clusterClient, - final Executor listenerEventExecutor) { + public WebSocketConnectionEventManager(final FaultTolerantRedisClusterClient clusterClient, + final Executor listenerEventExecutor) { this.clusterClient = clusterClient; this.listenerEventExecutor = listenerEventExecutor; @@ -117,27 +128,26 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter handleClientConnected(final UUID accountIdentifier, final byte deviceId, final ClientEventListener listener) { + public CompletionStage handleClientConnected(final UUID accountIdentifier, final byte deviceId, final WebSocketConnectionEventListener listener) { if (pubSubConnection == null) { - throw new IllegalStateException("Presence manager not started"); + throw new IllegalStateException("WebSocket connection event manager not started"); } - final UUID connectionId = UUID.randomUUID(); - final byte[] clientPresenceKey = getClientEventChannel(accountIdentifier, deviceId); - final AtomicReference displacedListener = new AtomicReference<>(); + final byte[] eventChannel = getClientEventChannel(accountIdentifier, deviceId); + final AtomicReference displacedListener = new AtomicReference<>(); final AtomicReference> subscribeFuture = new AtomicReference<>(); // Note that we're relying on some specific implementation details of `ConcurrentHashMap#compute(...)`. In @@ -153,7 +163,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter { subscribeFuture.set(pubSubConnection.withPubSubConnection(connection -> - connection.async().ssubscribe(clientPresenceKey))); + connection.async().ssubscribe(eventChannel))); if (existingListener != null) { displacedListener.set(existingListener); @@ -168,28 +178,28 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter clusterClient.withBinaryCluster(connection -> connection.async() - .spublish(clientPresenceKey, CLIENT_CONNECTED_EVENT_BYTES))) + .spublish(eventChannel, CLIENT_CONNECTED_EVENT_BYTES))) .handle((ignored, throwable) -> { if (throwable != null) { PUBLISH_CLIENT_CONNECTION_EVENT_ERROR_COUNTER.increment(); } - return connectionId; + return null; }); } /** - * Removes the "presence" for the given device. Callers should call this method when they have been notified that - * the client's underlying network connection has been closed. + * Removes the "presence" and event listener for the given device. Callers should call this method when the client's + * underlying network connection has closed. * * @param accountIdentifier the identifier of the account for the disconnected device * @param deviceId the ID of the disconnected device within the given account * - * @return a future that completes when the presence has been removed + * @return a future that completes when the presence and event listener have been removed */ public CompletionStage handleClientDisconnected(final UUID accountIdentifier, final byte deviceId) { if (pubSubConnection == null) { - throw new IllegalStateException("Presence manager not started"); + throw new IllegalStateException("WebSocket connection event manager not started"); } final AtomicReference> unsubscribeFuture = new AtomicReference<>(); @@ -221,20 +231,20 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter requestDisconnection(final UUID accountIdentifier, final Collection deviceIds) { return CompletableFuture.allOf(deviceIds.stream() - .map(deviceId -> { - final byte[] clientPresenceKey = getClientEventChannel(accountIdentifier, deviceId); - - return clusterClient.withBinaryCluster(connection -> connection.async() - .spublish(clientPresenceKey, DISCONNECT_REQUESTED_EVENT_BYTES)) - .toCompletableFuture(); - }) + .map(deviceId -> clusterClient.withBinaryCluster(connection -> connection.async() + .spublish(getClientEventChannel(accountIdentifier, deviceId), DISCONNECT_REQUESTED_EVENT_BYTES)) + .toCompletableFuture()) .toArray(CompletableFuture[]::new)); } @@ -270,7 +276,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter> clientPresenceKeysBySlot = new HashMap<>(); + final Map> eventChannelsBySlot = new HashMap<>(); // Organize subscriptions by slot so we can issue a smaller number of larger resubscription commands listenersByAccountAndDeviceIdentifier.keySet() @@ -280,15 +286,15 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter new ArrayList<>()).add(clientEventChannel); + eventChannelsBySlot.computeIfAbsent(slot, ignored -> new ArrayList<>()).add(clientEventChannel); } }); // Issue one resubscription command per affected slot - clientPresenceKeysBySlot.forEach((slot, clientPresenceKeys) -> { + eventChannelsBySlot.forEach((slot, eventChannels) -> { if (pubSubConnection != null) { - final byte[][] clientPresenceKeyArray = clientPresenceKeys.toArray(byte[][]::new); - pubSubConnection.usePubSubConnection(connection -> connection.sync().ssubscribe(clientPresenceKeyArray)); + pubSubConnection.usePubSubConnection(connection -> + connection.sync().ssubscribe(eventChannels.toArray(byte[][]::new))); } }); } @@ -324,9 +330,9 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter listener.handleNewMessageAvailable(); case CLIENT_CONNECTED -> { - // Only act on new connections to other presence manager instances; we'll learn about displacements in THIS + // Only act on new connections to other event manager instances; we'll learn about displacements in THIS // instance when we update the listener map in `handleClientConnected` if (!this.serverId.equals(UUIDUtil.fromByteString(clientEvent.getClientConnected().getServerId()))) { listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(true)); @@ -357,12 +363,12 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter implemen private final ProfilesManager profilesManager; private final SecureStorageClient secureStorageClient; private final SecureValueRecovery2Client secureValueRecovery2Client; - private final PubSubClientEventManager pubSubClientEventManager; + private final WebSocketConnectionEventManager webSocketConnectionEventManager; private final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager; private final ClientPublicKeysManager clientPublicKeysManager; private final Executor accountLockExecutor; @@ -202,7 +202,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen final ProfilesManager profilesManager, final SecureStorageClient secureStorageClient, final SecureValueRecovery2Client secureValueRecovery2Client, - final PubSubClientEventManager pubSubClientEventManager, + final WebSocketConnectionEventManager webSocketConnectionEventManager, final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager, final ClientPublicKeysManager clientPublicKeysManager, final Executor accountLockExecutor, @@ -219,7 +219,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen this.profilesManager = profilesManager; this.secureStorageClient = secureStorageClient; this.secureValueRecovery2Client = secureValueRecovery2Client; - this.pubSubClientEventManager = pubSubClientEventManager; + this.webSocketConnectionEventManager = webSocketConnectionEventManager; this.registrationRecoveryPasswordsManager = requireNonNull(registrationRecoveryPasswordsManager); this.clientPublicKeysManager = clientPublicKeysManager; this.accountLockExecutor = accountLockExecutor; @@ -325,7 +325,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen keysManager.deleteSingleUsePreKeys(pni), messagesManager.clear(aci), profilesManager.deleteAll(aci)) - .thenCompose(ignored -> pubSubClientEventManager.requestDisconnection(aci)) + .thenCompose(ignored -> webSocketConnectionEventManager.requestDisconnection(aci)) .thenCompose(ignored -> accounts.reclaimAccount(e.getExistingAccount(), account, additionalWriteItems)) .thenCompose(ignored -> { // We should have cleared all messages before overwriting the old account, but more may have arrived @@ -589,7 +589,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen }) .whenComplete((ignored, throwable) -> { if (throwable == null) { - pubSubClientEventManager.requestDisconnection(accountIdentifier, List.of(deviceId)); + webSocketConnectionEventManager.requestDisconnection(accountIdentifier, List.of(deviceId)); } }); } @@ -1236,7 +1236,7 @@ public class AccountsManager extends RedisPubSubAdapter implemen registrationRecoveryPasswordsManager.removeForNumber(account.getNumber())) .thenCompose(ignored -> accounts.delete(account.getUuid(), additionalWriteItems)) .thenCompose(ignored -> redisDeleteAsync(account)) - .thenRun(() -> pubSubClientEventManager.requestDisconnection(account.getUuid())); + .thenRun(() -> webSocketConnectionEventManager.requestDisconnection(account.getUuid())); } private String getAccountMapKey(String key) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScript.java index 23b7d7d44..784d2a264 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScript.java @@ -15,7 +15,7 @@ import java.util.UUID; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.push.ClientEvent; import org.whispersystems.textsecuregcm.push.NewMessageAvailableEvent; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; @@ -52,7 +52,7 @@ class MessagesCacheInsertScript { MessagesCache.getMessageQueueKey(destinationUuid, destinationDevice), // queueKey MessagesCache.getMessageQueueMetadataKey(destinationUuid, destinationDevice), // queueMetadataKey MessagesCache.getQueueIndexKey(destinationUuid, destinationDevice), // queueTotalIndexKey - PubSubClientEventManager.getClientEventChannel(destinationUuid, destinationDevice) // eventChannelKey + WebSocketConnectionEventManager.getClientEventChannel(destinationUuid, destinationDevice) // eventChannelKey ); final List args = new ArrayList<>(Arrays.asList( diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java index 169892723..bb654ce93 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java @@ -8,7 +8,7 @@ package org.whispersystems.textsecuregcm.storage; import io.lettuce.core.ScriptOutputType; import org.whispersystems.textsecuregcm.push.ClientEvent; import org.whispersystems.textsecuregcm.push.MessagesPersistedEvent; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import java.io.IOException; @@ -35,7 +35,7 @@ class MessagesCacheUnlockQueueScript { void execute(final UUID accountIdentifier, final byte deviceId) { final List keys = List.of( MessagesCache.getPersistInProgressKey(accountIdentifier, deviceId), // persistInProgressKey - PubSubClientEventManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey + WebSocketConnectionEventManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey ); unlockQueueScript.executeBinary(keys, MESSAGES_PERSISTED_EVENT_ARGS); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 0f219a91d..23872ac08 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -59,10 +59,23 @@ public class MessagesManager { this.messageDeletionExecutor = messageDeletionExecutor; } - public boolean insert(UUID destinationUuid, byte destinationDevice, Envelope message) { + /** + * Inserts a message into a target device's message queue and notifies registered listeners that a new message is + * available. + * + * @param destinationUuid the account identifier for the destination queue + * @param destinationDeviceId the device ID for the destination queue + * @param message the message to insert into the queue + * + * @return {@code true} if the destination device is "present" (i.e. has an active event listener) or {@code false} + * otherwise + * + * @see org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager + */ + public boolean insert(final UUID destinationUuid, final byte destinationDeviceId, final Envelope message) { final UUID messageGuid = UUID.randomUUID(); - final boolean destinationPresent = messagesCache.insert(messageGuid, destinationUuid, destinationDevice, message); + final boolean destinationPresent = messagesCache.insert(messageGuid, destinationUuid, destinationDeviceId, message); if (message.hasSourceServiceId() && !destinationUuid.toString().equals(message.getSourceServiceId())) { reportMessageManager.store(message.getSourceServiceId(), messageGuid); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index a9606b720..bfad3bb02 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -15,7 +15,7 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.OpenWebSocketCounter; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -40,7 +40,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final MessageMetrics messageMetrics; private final PushNotificationManager pushNotificationManager; private final PushNotificationScheduler pushNotificationScheduler; - private final PubSubClientEventManager pubSubClientEventManager; + private final WebSocketConnectionEventManager webSocketConnectionEventManager; private final ScheduledExecutorService scheduledExecutorService; private final Scheduler messageDeliveryScheduler; private final ClientReleaseManager clientReleaseManager; @@ -54,7 +54,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { MessageMetrics messageMetrics, PushNotificationManager pushNotificationManager, PushNotificationScheduler pushNotificationScheduler, - PubSubClientEventManager pubSubClientEventManager, + WebSocketConnectionEventManager webSocketConnectionEventManager, ScheduledExecutorService scheduledExecutorService, Scheduler messageDeliveryScheduler, ClientReleaseManager clientReleaseManager, @@ -64,7 +64,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { this.messageMetrics = messageMetrics; this.pushNotificationManager = pushNotificationManager; this.pushNotificationScheduler = pushNotificationScheduler; - this.pubSubClientEventManager = pubSubClientEventManager; + this.webSocketConnectionEventManager = webSocketConnectionEventManager; this.scheduledExecutorService = scheduledExecutorService; this.messageDeliveryScheduler = messageDeliveryScheduler; this.clientReleaseManager = clientReleaseManager; @@ -105,7 +105,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { // receive push notifications for inbound messages. We should do this first because, at this point, the // connection has already closed and attempts to actually deliver a message via the connection will not succeed. // It's preferable to start sending push notifications as soon as possible. - pubSubClientEventManager.handleClientDisconnected(auth.getAccount().getUuid(), + webSocketConnectionEventManager.handleClientDisconnected(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId()); // Finally, stop trying to deliver messages and send a push notification if the connection is aware of any @@ -122,7 +122,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { // Finally, we register this client's presence, which suppresses push notifications. We do this last because // receiving extra push notifications is generally preferable to missing out on a push notification. - pubSubClientEventManager.handleClientConnected(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection); + webSocketConnectionEventManager.handleClientConnected(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection); } catch (final Exception e) { log.warn("Failed to initialize websocket", e); context.getClient().close(1011, "Unexpected error initializing connection"); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 818195200..6ff1a152e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -45,7 +45,7 @@ import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; -import org.whispersystems.textsecuregcm.push.ClientEventListener; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventListener; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -63,7 +63,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import javax.annotation.Nullable; -public class WebSocketConnection implements ClientEventListener { +public class WebSocketConnection implements WebSocketConnectionEventListener { private static final DistributionSummary messageTime = Metrics.summary( name(MessageController.class, "messageDeliveryDuration")); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 9e5a3e7da..215af6e77 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -36,7 +36,7 @@ import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.metrics.MicrometerAwsSdkMetricPublisher; import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.FcmSender; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; @@ -205,7 +205,7 @@ record CommandDependencies( configuration.getSvr2Configuration()); SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator, storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration()); - PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor); + WebSocketConnectionEventManager webSocketConnectionEventManager = new WebSocketConnectionEventManager(messagesCluster, clientEventExecutor); MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), dynamicConfigurationManager); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); @@ -222,7 +222,7 @@ record CommandDependencies( new ClientPublicKeysManager(clientPublicKeys, accountLockManager, accountLockExecutor); AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster, pubsubClient, accountLockManager, keys, messagesManager, profilesManager, - secureStorageClient, secureValueRecovery2Client, pubSubClientEventManager, + secureStorageClient, secureValueRecovery2Client, webSocketConnectionEventManager, registrationRecoveryPasswordsManager, clientPublicKeysManager, accountLockExecutor, clock, configuration.getLinkDeviceSecretConfiguration().secret().value(), dynamicConfigurationManager); RateLimiters rateLimiters = RateLimiters.createAndValidate(configuration.getLimitsConfiguration(), @@ -259,7 +259,7 @@ record CommandDependencies( Clock.systemUTC()); environment.lifecycle().manage(apnSender); - environment.lifecycle().manage(pubSubClientEventManager); + environment.lifecycle().manage(webSocketConnectionEventManager); environment.lifecycle().manage(new ManagedAwsCrt()); return new CommandDependencies( diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/auth/LinkedDeviceRefreshRequirementProviderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/auth/LinkedDeviceRefreshRequirementProviderTest.java index 849a1ef1c..5c48ef0d6 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/auth/LinkedDeviceRefreshRequirementProviderTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/auth/LinkedDeviceRefreshRequirementProviderTest.java @@ -59,7 +59,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.filters.RemoteAddressFilter; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -95,19 +95,19 @@ class LinkedDeviceRefreshRequirementProviderTest { .build(); private AccountsManager accountsManager; - private PubSubClientEventManager pubSubClientEventManager; + private WebSocketConnectionEventManager webSocketConnectionEventManager; private LinkedDeviceRefreshRequirementProvider provider; @BeforeEach void setup() { accountsManager = mock(AccountsManager.class); - pubSubClientEventManager = mock(PubSubClientEventManager.class); + webSocketConnectionEventManager = mock(WebSocketConnectionEventManager.class); provider = new LinkedDeviceRefreshRequirementProvider(accountsManager); final WebsocketRefreshRequestEventListener listener = - new WebsocketRefreshRequestEventListener(pubSubClientEventManager, provider); + new WebsocketRefreshRequestEventListener(webSocketConnectionEventManager, provider); when(applicationEventListener.onRequest(any())).thenReturn(listener); @@ -139,9 +139,9 @@ class LinkedDeviceRefreshRequirementProviderTest { assertEquals(initialDeviceCount + addedDeviceNames.size(), account.getDevices().size()); - verify(pubSubClientEventManager).requestDisconnection(account.getUuid(), List.of((byte) 1)); - verify(pubSubClientEventManager).requestDisconnection(account.getUuid(), List.of((byte) 2)); - verify(pubSubClientEventManager).requestDisconnection(account.getUuid(), List.of((byte) 3)); + verify(webSocketConnectionEventManager).requestDisconnection(account.getUuid(), List.of((byte) 1)); + verify(webSocketConnectionEventManager).requestDisconnection(account.getUuid(), List.of((byte) 2)); + verify(webSocketConnectionEventManager).requestDisconnection(account.getUuid(), List.of((byte) 3)); } @ParameterizedTest @@ -170,10 +170,10 @@ class LinkedDeviceRefreshRequirementProviderTest { assertEquals(200, response.getStatus()); initialDeviceIds.forEach(deviceId -> { - verify(pubSubClientEventManager).requestDisconnection(account.getUuid(), List.of(deviceId)); + verify(webSocketConnectionEventManager).requestDisconnection(account.getUuid(), List.of(deviceId)); }); - verifyNoMoreInteractions(pubSubClientEventManager); + verifyNoMoreInteractions(webSocketConnectionEventManager); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/auth/PhoneNumberChangeRefreshRequirementProviderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/auth/PhoneNumberChangeRefreshRequirementProviderTest.java index db4272b77..af5df7f1c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/auth/PhoneNumberChangeRefreshRequirementProviderTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/auth/PhoneNumberChangeRefreshRequirementProviderTest.java @@ -47,7 +47,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.whispersystems.textsecuregcm.filters.RemoteAddressFilter; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -74,7 +74,7 @@ class PhoneNumberChangeRefreshRequirementProviderTest { private static final AccountAuthenticator AUTHENTICATOR = mock(AccountAuthenticator.class); private static final AccountsManager ACCOUNTS_MANAGER = mock(AccountsManager.class); - private static final PubSubClientEventManager PUBSUB_CLIENT_PRESENCE = mock(PubSubClientEventManager.class); + private static final WebSocketConnectionEventManager PUBSUB_CLIENT_PRESENCE = mock(WebSocketConnectionEventManager.class); private WebSocketClient client; private final Account account1 = new Account(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/auth/RegistrationLockVerificationManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/auth/RegistrationLockVerificationManagerTest.java index 645dd7e72..e04400c0a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/auth/RegistrationLockVerificationManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/auth/RegistrationLockVerificationManagerTest.java @@ -34,7 +34,7 @@ import org.whispersystems.textsecuregcm.entities.PhoneVerificationRequest; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; @@ -46,7 +46,7 @@ import org.whispersystems.textsecuregcm.util.Pair; class RegistrationLockVerificationManagerTest { private final AccountsManager accountsManager = mock(AccountsManager.class); - private final PubSubClientEventManager pubSubClientEventManager = mock(PubSubClientEventManager.class); + private final WebSocketConnectionEventManager webSocketConnectionEventManager = mock(WebSocketConnectionEventManager.class); private final ExternalServiceCredentialsGenerator svr2CredentialsGenerator = mock( ExternalServiceCredentialsGenerator.class); private final ExternalServiceCredentialsGenerator svr3CredentialsGenerator = mock( @@ -56,7 +56,7 @@ class RegistrationLockVerificationManagerTest { private static PushNotificationManager pushNotificationManager = mock(PushNotificationManager.class); private final RateLimiters rateLimiters = mock(RateLimiters.class); private final RegistrationLockVerificationManager registrationLockVerificationManager = new RegistrationLockVerificationManager( - accountsManager, pubSubClientEventManager, svr2CredentialsGenerator, svr3CredentialsGenerator, + accountsManager, webSocketConnectionEventManager, svr2CredentialsGenerator, svr3CredentialsGenerator, registrationRecoveryPasswordsManager, pushNotificationManager, rateLimiters); private final RateLimiter pinLimiter = mock(RateLimiter.class); @@ -107,7 +107,7 @@ class RegistrationLockVerificationManagerTest { } else { verify(registrationRecoveryPasswordsManager, never()).removeForNumber(account.getNumber()); } - verify(pubSubClientEventManager).requestDisconnection(account.getUuid(), List.of(Device.PRIMARY_ID)); + verify(webSocketConnectionEventManager).requestDisconnection(account.getUuid(), List.of(Device.PRIMARY_ID)); try { verify(pushNotificationManager).sendAttemptLoginNotification(any(), eq("failedRegistrationLock")); } catch (NotPushRegisteredException npre) {} @@ -130,7 +130,7 @@ class RegistrationLockVerificationManagerTest { verify(pushNotificationManager, never()).sendAttemptLoginNotification(any(), eq("failedRegistrationLock")); } catch (NotPushRegisteredException npre) {} verify(registrationRecoveryPasswordsManager, never()).removeForNumber(account.getNumber()); - verify(pubSubClientEventManager, never()).requestDisconnection(any(), any()); + verify(webSocketConnectionEventManager, never()).requestDisconnection(any(), any()); }); } }; @@ -168,7 +168,7 @@ class RegistrationLockVerificationManagerTest { verify(account, never()).lockAuthTokenHash(); verify(registrationRecoveryPasswordsManager, never()).removeForNumber(account.getNumber()); - verify(pubSubClientEventManager, never()).requestDisconnection(any(), any()); + verify(webSocketConnectionEventManager, never()).requestDisconnection(any(), any()); } static Stream testSuccess() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceControllerTest.java index 87242241c..37c185d32 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceControllerTest.java @@ -79,7 +79,7 @@ import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.mappers.DeviceLimitExceededExceptionMapper; import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.ClientPublicKeysManager; @@ -110,7 +110,7 @@ class DeviceControllerTest { private static final Account account = mock(Account.class); private static final Account maxedAccount = mock(Account.class); private static final Device primaryDevice = mock(Device.class); - private static final PubSubClientEventManager pubSubClientEventManager = mock(PubSubClientEventManager.class); + private static final WebSocketConnectionEventManager webSocketConnectionEventManager = mock(WebSocketConnectionEventManager.class); private static final Map deviceConfiguration = new HashMap<>(); private static final TestClock testClock = TestClock.now(); @@ -131,7 +131,7 @@ class DeviceControllerTest { .addProvider(new AuthValueFactoryProvider.Binder<>(AuthenticatedDevice.class)) .addProvider(new RateLimitExceededExceptionMapper()) .setTestContainerFactory(new GrizzlyWebTestContainerFactory()) - .addProvider(new WebsocketRefreshApplicationEventListener(accountsManager, pubSubClientEventManager)) + .addProvider(new WebsocketRefreshApplicationEventListener(accountsManager, webSocketConnectionEventManager)) .addProvider(new DeviceLimitExceededExceptionMapper()) .addResource(deviceController) .build(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java similarity index 70% rename from service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java index f35edb9a2..93e665ada 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java @@ -40,17 +40,17 @@ import org.whispersystems.textsecuregcm.tests.util.MockRedisFuture; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) -class PubSubClientEventManagerTest { +class WebSocketConnectionEventManagerTest { - private PubSubClientEventManager localPresenceManager; - private PubSubClientEventManager remotePresenceManager; + private WebSocketConnectionEventManager localEventManager; + private WebSocketConnectionEventManager remoteEventManager; private static ExecutorService clientEventExecutor; @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); - private static class ClientEventAdapter implements ClientEventListener { + private static class WebSocketConnectionEventAdapter implements WebSocketConnectionEventListener { @Override public void handleNewMessageAvailable() { @@ -72,17 +72,17 @@ class PubSubClientEventManagerTest { @BeforeEach void setUp() { - localPresenceManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutor); - remotePresenceManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutor); + localEventManager = new WebSocketConnectionEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutor); + remoteEventManager = new WebSocketConnectionEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutor); - localPresenceManager.start(); - remotePresenceManager.start(); + localEventManager.start(); + remoteEventManager.start(); } @AfterEach void tearDown() { - localPresenceManager.stop(); - remotePresenceManager.stop(); + localEventManager.stop(); + remoteEventManager.stop(); } @AfterAll @@ -101,7 +101,7 @@ class PubSubClientEventManagerTest { final AtomicBoolean secondListenerDisplaced = new AtomicBoolean(false); final AtomicBoolean firstListenerConnectedElsewhere = new AtomicBoolean(false); - localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() { + localEventManager.handleClientConnected(accountIdentifier, deviceId, new WebSocketConnectionEventAdapter() { @Override public void handleConnectionDisplaced(final boolean connectedElsewhere) { synchronized (firstListenerDisplaced) { @@ -116,10 +116,10 @@ class PubSubClientEventManagerTest { assertFalse(firstListenerDisplaced.get()); assertFalse(secondListenerDisplaced.get()); - final PubSubClientEventManager displacingManager = - displaceRemotely ? remotePresenceManager : localPresenceManager; + final WebSocketConnectionEventManager displacingManager = + displaceRemotely ? remoteEventManager : localEventManager; - displacingManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() { + displacingManager.handleClientConnected(accountIdentifier, deviceId, new WebSocketConnectionEventAdapter() { @Override public void handleConnectionDisplaced(final boolean connectedElsewhere) { secondListenerDisplaced.set(true); @@ -143,22 +143,22 @@ class PubSubClientEventManagerTest { final UUID accountIdentifier = UUID.randomUUID(); final byte deviceId = Device.PRIMARY_ID; - assertFalse(localPresenceManager.isLocallyPresent(accountIdentifier, deviceId)); - assertFalse(remotePresenceManager.isLocallyPresent(accountIdentifier, deviceId)); + assertFalse(localEventManager.isLocallyPresent(accountIdentifier, deviceId)); + assertFalse(remoteEventManager.isLocallyPresent(accountIdentifier, deviceId)); - localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter()) + localEventManager.handleClientConnected(accountIdentifier, deviceId, new WebSocketConnectionEventAdapter()) .toCompletableFuture() .join(); - assertTrue(localPresenceManager.isLocallyPresent(accountIdentifier, deviceId)); - assertFalse(remotePresenceManager.isLocallyPresent(accountIdentifier, deviceId)); + assertTrue(localEventManager.isLocallyPresent(accountIdentifier, deviceId)); + assertFalse(remoteEventManager.isLocallyPresent(accountIdentifier, deviceId)); - localPresenceManager.handleClientDisconnected(accountIdentifier, deviceId) + localEventManager.handleClientDisconnected(accountIdentifier, deviceId) .toCompletableFuture() .join(); - assertFalse(localPresenceManager.isLocallyPresent(accountIdentifier, deviceId)); - assertFalse(remotePresenceManager.isLocallyPresent(accountIdentifier, deviceId)); + assertFalse(localEventManager.isLocallyPresent(accountIdentifier, deviceId)); + assertFalse(remoteEventManager.isLocallyPresent(accountIdentifier, deviceId)); } @ParameterizedTest @@ -173,7 +173,7 @@ class PubSubClientEventManagerTest { final AtomicBoolean firstListenerConnectedElsewhere = new AtomicBoolean(false); - localPresenceManager.handleClientConnected(accountIdentifier, firstDeviceId, new ClientEventAdapter() { + localEventManager.handleClientConnected(accountIdentifier, firstDeviceId, new WebSocketConnectionEventAdapter() { @Override public void handleConnectionDisplaced(final boolean connectedElsewhere) { synchronized (firstListenerDisplaced) { @@ -185,7 +185,7 @@ class PubSubClientEventManagerTest { } }).toCompletableFuture().join(); - localPresenceManager.handleClientConnected(accountIdentifier, secondDeviceId, new ClientEventAdapter() { + localEventManager.handleClientConnected(accountIdentifier, secondDeviceId, new WebSocketConnectionEventAdapter() { @Override public void handleConnectionDisplaced(final boolean connectedElsewhere) { synchronized (secondListenerDisplaced) { @@ -198,8 +198,8 @@ class PubSubClientEventManagerTest { assertFalse(firstListenerDisplaced.get()); assertFalse(secondListenerDisplaced.get()); - final PubSubClientEventManager displacingManager = - requestDisconnectionRemotely ? remotePresenceManager : localPresenceManager; + final WebSocketConnectionEventManager displacingManager = + requestDisconnectionRemotely ? remoteEventManager : localEventManager; displacingManager.requestDisconnection(accountIdentifier, List.of(firstDeviceId)).toCompletableFuture().join(); @@ -230,13 +230,13 @@ class PubSubClientEventManagerTest { .binaryPubSubAsyncCommands(pubSubAsyncCommands) .build(); - final PubSubClientEventManager presenceManager = new PubSubClientEventManager(clusterClient, Runnable::run); + final WebSocketConnectionEventManager eventManager = new WebSocketConnectionEventManager(clusterClient, Runnable::run); - presenceManager.start(); + eventManager.start(); final UUID firstAccountIdentifier = UUID.randomUUID(); final byte firstDeviceId = Device.PRIMARY_ID; - final int firstSlot = SlotHash.getSlot(PubSubClientEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); + final int firstSlot = SlotHash.getSlot(WebSocketConnectionEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); final UUID secondAccountIdentifier; final byte secondDeviceId = firstDeviceId + 1; @@ -247,15 +247,15 @@ class PubSubClientEventManagerTest { do { candidateIdentifier = UUID.randomUUID(); - } while (SlotHash.getSlot(PubSubClientEventManager.getClientEventChannel(candidateIdentifier, secondDeviceId)) == firstSlot); + } while (SlotHash.getSlot(WebSocketConnectionEventManager.getClientEventChannel(candidateIdentifier, secondDeviceId)) == firstSlot); secondAccountIdentifier = candidateIdentifier; } - presenceManager.handleClientConnected(firstAccountIdentifier, firstDeviceId, new ClientEventAdapter()).toCompletableFuture().join(); - presenceManager.handleClientConnected(secondAccountIdentifier, secondDeviceId, new ClientEventAdapter()).toCompletableFuture().join(); + eventManager.handleClientConnected(firstAccountIdentifier, firstDeviceId, new WebSocketConnectionEventAdapter()).toCompletableFuture().join(); + eventManager.handleClientConnected(secondAccountIdentifier, secondDeviceId, new WebSocketConnectionEventAdapter()).toCompletableFuture().join(); - final int secondSlot = SlotHash.getSlot(PubSubClientEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); + final int secondSlot = SlotHash.getSlot(WebSocketConnectionEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); final String firstNodeId = UUID.randomUUID().toString(); @@ -274,12 +274,12 @@ class PubSubClientEventManagerTest { when(secondAfterNode.getNodeId()).thenReturn(UUID.randomUUID().toString()); when(secondAfterNode.getSlots()).thenReturn(List.of(secondSlot)); - presenceManager.resubscribe(new ClusterTopologyChangedEvent( + eventManager.resubscribe(new ClusterTopologyChangedEvent( List.of(firstBeforeNode), List.of(firstAfterNode, secondAfterNode))); - verify(pubSubCommands).ssubscribe(PubSubClientEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); - verify(pubSubCommands, never()).ssubscribe(PubSubClientEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); + verify(pubSubCommands).ssubscribe(WebSocketConnectionEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); + verify(pubSubCommands, never()).ssubscribe(WebSocketConnectionEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); } @Test @@ -293,7 +293,7 @@ class PubSubClientEventManagerTest { .binaryPubSubAsyncCommands(pubSubAsyncCommands) .build(); - final PubSubClientEventManager eventManager = new PubSubClientEventManager(clusterClient, Runnable::run); + final WebSocketConnectionEventManager eventManager = new WebSocketConnectionEventManager(clusterClient, Runnable::run); eventManager.start(); @@ -303,20 +303,20 @@ class PubSubClientEventManagerTest { final UUID noListenerAccountIdentifier = UUID.randomUUID(); final byte noListenerDeviceId = listenerDeviceId + 1; - eventManager.handleClientConnected(listenerAccountIdentifier, listenerDeviceId, new ClientEventAdapter()) + eventManager.handleClientConnected(listenerAccountIdentifier, listenerDeviceId, new WebSocketConnectionEventAdapter()) .toCompletableFuture() .join(); eventManager.unsubscribeIfMissingListener( - new PubSubClientEventManager.AccountAndDeviceIdentifier(listenerAccountIdentifier, listenerDeviceId)); + new WebSocketConnectionEventManager.AccountAndDeviceIdentifier(listenerAccountIdentifier, listenerDeviceId)); eventManager.unsubscribeIfMissingListener( - new PubSubClientEventManager.AccountAndDeviceIdentifier(noListenerAccountIdentifier, noListenerDeviceId)); + new WebSocketConnectionEventManager.AccountAndDeviceIdentifier(noListenerAccountIdentifier, noListenerDeviceId)); verify(pubSubAsyncCommands, never()) - .sunsubscribe(PubSubClientEventManager.getClientEventChannel(listenerAccountIdentifier, listenerDeviceId)); + .sunsubscribe(WebSocketConnectionEventManager.getClientEventChannel(listenerAccountIdentifier, listenerDeviceId)); verify(pubSubAsyncCommands) - .sunsubscribe(PubSubClientEventManager.getClientEventChannel(noListenerAccountIdentifier, noListenerDeviceId)); + .sunsubscribe(WebSocketConnectionEventManager.getClientEventChannel(noListenerAccountIdentifier, noListenerDeviceId)); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCreationDeletionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCreationDeletionIntegrationTest.java index eafd6c0fb..a7254cbe4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCreationDeletionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCreationDeletionIntegrationTest.java @@ -43,7 +43,7 @@ import org.whispersystems.textsecuregcm.entities.ECSignedPreKey; import org.whispersystems.textsecuregcm.entities.GcmRegistrationId; import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey; import org.whispersystems.textsecuregcm.identity.IdentityType; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; @@ -138,8 +138,8 @@ public class AccountCreationDeletionIntegrationTest { when(registrationRecoveryPasswordsManager.removeForNumber(any())) .thenReturn(CompletableFuture.completedFuture(null)); - final PubSubClientEventManager pubSubClientEventManager = mock(PubSubClientEventManager.class); - when(pubSubClientEventManager.requestDisconnection(any())) + final WebSocketConnectionEventManager webSocketConnectionEventManager = mock(WebSocketConnectionEventManager.class); + when(webSocketConnectionEventManager.requestDisconnection(any())) .thenReturn(CompletableFuture.completedFuture(null)); accountsManager = new AccountsManager( @@ -153,7 +153,7 @@ public class AccountCreationDeletionIntegrationTest { profilesManager, secureStorageClient, svr2Client, - pubSubClientEventManager, + webSocketConnectionEventManager, registrationRecoveryPasswordsManager, clientPublicKeysManager, accountLockExecutor, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java index e994317e5..03fc0b506 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java @@ -36,7 +36,7 @@ import org.whispersystems.textsecuregcm.controllers.MismatchedDevicesException; import org.whispersystems.textsecuregcm.entities.AccountAttributes; import org.whispersystems.textsecuregcm.entities.ECSignedPreKey; import org.whispersystems.textsecuregcm.identity.IdentityType; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; @@ -66,7 +66,7 @@ class AccountsManagerChangeNumberIntegrationTest { static final RedisClusterExtension CACHE_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); private KeysManager keysManager; - private PubSubClientEventManager pubSubClientEventManager; + private WebSocketConnectionEventManager webSocketConnectionEventManager; private ExecutorService accountLockExecutor; private AccountsManager accountsManager; @@ -116,7 +116,7 @@ class AccountsManagerChangeNumberIntegrationTest { final SecureValueRecovery2Client svr2Client = mock(SecureValueRecovery2Client.class); when(svr2Client.deleteBackups(any())).thenReturn(CompletableFuture.completedFuture(null)); - pubSubClientEventManager = mock(PubSubClientEventManager.class); + webSocketConnectionEventManager = mock(WebSocketConnectionEventManager.class); final PhoneNumberIdentifiers phoneNumberIdentifiers = new PhoneNumberIdentifiers(DYNAMO_DB_EXTENSION.getDynamoDbClient(), Tables.PNI.tableName()); @@ -144,7 +144,7 @@ class AccountsManagerChangeNumberIntegrationTest { profilesManager, secureStorageClient, svr2Client, - pubSubClientEventManager, + webSocketConnectionEventManager, registrationRecoveryPasswordsManager, clientPublicKeysManager, accountLockExecutor, @@ -274,7 +274,7 @@ class AccountsManagerChangeNumberIntegrationTest { assertEquals(secondNumber, accountsManager.getByAccountIdentifier(originalUuid).map(Account::getNumber).orElseThrow()); - verify(pubSubClientEventManager).requestDisconnection(existingAccountUuid); + verify(webSocketConnectionEventManager).requestDisconnection(existingAccountUuid); assertEquals(Optional.of(existingAccountUuid), accountsManager.findRecentlyDeletedAccountIdentifier(originalNumber)); assertEquals(Optional.empty(), accountsManager.findRecentlyDeletedAccountIdentifier(secondNumber)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java index 0bc3f5fc6..0bb81c21f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerConcurrentModificationIntegrationTest.java @@ -48,7 +48,7 @@ import org.whispersystems.textsecuregcm.auth.UnidentifiedAccessUtil; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.AccountAttributes; import org.whispersystems.textsecuregcm.identity.IdentityType; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; @@ -133,7 +133,7 @@ class AccountsManagerConcurrentModificationIntegrationTest { mock(ProfilesManager.class), mock(SecureStorageClient.class), mock(SecureValueRecovery2Client.class), - mock(PubSubClientEventManager.class), + mock(WebSocketConnectionEventManager.class), mock(RegistrationRecoveryPasswordsManager.class), mock(ClientPublicKeysManager.class), mock(Executor.class), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerDeviceTransferIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerDeviceTransferIntegrationTest.java index d5bb98a3c..71547ec2e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerDeviceTransferIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerDeviceTransferIntegrationTest.java @@ -14,7 +14,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.entities.RestoreAccountRequest; import org.whispersystems.textsecuregcm.entities.RemoteAttachment; import org.whispersystems.textsecuregcm.identity.IdentityType; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.redis.RedisServerExtension; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; @@ -62,7 +62,7 @@ public class AccountsManagerDeviceTransferIntegrationTest { mock(ProfilesManager.class), mock(SecureStorageClient.class), mock(SecureValueRecovery2Client.class), - mock(PubSubClientEventManager.class), + mock(WebSocketConnectionEventManager.class), mock(RegistrationRecoveryPasswordsManager.class), mock(ClientPublicKeysManager.class), mock(ExecutorService.class), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java index 6c781b260..7ec64abca 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerTest.java @@ -79,7 +79,7 @@ import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.PniServiceIdentifier; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; @@ -117,7 +117,7 @@ class AccountsManagerTest { private KeysManager keysManager; private MessagesManager messagesManager; private ProfilesManager profilesManager; - private PubSubClientEventManager pubSubClientEventManager; + private WebSocketConnectionEventManager webSocketConnectionEventManager; private ClientPublicKeysManager clientPublicKeysManager; private Map phoneNumberIdentifiersByE164; @@ -152,7 +152,7 @@ class AccountsManagerTest { keysManager = mock(KeysManager.class); messagesManager = mock(MessagesManager.class); profilesManager = mock(ProfilesManager.class); - pubSubClientEventManager = mock(PubSubClientEventManager.class); + webSocketConnectionEventManager = mock(WebSocketConnectionEventManager.class); clientPublicKeysManager = mock(ClientPublicKeysManager.class); dynamicConfiguration = mock(DynamicConfiguration.class); @@ -238,7 +238,7 @@ class AccountsManagerTest { .stringAsyncCommands(asyncClusterCommands) .build(); - when(pubSubClientEventManager.requestDisconnection(any())) + when(webSocketConnectionEventManager.requestDisconnection(any())) .thenReturn(CompletableFuture.completedFuture(null)); accountsManager = new AccountsManager( @@ -252,7 +252,7 @@ class AccountsManagerTest { profilesManager, storageClient, svr2Client, - pubSubClientEventManager, + webSocketConnectionEventManager, registrationRecoveryPasswordsManager, clientPublicKeysManager, mock(Executor.class), @@ -791,7 +791,7 @@ class AccountsManagerTest { verify(keysManager, times(2)).deleteSingleUsePreKeys(account.getUuid(), linkedDevice.getId()); verify(keysManager).buildWriteItemsForRemovedDevice(account.getUuid(), account.getPhoneNumberIdentifier(), linkedDevice.getId()); verify(clientPublicKeysManager).buildTransactWriteItemForDeletion(account.getUuid(), linkedDevice.getId()); - verify(pubSubClientEventManager).requestDisconnection(account.getUuid(), List.of(linkedDevice.getId())); + verify(webSocketConnectionEventManager).requestDisconnection(account.getUuid(), List.of(linkedDevice.getId())); } @Test @@ -809,7 +809,7 @@ class AccountsManagerTest { assertDoesNotThrow(account::getPrimaryDevice); verify(messagesManager, never()).clear(any(), anyByte()); verify(keysManager, never()).deleteSingleUsePreKeys(any(), anyByte()); - verify(pubSubClientEventManager, never()).requestDisconnection(any(), any()); + verify(webSocketConnectionEventManager, never()).requestDisconnection(any(), any()); } @Test @@ -878,7 +878,7 @@ class AccountsManagerTest { verify(keysManager, times(2)).deleteSingleUsePreKeys(phoneNumberIdentifiersByE164.get(e164)); verify(messagesManager, times(2)).clear(existingUuid); verify(profilesManager, times(2)).deleteAll(existingUuid); - verify(pubSubClientEventManager).requestDisconnection(existingUuid); + verify(webSocketConnectionEventManager).requestDisconnection(existingUuid); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java index 73e72e105..aae1d4a92 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerUsernameIntegrationTest.java @@ -35,7 +35,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.Mockito; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; @@ -134,8 +134,8 @@ class AccountsManagerUsernameIntegrationTest { when(messageManager.clear(any())).thenReturn(CompletableFuture.completedFuture(null)); when(profileManager.deleteAll(any())).thenReturn(CompletableFuture.completedFuture(null)); - final PubSubClientEventManager pubSubClientEventManager = mock(PubSubClientEventManager.class); - when(pubSubClientEventManager.requestDisconnection(any())).thenReturn(CompletableFuture.completedFuture(null)); + final WebSocketConnectionEventManager webSocketConnectionEventManager = mock(WebSocketConnectionEventManager.class); + when(webSocketConnectionEventManager.requestDisconnection(any())).thenReturn(CompletableFuture.completedFuture(null)); accountsManager = new AccountsManager( accounts, @@ -148,7 +148,7 @@ class AccountsManagerUsernameIntegrationTest { profileManager, mock(SecureStorageClient.class), mock(SecureValueRecovery2Client.class), - pubSubClientEventManager, + webSocketConnectionEventManager, mock(RegistrationRecoveryPasswordsManager.class), mock(ClientPublicKeysManager.class), Executors.newSingleThreadExecutor(), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java index 444f20cd3..c3cbcccef 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AddRemoveDeviceIntegrationTest.java @@ -33,7 +33,7 @@ import org.signal.libsignal.protocol.ecc.ECKeyPair; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.DeviceInfo; import org.whispersystems.textsecuregcm.identity.IdentityType; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.redis.RedisServerExtension; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; @@ -149,7 +149,7 @@ public class AddRemoveDeviceIntegrationTest { profilesManager, secureStorageClient, svr2Client, - mock(PubSubClientEventManager.class), + mock(WebSocketConnectionEventManager.class), registrationRecoveryPasswordsManager, clientPublicKeysManager, accountLockExecutor, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index 8ac623d63..85ff58322 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -32,8 +32,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.push.ClientEventListener; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventListener; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; @@ -55,7 +55,7 @@ class MessagePersisterIntegrationTest { private ExecutorService clientEventExecutorService; private MessagesCache messagesCache; private MessagesManager messagesManager; - private PubSubClientEventManager pubSubClientEventManager; + private WebSocketConnectionEventManager webSocketConnectionEventManager; private MessagePersister messagePersister; private Account account; @@ -85,8 +85,8 @@ class MessagePersisterIntegrationTest { messageDeletionExecutorService); clientEventExecutorService = Executors.newVirtualThreadPerTaskExecutor(); - pubSubClientEventManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutorService); - pubSubClientEventManager.start(); + webSocketConnectionEventManager = new WebSocketConnectionEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutorService); + webSocketConnectionEventManager.start(); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, PERSIST_DELAY, 1); @@ -113,7 +113,7 @@ class MessagePersisterIntegrationTest { messageDeliveryScheduler.dispose(); - pubSubClientEventManager.stop(); + webSocketConnectionEventManager.stop(); } @Test @@ -142,7 +142,7 @@ class MessagePersisterIntegrationTest { final AtomicBoolean messagesPersisted = new AtomicBoolean(false); - pubSubClientEventManager.handleClientConnected(account.getUuid(), Device.PRIMARY_ID, new ClientEventListener() { + webSocketConnectionEventManager.handleClientConnected(account.getUuid(), Device.PRIMARY_ID, new WebSocketConnectionEventListener() { @Override public void handleNewMessageAvailable() { } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java index 733b3ded8..753f29f2a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java @@ -20,7 +20,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; @@ -98,7 +98,7 @@ class MessagesCacheInsertScriptTest { REDIS_CLUSTER_EXTENSION.getRedisCluster().createBinaryPubSubConnection(); pubSubClusterConnection.usePubSubConnection(connection -> - connection.sync().ssubscribe(PubSubClientEventManager.getClientEventChannel(destinationUuid, deviceId))); + connection.sync().ssubscribe(WebSocketConnectionEventManager.getClientEventChannel(destinationUuid, deviceId))); assertTrue(insertScript.execute(destinationUuid, deviceId, MessageProtos.Envelope.newBuilder() .setServerTimestamp(Instant.now().getEpochSecond()) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index e4c039a31..be7ff7b66 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -57,7 +57,7 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; -import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -124,7 +124,7 @@ class WebSocketConnectionTest { new WebSocketAccountAuthenticator(accountAuthenticator, mock(PrincipalSupplier.class)); AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, messagesManager, new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), - mock(PubSubClientEventManager.class), retrySchedulingExecutor, + mock(WebSocketConnectionEventManager.class), retrySchedulingExecutor, messageDeliveryScheduler, clientReleaseManager, mock(MessageDeliveryLoopMonitor.class)); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);