Remove subscriptions if we get pub/sub events without a registered listener
This commit is contained in:
parent
562b495a18
commit
767f6a90e0
|
@ -82,7 +82,8 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(PubSubClientEventManager.class);
|
private static final Logger logger = LoggerFactory.getLogger(PubSubClientEventManager.class);
|
||||||
|
|
||||||
private record AccountAndDeviceIdentifier(UUID accountIdentifier, byte deviceId) {
|
@VisibleForTesting
|
||||||
|
record AccountAndDeviceIdentifier(UUID accountIdentifier, byte deviceId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
public PubSubClientEventManager(final FaultTolerantRedisClusterClient clusterClient,
|
public PubSubClientEventManager(final FaultTolerantRedisClusterClient clusterClient,
|
||||||
|
@ -292,6 +293,26 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unsubscribes for notifications for the given account and device identifier if and only if no listener is registered
|
||||||
|
* for that account and device identifier.
|
||||||
|
*
|
||||||
|
* @param accountAndDeviceIdentifier the account and device identifier for which to stop receiving notifications
|
||||||
|
*/
|
||||||
|
void unsubscribeIfMissingListener(final AccountAndDeviceIdentifier accountAndDeviceIdentifier) {
|
||||||
|
listenersByAccountAndDeviceIdentifier.compute(accountAndDeviceIdentifier, (ignored, existingListener) -> {
|
||||||
|
if (existingListener == null && pubSubConnection != null) {
|
||||||
|
// Enqueue, but do not block on, an "unsubscribe" operation
|
||||||
|
pubSubConnection.usePubSubConnection(connection ->
|
||||||
|
connection.async().sunsubscribe(getClientPresenceKey(accountAndDeviceIdentifier.accountIdentifier(),
|
||||||
|
accountAndDeviceIdentifier.deviceId())));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make no change to the existing listener whether present or absent
|
||||||
|
return existingListener;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void smessage(final RedisClusterNode node, final byte[] shardChannel, final byte[] message) {
|
public void smessage(final RedisClusterNode node, final byte[] shardChannel, final byte[] message) {
|
||||||
final ClientEvent clientEvent;
|
final ClientEvent clientEvent;
|
||||||
|
@ -328,6 +349,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
MESSAGE_WITHOUT_LISTENER_COUNTER.increment();
|
MESSAGE_WITHOUT_LISTENER_COUNTER.increment();
|
||||||
|
listenerEventExecutor.execute(() -> unsubscribeIfMissingListener(accountAndDeviceIdentifier));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -281,4 +281,42 @@ class PubSubClientEventManagerTest {
|
||||||
verify(pubSubCommands).ssubscribe(PubSubClientEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId));
|
verify(pubSubCommands).ssubscribe(PubSubClientEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId));
|
||||||
verify(pubSubCommands, never()).ssubscribe(PubSubClientEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId));
|
verify(pubSubCommands, never()).ssubscribe(PubSubClientEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void smessageWithoutListener() {
|
||||||
|
@SuppressWarnings("unchecked") final RedisClusterPubSubAsyncCommands<byte[], byte[]> pubSubAsyncCommands =
|
||||||
|
mock(RedisClusterPubSubAsyncCommands.class);
|
||||||
|
|
||||||
|
when(pubSubAsyncCommands.ssubscribe(any())).thenReturn(MockRedisFuture.completedFuture(null));
|
||||||
|
|
||||||
|
final FaultTolerantRedisClusterClient clusterClient = RedisClusterHelper.builder()
|
||||||
|
.binaryPubSubAsyncCommands(pubSubAsyncCommands)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final PubSubClientEventManager eventManager = new PubSubClientEventManager(clusterClient, Runnable::run);
|
||||||
|
|
||||||
|
eventManager.start();
|
||||||
|
|
||||||
|
final UUID listenerAccountIdentifier = UUID.randomUUID();
|
||||||
|
final byte listenerDeviceId = Device.PRIMARY_ID;
|
||||||
|
|
||||||
|
final UUID noListenerAccountIdentifier = UUID.randomUUID();
|
||||||
|
final byte noListenerDeviceId = listenerDeviceId + 1;
|
||||||
|
|
||||||
|
eventManager.handleClientConnected(listenerAccountIdentifier, listenerDeviceId, new ClientEventAdapter())
|
||||||
|
.toCompletableFuture()
|
||||||
|
.join();
|
||||||
|
|
||||||
|
eventManager.unsubscribeIfMissingListener(
|
||||||
|
new PubSubClientEventManager.AccountAndDeviceIdentifier(listenerAccountIdentifier, listenerDeviceId));
|
||||||
|
|
||||||
|
eventManager.unsubscribeIfMissingListener(
|
||||||
|
new PubSubClientEventManager.AccountAndDeviceIdentifier(noListenerAccountIdentifier, noListenerDeviceId));
|
||||||
|
|
||||||
|
verify(pubSubAsyncCommands, never())
|
||||||
|
.sunsubscribe(PubSubClientEventManager.getClientPresenceKey(listenerAccountIdentifier, listenerDeviceId));
|
||||||
|
|
||||||
|
verify(pubSubAsyncCommands)
|
||||||
|
.sunsubscribe(PubSubClientEventManager.getClientPresenceKey(noListenerAccountIdentifier, noListenerDeviceId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue