Count individual persisted messages

This commit is contained in:
Jon Chambers 2025-04-21 21:04:33 -04:00 committed by Jon Chambers
parent 4072dcdda5
commit cac979c7fd
2 changed files with 23 additions and 7 deletions

View File

@ -12,10 +12,12 @@ import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@ -25,6 +27,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -44,6 +47,7 @@ public class MessagePersister implements Managed {
private volatile boolean running;
private static final String OVERSIZED_QUEUE_COUNTER_NAME = name(MessagePersister.class, "persistQueueOversized");
private static final String PERSISTED_MESSAGE_COUNTER_NAME = name(MessagePersister.class, "persistMessage");
private static final Timer GET_QUEUES_TIMER = Metrics.timer(name(MessagePersister.class, "getQueues"));
private static final Timer PERSIST_QUEUE_TIMER = Metrics.timer(name(MessagePersister.class, "persistQueue"));
@ -139,6 +143,7 @@ public class MessagePersister implements Managed {
@VisibleForTesting
int persistNextQueues(final Instant currentTime) {
final int slot = messagesCache.getNextSlotToPersist();
final String shard = messagesCache.shardForSlot(slot);
List<String> queuesToPersist;
int queuesPersisted = 0;
@ -162,11 +167,11 @@ public class MessagePersister implements Managed {
continue;
}
try {
persistQueue(maybeAccount.get(), maybeDevice.get());
persistQueue(maybeAccount.get(), maybeDevice.get(), shard);
} catch (final Exception e) {
PERSIST_QUEUE_EXCEPTION_METER.increment();
logger.warn("Failed to persist queue {}::{} (slot {}, shard {}); will schedule for retry",
accountUuid, deviceId, slot, messagesCache.shardForSlot(slot), e);
accountUuid, deviceId, slot, shard, e);
messagesCache.addQueueToPersist(accountUuid, deviceId);
@ -184,10 +189,16 @@ public class MessagePersister implements Managed {
}
@VisibleForTesting
void persistQueue(final Account account, final Device device) throws MessagePersistenceException {
void persistQueue(final Account account, final Device device, final String shard) throws MessagePersistenceException {
final UUID accountUuid = account.getUuid();
final byte deviceId = device.getId();
final Tags tags = Tags.of(
"shard", shard,
"platform", DevicePlatformUtil.getDevicePlatform(device)
.map(platform -> platform.name().toLowerCase(Locale.ROOT))
.orElse("unknown"));
final Timer.Sample sample = Timer.start();
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
@ -201,6 +212,12 @@ public class MessagePersister implements Managed {
do {
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
final int urgentMessageCount = (int) messages.stream().filter(MessageProtos.Envelope::getUrgent).count();
final int nonUrgentMessageCount = messages.size() - urgentMessageCount;
Metrics.counter(PERSISTED_MESSAGE_COUNTER_NAME, tags.and("urgent", "true")).increment(urgentMessageCount);
Metrics.counter(PERSISTED_MESSAGE_COUNTER_NAME, tags.and("urgent", "false")).increment(nonUrgentMessageCount);
int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, device, messages);
messageCount += messages.size();
@ -235,7 +252,6 @@ public class MessagePersister implements Managed {
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
sample.stop(PERSIST_QUEUE_TIMER);
}
}
private void trimQueue(final Account account, byte deviceId) {

View File

@ -257,7 +257,7 @@ class MessagePersisterTest {
assertTimeoutPreemptively(Duration.ofSeconds(1), () ->
assertThrows(MessagePersistenceException.class,
() -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE)));
() -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test")));
}
@Test
@ -298,7 +298,7 @@ class MessagePersisterTest {
when(messagesManager.persistMessages(any(UUID.class), any(), anyList())).thenThrow(ItemCollectionSizeLimitExceededException.builder().build());
assertTimeoutPreemptively(Duration.ofSeconds(1), () ->
messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE));
messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test"));
verify(accountsManager, exactly()).removeDevice(destinationAccount, DESTINATION_DEVICE_ID);
}
@ -400,7 +400,7 @@ class MessagePersisterTest {
when(messagesManager.persistMessages(any(UUID.class), any(), anyList())).thenThrow(ItemCollectionSizeLimitExceededException.builder().build());
when(accountsManager.removeDevice(destinationAccount, DESTINATION_DEVICE_ID)).thenReturn(CompletableFuture.failedFuture(new TimeoutException()));
assertThrows(CompletionException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE));
assertThrows(CompletionException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test"));
}
@SuppressWarnings("SameParameterValue")