log slot number and shard host of message persister failures

This commit is contained in:
Jonathan Klabunde Tomer 2025-04-22 12:26:56 -07:00 committed by Jon Chambers
parent 23bb8277d5
commit ed382fff6d
2 changed files with 14 additions and 1 deletions

View File

@ -165,7 +165,8 @@ public class MessagePersister implements Managed {
persistQueue(maybeAccount.get(), maybeDevice.get()); persistQueue(maybeAccount.get(), maybeDevice.get());
} catch (final Exception e) { } catch (final Exception e) {
PERSIST_QUEUE_EXCEPTION_METER.increment(); PERSIST_QUEUE_EXCEPTION_METER.increment();
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e); logger.warn("Failed to persist queue {}::{} (slot {}, shard {}); will schedule for retry",
accountUuid, deviceId, slot, messagesCache.shardForSlot(slot), e);
messagesCache.addQueueToPersist(accountUuid, deviceId); messagesCache.addQueueToPersist(accountUuid, deviceId);

View File

@ -15,6 +15,8 @@ import io.lettuce.core.Range;
import io.lettuce.core.ScoredValue; import io.lettuce.core.ScoredValue;
import io.lettuce.core.ZAddArgs; import io.lettuce.core.ZAddArgs;
import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
@ -668,6 +670,16 @@ public class MessagesCache {
.thenRun(() -> sample.stop(clearQueueTimer)); .thenRun(() -> sample.stop(clearQueueTimer));
} }
// expensiveuse for rare error logging only
public String shardForSlot(int slot) {
try {
return redisCluster.withBinaryCluster(
connection -> ClusterPartitionParser.parse(connection.sync().clusterNodes()).getPartitionBySlot(slot).getUri().getHost());
} catch (Throwable ignored) {
return "unknown";
}
}
int getNextSlotToPersist() { int getNextSlotToPersist() {
return (int) (redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) return (int) (redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY))
% SlotHash.SLOT_COUNT); % SlotHash.SLOT_COUNT);