diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index e3d84751d..cc2f90f87 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -45,13 +45,22 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter pubSubConnection; - private final Map listenersByAccountAndDeviceIdentifier; + private final Map listenersByAccountAndDeviceIdentifier; private static final byte[] NEW_MESSAGE_EVENT_BYTES = ClientEvent.newBuilder() .setNewMessageAvailable(NewMessageAvailableEvent.getDefaultInstance()) @@ -80,9 +89,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter { + (key, existingListener) -> { subscribeFuture.set(pubSubConnection.withPubSubConnection(connection -> connection.async().ssubscribe(clientPresenceKey))); - if (existingIdAndListener != null) { - displacedListener.set(existingIdAndListener.listener()); + if (existingListener != null) { + displacedListener.set(existingListener); } - return new ConnectionIdAndListener(connectionId, listener); + return listener; }); if (displacedListener.get() != null) { @@ -171,7 +177,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter clusterClient.withBinaryCluster(connection -> connection.async() - .spublish(clientPresenceKey, buildClientConnectedMessage(connectionId)))) + .spublish(clientPresenceKey, CLIENT_CONNECTED_EVENT_BYTES))) .handle((ignored, throwable) -> { if (throwable != null) { PUBLISH_CLIENT_CONNECTION_EVENT_ERROR_COUNTER.increment(); @@ -182,17 +188,15 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter handleClientDisconnected(final UUID accountIdentifier, final byte deviceId, final UUID connectionId) { + public CompletionStage handleClientDisconnected(final UUID accountIdentifier, final byte deviceId) { if (pubSubConnection == null) { throw new IllegalStateException("Presence manager not started"); } @@ -214,35 +218,19 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter { - final ConnectionIdAndListener remainingIdAndListener; + (ignored, existingListener) -> { + unsubscribeFuture.set(pubSubConnection.withPubSubConnection(connection -> + connection.async().sunsubscribe(getClientPresenceKey(accountIdentifier, deviceId))) + .thenRun(Util.NOOP)); - if (existingIdAndListener == null) { - remainingIdAndListener = null; - } else if (existingIdAndListener.connectionIdentifier().equals(connectionId)) { - remainingIdAndListener = null; - } else { - remainingIdAndListener = existingIdAndListener; - } - - if (remainingIdAndListener == null) { - // Only unsubscribe if there's no listener remaining - unsubscribeFuture.set(pubSubConnection.withPubSubConnection(connection -> - connection.async().sunsubscribe(getClientPresenceKey(accountIdentifier, deviceId))) - .thenRun(Util.NOOP)); - } else { - unsubscribeFuture.set(CompletableFuture.completedFuture(null)); - } - - return remainingIdAndListener; + return null; }); - return unsubscribeFuture.get() - .whenComplete((ignored, throwable) -> { - if (throwable != null) { - UNSUBSCRIBE_ERROR_COUNTER.increment(); - } - }); + return unsubscribeFuture.get().whenComplete((ignored, throwable) -> { + if (throwable != null) { + UNSUBSCRIBE_ERROR_COUNTER.increment(); + } + }); } /** @@ -355,24 +343,22 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter connectionIdAndListener.listener().handleNewMessageAvailable(); + case NEW_MESSAGE_AVAILABLE -> listener.handleNewMessageAvailable(); case CLIENT_CONNECTED -> { - final UUID connectionId = UUIDUtil.fromByteString(clientEvent.getClientConnected().getConnectionId()); - - if (!connectionIdAndListener.connectionIdentifier().equals(connectionId)) { - listenerEventExecutor.execute(() -> - connectionIdAndListener.listener().handleConnectionDisplaced(true)); + // Only act on new connections to other presence manager instances; we'll learn about displacements in THIS + // instance when we update the listener map in `handleClientConnected` + if (!this.serverId.equals(UUIDUtil.fromByteString(clientEvent.getClientConnected().getServerId()))) { + listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(true)); } } - case DISCONNECT_REQUESTED -> listenerEventExecutor.execute(() -> - connectionIdAndListener.listener().handleConnectionDisplaced(false)); + case DISCONNECT_REQUESTED -> listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(false)); default -> logger.warn("Unexpected client event type: {}", clientEvent.getClass()); } @@ -381,15 +367,6 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter clientPresenceManager.clearPresence(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection)); - if (connectionId != null) { - pubSubClientEventManager.handleClientDisconnected(auth.getAccount().getUuid(), - auth.getAuthenticatedDevice().getId(), - connectionId); - } + pubSubClientEventManager.handleClientDisconnected(auth.getAccount().getUuid(), + auth.getAuthenticatedDevice().getId()); // Next, we stop listening for inbound messages. If a message arrives after this call, the websocket connection // will not be notified and will not change its state, but that's okay because it has already closed and @@ -160,8 +154,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { // Finally, we register this client's presence, which suppresses push notifications. We do this last because // receiving extra push notifications is generally preferable to missing out on a push notification. clientPresenceManager.setPresent(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection); - pubSubClientEventManager.handleClientConnected(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), null) - .thenAccept(connectionId -> this.connectionId = connectionId); + pubSubClientEventManager.handleClientConnected(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), null); renewPresenceFutureReference.set(scheduledExecutorService.scheduleAtFixedRate(() -> RedisOperation.unchecked(() -> clientPresenceManager.renewPresence(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId())), diff --git a/service/src/main/proto/ClientPresence.proto b/service/src/main/proto/ClientPresence.proto index b2141009d..8e2bef36d 100644 --- a/service/src/main/proto/ClientPresence.proto +++ b/service/src/main/proto/ClientPresence.proto @@ -27,7 +27,7 @@ message NewMessageAvailableEvent { * Indicates that a client has connected to the presence system. */ message ClientConnectedEvent { - bytes connection_id = 1; + bytes server_id = 1; } /** diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java index 38a04aa83..7293b3424 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java @@ -175,13 +175,12 @@ class PubSubClientEventManagerTest { final UUID accountIdentifier = UUID.randomUUID(); final byte deviceId = Device.PRIMARY_ID; - final UUID connectionId = - localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter()) - .toCompletableFuture().join(); + localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter()) + .toCompletableFuture().join(); assertTrue(localPresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join()); - localPresenceManager.handleClientDisconnected(accountIdentifier, deviceId, connectionId).toCompletableFuture().join(); + localPresenceManager.handleClientDisconnected(accountIdentifier, deviceId).toCompletableFuture().join(); assertFalse(localPresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join()); } @@ -194,15 +193,14 @@ class PubSubClientEventManagerTest { assertFalse(localPresenceManager.isLocallyPresent(accountIdentifier, deviceId)); assertFalse(remotePresenceManager.isLocallyPresent(accountIdentifier, deviceId)); - final UUID connectionId = - localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter()) - .toCompletableFuture() - .join(); + localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter()) + .toCompletableFuture() + .join(); assertTrue(localPresenceManager.isLocallyPresent(accountIdentifier, deviceId)); assertFalse(remotePresenceManager.isLocallyPresent(accountIdentifier, deviceId)); - localPresenceManager.handleClientDisconnected(accountIdentifier, deviceId, connectionId) + localPresenceManager.handleClientDisconnected(accountIdentifier, deviceId) .toCompletableFuture() .join();