diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index 24abea4a7..a4c748ad7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -12,10 +12,12 @@ import io.dropwizard.lifecycle.Managed; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -25,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.identity.IdentityType; +import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil; import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -44,6 +47,7 @@ public class MessagePersister implements Managed { private volatile boolean running; private static final String OVERSIZED_QUEUE_COUNTER_NAME = name(MessagePersister.class, "persistQueueOversized"); + private static final String PERSISTED_MESSAGE_COUNTER_NAME = name(MessagePersister.class, "persistMessage"); private static final Timer GET_QUEUES_TIMER = Metrics.timer(name(MessagePersister.class, "getQueues")); private static final Timer PERSIST_QUEUE_TIMER = Metrics.timer(name(MessagePersister.class, "persistQueue")); @@ -139,6 +143,7 @@ public class MessagePersister implements Managed { @VisibleForTesting int persistNextQueues(final Instant currentTime) { final int slot = messagesCache.getNextSlotToPersist(); + final String shard = messagesCache.shardForSlot(slot); List queuesToPersist; int queuesPersisted = 0; @@ -162,11 +167,11 @@ public class MessagePersister implements Managed { continue; } try { - persistQueue(maybeAccount.get(), maybeDevice.get()); + persistQueue(maybeAccount.get(), maybeDevice.get(), shard); } catch (final Exception e) { PERSIST_QUEUE_EXCEPTION_METER.increment(); logger.warn("Failed to persist queue {}::{} (slot {}, shard {}); will schedule for retry", - accountUuid, deviceId, slot, messagesCache.shardForSlot(slot), e); + accountUuid, deviceId, slot, shard, e); messagesCache.addQueueToPersist(accountUuid, deviceId); @@ -184,10 +189,16 @@ public class MessagePersister implements Managed { } @VisibleForTesting - void persistQueue(final Account account, final Device device) throws MessagePersistenceException { + void persistQueue(final Account account, final Device device, final String shard) throws MessagePersistenceException { final UUID accountUuid = account.getUuid(); final byte deviceId = device.getId(); + final Tags tags = Tags.of( + "shard", shard, + "platform", DevicePlatformUtil.getDevicePlatform(device) + .map(platform -> platform.name().toLowerCase(Locale.ROOT)) + .orElse("unknown")); + final Timer.Sample sample = Timer.start(); messagesCache.lockQueueForPersistence(accountUuid, deviceId); @@ -201,6 +212,12 @@ public class MessagePersister implements Managed { do { messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); + final int urgentMessageCount = (int) messages.stream().filter(MessageProtos.Envelope::getUrgent).count(); + final int nonUrgentMessageCount = messages.size() - urgentMessageCount; + + Metrics.counter(PERSISTED_MESSAGE_COUNTER_NAME, tags.and("urgent", "true")).increment(urgentMessageCount); + Metrics.counter(PERSISTED_MESSAGE_COUNTER_NAME, tags.and("urgent", "false")).increment(nonUrgentMessageCount); + int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, device, messages); messageCount += messages.size(); @@ -235,7 +252,6 @@ public class MessagePersister implements Managed { messagesCache.unlockQueueForPersistence(accountUuid, deviceId); sample.stop(PERSIST_QUEUE_TIMER); } - } private void trimQueue(final Account account, byte deviceId) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index e0167680f..3987c9dbc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -257,7 +257,7 @@ class MessagePersisterTest { assertTimeoutPreemptively(Duration.ofSeconds(1), () -> assertThrows(MessagePersistenceException.class, - () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE))); + () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test"))); } @Test @@ -298,7 +298,7 @@ class MessagePersisterTest { when(messagesManager.persistMessages(any(UUID.class), any(), anyList())).thenThrow(ItemCollectionSizeLimitExceededException.builder().build()); assertTimeoutPreemptively(Duration.ofSeconds(1), () -> - messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE)); + messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test")); verify(accountsManager, exactly()).removeDevice(destinationAccount, DESTINATION_DEVICE_ID); } @@ -400,7 +400,7 @@ class MessagePersisterTest { when(messagesManager.persistMessages(any(UUID.class), any(), anyList())).thenThrow(ItemCollectionSizeLimitExceededException.builder().build()); when(accountsManager.removeDevice(destinationAccount, DESTINATION_DEVICE_ID)).thenReturn(CompletableFuture.failedFuture(new TimeoutException())); - assertThrows(CompletionException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE)); + assertThrows(CompletionException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test")); } @SuppressWarnings("SameParameterValue")