From 1ef3a230a1cf9ee5cddf7a8852ff8762cf2132d4 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 22 Apr 2025 16:42:32 -0400 Subject: [PATCH] Tag queue size distribution with client platform --- .../storage/MessagePersister.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index e2f6589eb..28913630e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -12,6 +12,7 @@ import io.dropwizard.lifecycle.Managed; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; import java.time.Duration; @@ -62,10 +63,7 @@ public class MessagePersister implements Managed { .publishPercentileHistogram(true) .register(Metrics.globalRegistry); - private static final DistributionSummary QUEUE_SIZE_DISTRIBUTION_SUMMARY = DistributionSummary.builder( - name(MessagePersister.class, "queueSize")) - .publishPercentileHistogram(true) - .register(Metrics.globalRegistry); + private static final String QUEUE_SIZE_DISTRIBUTION_SUMMARY_NAME = name(MessagePersister.class, "queueSize"); static final int QUEUE_BATCH_LIMIT = 100; static final int MESSAGE_BATCH_LIMIT = 100; @@ -194,9 +192,7 @@ public class MessagePersister implements Managed { final UUID accountUuid = account.getUuid(); final byte deviceId = device.getId(); - final Tags tags = Tags.of( - "shard", shard, - "platform", DevicePlatformUtil.getDevicePlatform(device) + final Tag platformTag = Tag.of("platform", DevicePlatformUtil.getDevicePlatform(device) .map(platform -> platform.name().toLowerCase(Locale.ROOT)) .orElse("unknown")); @@ -216,6 +212,8 @@ public class MessagePersister implements Managed { final int urgentMessageCount = (int) messages.stream().filter(MessageProtos.Envelope::getUrgent).count(); final int nonUrgentMessageCount = messages.size() - urgentMessageCount; + final Tags tags = Tags.of(platformTag, Tag.of("shard", shard)); + Metrics.counter(PERSISTED_MESSAGE_COUNTER_NAME, tags.and("urgent", "true")).increment(urgentMessageCount); Metrics.counter(PERSISTED_MESSAGE_COUNTER_NAME, tags.and("urgent", "false")).increment(nonUrgentMessageCount); Metrics.counter(PERSISTED_BYTES_COUNTER_NAME, tags) @@ -236,7 +234,11 @@ public class MessagePersister implements Managed { } while (!messages.isEmpty()); - QUEUE_SIZE_DISTRIBUTION_SUMMARY.record(messageCount); + DistributionSummary.builder(QUEUE_SIZE_DISTRIBUTION_SUMMARY_NAME) + .tags(Tags.of(platformTag)) + .publishPercentileHistogram(true) + .register(Metrics.globalRegistry) + .record(messageCount); } catch (ItemCollectionSizeLimitExceededException e) { final boolean isPrimary = deviceId == Device.PRIMARY_ID; Metrics.counter(OVERSIZED_QUEUE_COUNTER_NAME, "primary", String.valueOf(isPrimary)).increment();