From ed382fff6d115a8180fc44ec7a927834d9162135 Mon Sep 17 00:00:00 2001 From: Jonathan Klabunde Tomer Date: Tue, 22 Apr 2025 12:26:56 -0700 Subject: [PATCH] log slot number and shard host of message persister failures --- .../textsecuregcm/storage/MessagePersister.java | 3 ++- .../textsecuregcm/storage/MessagesCache.java | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index 73cc0ea96..24abea4a7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -165,7 +165,8 @@ public class MessagePersister implements Managed { persistQueue(maybeAccount.get(), maybeDevice.get()); } catch (final Exception e) { PERSIST_QUEUE_EXCEPTION_METER.increment(); - logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e); + logger.warn("Failed to persist queue {}::{} (slot {}, shard {}); will schedule for retry", + accountUuid, deviceId, slot, messagesCache.shardForSlot(slot), e); messagesCache.addQueueToPersist(accountUuid, deviceId); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 109235b0f..c6c82d96d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -15,6 +15,8 @@ import io.lettuce.core.Range; import io.lettuce.core.ScoredValue; import io.lettuce.core.ZAddArgs; import io.lettuce.core.cluster.SlotHash; +import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser; +import io.lettuce.core.cluster.models.partitions.Partitions; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; @@ -668,6 +670,16 @@ public class MessagesCache { .thenRun(() -> sample.stop(clearQueueTimer)); } + // expensive—use for rare error logging only + public String shardForSlot(int slot) { + try { + return redisCluster.withBinaryCluster( + connection -> ClusterPartitionParser.parse(connection.sync().clusterNodes()).getPartitionBySlot(slot).getUri().getHost()); + } catch (Throwable ignored) { + return "unknown"; + } + } + int getNextSlotToPersist() { return (int) (redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT);