From 0cde06557d482cf1231c6527326708db293f3cd6 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Fri, 13 Aug 2021 10:19:51 -0500 Subject: [PATCH] Catch and log unexpected exceptions keyspace notification executor service --- .../push/ClientPresenceManager.java | 4 +++ .../textsecuregcm/storage/MessagesCache.java | 30 +++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index b197cf9d7..c7201e649 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -256,8 +256,12 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { + try { displacePresence(channel.substring("__keyspace@0__:".length())); remoteDisplacementMeter.mark(); + } catch (final Exception e) { + log.warn("Error displacing presence", e); + } }); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 6a631ef9c..8d1f03020 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -353,14 +353,32 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp pubSubMessageCounter.increment(); if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) { - newMessageNotificationCounter.increment(); - notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable)); + newMessageNotificationCounter.increment(); + notificationExecutorService.execute(() -> { + try { + findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable); + } catch (final Exception e) { + logger.warn("Unexpected error handling new message", e); + } + }); } else if (channel.startsWith(EPHEMERAL_QUEUE_KEYSPACE_PREFIX) && "rpush".equals(message)) { - ephemeralMessageNotificationCounter.increment(); - notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleNewEphemeralMessageAvailable)); + ephemeralMessageNotificationCounter.increment(); + notificationExecutorService.execute(() -> { + try { + findListener(channel).ifPresent(MessageAvailabilityListener::handleNewEphemeralMessageAvailable); + } catch (final Exception e) { + logger.warn("Unexpected error handling new ephemeral message", e); + } + }); } else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) { - queuePersistedNotificationCounter.increment(); - notificationExecutorService.execute(() -> findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted)); + queuePersistedNotificationCounter.increment(); + notificationExecutorService.execute(() -> { + try { + findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted); + } catch (final Exception e) { + logger.warn("Unexpected error handling messages persisted", e); + } + }); } }