Normalize persister metric names; make metrics `static final`
This commit is contained in:
parent
99041bc593
commit
e1b1c7db8d
|
@ -45,18 +45,18 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
private static final String OVERSIZED_QUEUE_COUNTER_NAME = name(MessagePersister.class, "persistQueueOversized");
|
private static final String OVERSIZED_QUEUE_COUNTER_NAME = name(MessagePersister.class, "persistQueueOversized");
|
||||||
|
|
||||||
private final Timer getQueuesTimer = Metrics.timer(name(MessagePersister.class, "getQueues"));
|
private static final Timer GET_QUEUES_TIMER = Metrics.timer(name(MessagePersister.class, "getQueues"));
|
||||||
private final Timer persistQueueTimer = Metrics.timer(name(MessagePersister.class, "persistQueue"));
|
private static final Timer PERSIST_QUEUE_TIMER = Metrics.timer(name(MessagePersister.class, "persistQueue"));
|
||||||
private final Counter persistQueueExceptionMeter = Metrics.counter(
|
private static final Counter PERSIST_QUEUE_EXCEPTION_METER =
|
||||||
name(MessagePersister.class, "persistQueueException"));
|
Metrics.counter(name(MessagePersister.class, "persistQueueException"));
|
||||||
private static final Counter trimmedMessageCounter = Metrics.counter(name(MessagePersister.class, "trimmedMessage"));
|
private static final Counter TRIMMED_MESSAGE_COUNTER = Metrics.counter(name(MessagePersister.class, "trimmedMessage"));
|
||||||
private static final Counter trimmedMessageBytesCounter = Metrics.counter(name(MessagePersister.class, "trimmedMessageBytes"));
|
private static final Counter TRIMMED_MESSAGE_BYTES_COUNTER = Metrics.counter(name(MessagePersister.class, "trimmedMessageBytes"));
|
||||||
private final DistributionSummary queueCountDistributionSummery = DistributionSummary.builder(
|
private static final DistributionSummary QUEUE_COUNT_DISTRIBUTION_SUMMARY = DistributionSummary.builder(
|
||||||
name(MessagePersister.class, "queueCount"))
|
name(MessagePersister.class, "queueCount"))
|
||||||
.publishPercentiles(0.5, 0.75, 0.95, 0.99, 0.999)
|
.publishPercentiles(0.5, 0.75, 0.95, 0.99, 0.999)
|
||||||
.distributionStatisticExpiry(Duration.ofMinutes(10))
|
.distributionStatisticExpiry(Duration.ofMinutes(10))
|
||||||
.register(Metrics.globalRegistry);
|
.register(Metrics.globalRegistry);
|
||||||
private final DistributionSummary queueSizeDistributionSummery = DistributionSummary.builder(
|
private static final DistributionSummary QUEUE_SIZE_DISTRIBUTION_SUMMARY = DistributionSummary.builder(
|
||||||
name(MessagePersister.class, "queueSize"))
|
name(MessagePersister.class, "queueSize"))
|
||||||
.publishPercentiles(0.5, 0.75, 0.95, 0.99, 0.999)
|
.publishPercentiles(0.5, 0.75, 0.95, 0.99, 0.999)
|
||||||
.distributionStatisticExpiry(Duration.ofMinutes(10))
|
.distributionStatisticExpiry(Duration.ofMinutes(10))
|
||||||
|
@ -64,7 +64,6 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
static final int QUEUE_BATCH_LIMIT = 100;
|
static final int QUEUE_BATCH_LIMIT = 100;
|
||||||
static final int MESSAGE_BATCH_LIMIT = 100;
|
static final int MESSAGE_BATCH_LIMIT = 100;
|
||||||
static final int CACHE_PAGE_SIZE = 100;
|
|
||||||
|
|
||||||
private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis();
|
private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis();
|
||||||
|
|
||||||
|
@ -93,7 +92,7 @@ public class MessagePersister implements Managed {
|
||||||
.isPersistenceEnabled()) {
|
.isPersistenceEnabled()) {
|
||||||
try {
|
try {
|
||||||
final int queuesPersisted = persistNextQueues(Instant.now());
|
final int queuesPersisted = persistNextQueues(Instant.now());
|
||||||
queueCountDistributionSummery.record(queuesPersisted);
|
QUEUE_COUNT_DISTRIBUTION_SUMMARY.record(queuesPersisted);
|
||||||
|
|
||||||
if (queuesPersisted == 0) {
|
if (queuesPersisted == 0) {
|
||||||
Util.sleep(100);
|
Util.sleep(100);
|
||||||
|
@ -145,7 +144,7 @@ public class MessagePersister implements Managed {
|
||||||
int queuesPersisted = 0;
|
int queuesPersisted = 0;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
queuesToPersist = getQueuesTimer.record(
|
queuesToPersist = GET_QUEUES_TIMER.record(
|
||||||
() -> messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT));
|
() -> messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT));
|
||||||
|
|
||||||
for (final String queue : queuesToPersist) {
|
for (final String queue : queuesToPersist) {
|
||||||
|
@ -165,7 +164,7 @@ public class MessagePersister implements Managed {
|
||||||
try {
|
try {
|
||||||
persistQueue(maybeAccount.get(), maybeDevice.get());
|
persistQueue(maybeAccount.get(), maybeDevice.get());
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
persistQueueExceptionMeter.increment();
|
PERSIST_QUEUE_EXCEPTION_METER.increment();
|
||||||
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
|
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
|
||||||
|
|
||||||
messagesCache.addQueueToPersist(accountUuid, deviceId);
|
messagesCache.addQueueToPersist(accountUuid, deviceId);
|
||||||
|
@ -216,7 +215,7 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
} while (!messages.isEmpty());
|
} while (!messages.isEmpty());
|
||||||
|
|
||||||
queueSizeDistributionSummery.record(messageCount);
|
QUEUE_SIZE_DISTRIBUTION_SUMMARY.record(messageCount);
|
||||||
} catch (ItemCollectionSizeLimitExceededException e) {
|
} catch (ItemCollectionSizeLimitExceededException e) {
|
||||||
final boolean isPrimary = deviceId == Device.PRIMARY_ID;
|
final boolean isPrimary = deviceId == Device.PRIMARY_ID;
|
||||||
Metrics.counter(OVERSIZED_QUEUE_COUNTER_NAME, "primary", String.valueOf(isPrimary)).increment();
|
Metrics.counter(OVERSIZED_QUEUE_COUNTER_NAME, "primary", String.valueOf(isPrimary)).increment();
|
||||||
|
@ -233,7 +232,7 @@ public class MessagePersister implements Managed {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
|
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
|
||||||
sample.stop(persistQueueTimer);
|
sample.stop(PERSIST_QUEUE_TIMER);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -273,8 +272,8 @@ public class MessagePersister implements Managed {
|
||||||
.takeWhile(Optional::isPresent)
|
.takeWhile(Optional::isPresent)
|
||||||
.flatMap(maybeEnvelope -> {
|
.flatMap(maybeEnvelope -> {
|
||||||
final MessageProtos.Envelope envelope = maybeEnvelope.get();
|
final MessageProtos.Envelope envelope = maybeEnvelope.get();
|
||||||
trimmedMessageCounter.increment();
|
TRIMMED_MESSAGE_COUNTER.increment();
|
||||||
trimmedMessageBytesCounter.increment(envelope.getSerializedSize());
|
TRIMMED_MESSAGE_BYTES_COUNTER.increment(envelope.getSerializedSize());
|
||||||
return Mono
|
return Mono
|
||||||
.fromCompletionStage(() -> messagesManager
|
.fromCompletionStage(() -> messagesManager
|
||||||
.delete(aci, device, UUID.fromString(envelope.getServerGuid()), envelope.getServerTimestamp()))
|
.delete(aci, device, UUID.fromString(envelope.getServerGuid()), envelope.getServerTimestamp()))
|
||||||
|
|
Loading…
Reference in New Issue