diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index 7f14878b0..6d3afa693 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -82,7 +82,8 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter { + if (existingListener == null && pubSubConnection != null) { + // Enqueue, but do not block on, an "unsubscribe" operation + pubSubConnection.usePubSubConnection(connection -> + connection.async().sunsubscribe(getClientPresenceKey(accountAndDeviceIdentifier.accountIdentifier(), + accountAndDeviceIdentifier.deviceId()))); + } + + // Make no change to the existing listener whether present or absent + return existingListener; + }); + } + @Override public void smessage(final RedisClusterNode node, final byte[] shardChannel, final byte[] message) { final ClientEvent clientEvent; @@ -328,6 +349,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter unsubscribeIfMissingListener(accountAndDeviceIdentifier)); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java index 9525f0610..d9404a6de 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java @@ -281,4 +281,42 @@ class PubSubClientEventManagerTest { verify(pubSubCommands).ssubscribe(PubSubClientEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); verify(pubSubCommands, never()).ssubscribe(PubSubClientEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); } + + @Test + void smessageWithoutListener() { + @SuppressWarnings("unchecked") final RedisClusterPubSubAsyncCommands pubSubAsyncCommands = + mock(RedisClusterPubSubAsyncCommands.class); + + when(pubSubAsyncCommands.ssubscribe(any())).thenReturn(MockRedisFuture.completedFuture(null)); + + final FaultTolerantRedisClusterClient clusterClient = RedisClusterHelper.builder() + .binaryPubSubAsyncCommands(pubSubAsyncCommands) + .build(); + + final PubSubClientEventManager eventManager = new PubSubClientEventManager(clusterClient, Runnable::run); + + eventManager.start(); + + final UUID listenerAccountIdentifier = UUID.randomUUID(); + final byte listenerDeviceId = Device.PRIMARY_ID; + + final UUID noListenerAccountIdentifier = UUID.randomUUID(); + final byte noListenerDeviceId = listenerDeviceId + 1; + + eventManager.handleClientConnected(listenerAccountIdentifier, listenerDeviceId, new ClientEventAdapter()) + .toCompletableFuture() + .join(); + + eventManager.unsubscribeIfMissingListener( + new PubSubClientEventManager.AccountAndDeviceIdentifier(listenerAccountIdentifier, listenerDeviceId)); + + eventManager.unsubscribeIfMissingListener( + new PubSubClientEventManager.AccountAndDeviceIdentifier(noListenerAccountIdentifier, noListenerDeviceId)); + + verify(pubSubAsyncCommands, never()) + .sunsubscribe(PubSubClientEventManager.getClientPresenceKey(listenerAccountIdentifier, listenerDeviceId)); + + verify(pubSubAsyncCommands) + .sunsubscribe(PubSubClientEventManager.getClientPresenceKey(noListenerAccountIdentifier, noListenerDeviceId)); + } }