Catch and log unexpected exceptions keyspace notification executor service

This commit is contained in:
Chris Eager 2021-08-13 10:19:51 -05:00 committed by Chris Eager
parent 27844fe692
commit 0cde06557d
2 changed files with 28 additions and 6 deletions

View File

@ -256,8 +256,12 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
// At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making
// synchronous Lettuce calls to avoid deadlocking.
keyspaceNotificationExecutorService.execute(() -> {
try {
displacePresence(channel.substring("__keyspace@0__:".length()));
remoteDisplacementMeter.mark();
} catch (final Exception e) {
log.warn("Error displacing presence", e);
}
});
}
}

View File

@ -353,14 +353,32 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
pubSubMessageCounter.increment();
if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) {
newMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable));
newMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable);
} catch (final Exception e) {
logger.warn("Unexpected error handling new message", e);
}
});
} else if (channel.startsWith(EPHEMERAL_QUEUE_KEYSPACE_PREFIX) && "rpush".equals(message)) {
ephemeralMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleNewEphemeralMessageAvailable));
ephemeralMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleNewEphemeralMessageAvailable);
} catch (final Exception e) {
logger.warn("Unexpected error handling new ephemeral message", e);
}
});
} else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) {
queuePersistedNotificationCounter.increment();
notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted));
queuePersistedNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted);
} catch (final Exception e) {
logger.warn("Unexpected error handling messages persisted", e);
}
});
}
}