Instrument "get queues to persist" calls and "persist queues" exceptions.
This commit is contained in:
parent
02a2c3224f
commit
2686761608
|
@ -35,12 +35,13 @@ public class MessagePersister implements Managed {
|
||||||
private final Thread workerThread;
|
private final Thread workerThread;
|
||||||
private volatile boolean running;
|
private volatile boolean running;
|
||||||
|
|
||||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
||||||
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
||||||
private final Meter persistMessageMeter = metricRegistry.meter(name(MessagePersister.class, "persistMessage"));
|
private final Meter persistMessageMeter = metricRegistry.meter(name(MessagePersister.class, "persistMessage"));
|
||||||
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
private final Meter persistQueueExceptionMeter = metricRegistry.meter(name(MessagePersister.class, "persistQueueException"));
|
||||||
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
||||||
|
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
||||||
|
|
||||||
static final int QUEUE_BATCH_LIMIT = 100;
|
static final int QUEUE_BATCH_LIMIT = 100;
|
||||||
static final int MESSAGE_BATCH_LIMIT = 100;
|
static final int MESSAGE_BATCH_LIMIT = 100;
|
||||||
|
@ -112,7 +113,9 @@ public class MessagePersister implements Managed {
|
||||||
try {
|
try {
|
||||||
persistQueue(accountUuid, deviceId);
|
persistQueue(accountUuid, deviceId);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
|
persistQueueExceptionMeter.mark();
|
||||||
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
|
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
|
||||||
|
|
||||||
messagesCache.addQueueToPersist(accountUuid, deviceId);
|
messagesCache.addQueueToPersist(accountUuid, deviceId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,6 +62,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "false");
|
private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "false");
|
||||||
private final Timer insertEphemeralTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "true");
|
private final Timer insertEphemeralTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "true");
|
||||||
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
|
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
|
||||||
|
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
|
||||||
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
|
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
|
||||||
private final Timer takeEphemeralMessageTimer = Metrics.timer(name(MessagesCache.class, "takeEphemeral"));
|
private final Timer takeEphemeralMessageTimer = Metrics.timer(name(MessagesCache.class, "takeEphemeral"));
|
||||||
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
|
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
|
||||||
|
@ -318,9 +319,9 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
List<String> getQueuesToPersist(final int slot, final Instant maxTime, final int limit) {
|
List<String> getQueuesToPersist(final int slot, final Instant maxTime, final int limit) {
|
||||||
//noinspection unchecked
|
//noinspection unchecked
|
||||||
return (List<String>)getQueuesToPersistScript.execute(List.of(new String(getQueueIndexKey(slot), StandardCharsets.UTF_8)),
|
return getQueuesToPersistTimer.record(() -> (List<String>)getQueuesToPersistScript.execute(List.of(new String(getQueueIndexKey(slot), StandardCharsets.UTF_8)),
|
||||||
List.of(String.valueOf(maxTime.toEpochMilli()),
|
List.of(String.valueOf(maxTime.toEpochMilli()),
|
||||||
String.valueOf(limit)));
|
String.valueOf(limit))));
|
||||||
}
|
}
|
||||||
|
|
||||||
void addQueueToPersist(final UUID accountUuid, final long deviceId) {
|
void addQueueToPersist(final UUID accountUuid, final long deviceId) {
|
||||||
|
|
Loading…
Reference in New Issue