Tag queue size distribution with client platform

This commit is contained in:
Jon Chambers 2025-04-22 16:42:32 -04:00 committed by Jon Chambers
parent b1805d4bf1
commit 1ef3a230a1
1 changed files with 10 additions and 8 deletions

View File

@ -12,6 +12,7 @@ import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
@ -62,10 +63,7 @@ public class MessagePersister implements Managed {
.publishPercentileHistogram(true)
.register(Metrics.globalRegistry);
private static final DistributionSummary QUEUE_SIZE_DISTRIBUTION_SUMMARY = DistributionSummary.builder(
name(MessagePersister.class, "queueSize"))
.publishPercentileHistogram(true)
.register(Metrics.globalRegistry);
private static final String QUEUE_SIZE_DISTRIBUTION_SUMMARY_NAME = name(MessagePersister.class, "queueSize");
static final int QUEUE_BATCH_LIMIT = 100;
static final int MESSAGE_BATCH_LIMIT = 100;
@ -194,9 +192,7 @@ public class MessagePersister implements Managed {
final UUID accountUuid = account.getUuid();
final byte deviceId = device.getId();
final Tags tags = Tags.of(
"shard", shard,
"platform", DevicePlatformUtil.getDevicePlatform(device)
final Tag platformTag = Tag.of("platform", DevicePlatformUtil.getDevicePlatform(device)
.map(platform -> platform.name().toLowerCase(Locale.ROOT))
.orElse("unknown"));
@ -216,6 +212,8 @@ public class MessagePersister implements Managed {
final int urgentMessageCount = (int) messages.stream().filter(MessageProtos.Envelope::getUrgent).count();
final int nonUrgentMessageCount = messages.size() - urgentMessageCount;
final Tags tags = Tags.of(platformTag, Tag.of("shard", shard));
Metrics.counter(PERSISTED_MESSAGE_COUNTER_NAME, tags.and("urgent", "true")).increment(urgentMessageCount);
Metrics.counter(PERSISTED_MESSAGE_COUNTER_NAME, tags.and("urgent", "false")).increment(nonUrgentMessageCount);
Metrics.counter(PERSISTED_BYTES_COUNTER_NAME, tags)
@ -236,7 +234,11 @@ public class MessagePersister implements Managed {
} while (!messages.isEmpty());
QUEUE_SIZE_DISTRIBUTION_SUMMARY.record(messageCount);
DistributionSummary.builder(QUEUE_SIZE_DISTRIBUTION_SUMMARY_NAME)
.tags(Tags.of(platformTag))
.publishPercentileHistogram(true)
.register(Metrics.globalRegistry)
.record(messageCount);
} catch (ItemCollectionSizeLimitExceededException e) {
final boolean isPrimary = deviceId == Device.PRIMARY_ID;
Metrics.counter(OVERSIZED_QUEUE_COUNTER_NAME, "primary", String.valueOf(isPrimary)).increment();