diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index f1805620c..49bc74abd 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -35,12 +35,13 @@ public class MessagePersister implements Managed { private final Thread workerThread; private volatile boolean running; - private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); - private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue")); - private final Meter persistMessageMeter = metricRegistry.meter(name(MessagePersister.class, "persistMessage")); - private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount")); - private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize")); + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); + private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue")); + private final Meter persistMessageMeter = metricRegistry.meter(name(MessagePersister.class, "persistMessage")); + private final Meter persistQueueExceptionMeter = metricRegistry.meter(name(MessagePersister.class, "persistQueueException")); + private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount")); + private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize")); static final int QUEUE_BATCH_LIMIT = 100; static final int MESSAGE_BATCH_LIMIT = 100; @@ -112,7 +113,9 @@ public class MessagePersister implements Managed { try { persistQueue(accountUuid, deviceId); } catch (final Exception e) { + persistQueueExceptionMeter.mark(); logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e); + messagesCache.addQueueToPersist(accountUuid, deviceId); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index de01ca470..8100f2c0f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -62,6 +62,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "false"); private final Timer insertEphemeralTimer = Metrics.timer(name(MessagesCache.class, "insert"), "ephemeral", "true"); private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); + private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist")); private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); private final Timer takeEphemeralMessageTimer = Metrics.timer(name(MessagesCache.class, "takeEphemeral")); private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); @@ -318,9 +319,9 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp List getQueuesToPersist(final int slot, final Instant maxTime, final int limit) { //noinspection unchecked - return (List)getQueuesToPersistScript.execute(List.of(new String(getQueueIndexKey(slot), StandardCharsets.UTF_8)), - List.of(String.valueOf(maxTime.toEpochMilli()), - String.valueOf(limit))); + return getQueuesToPersistTimer.record(() -> (List)getQueuesToPersistScript.execute(List.of(new String(getQueueIndexKey(slot), StandardCharsets.UTF_8)), + List.of(String.valueOf(maxTime.toEpochMilli()), + String.valueOf(limit)))); } void addQueueToPersist(final UUID accountUuid, final long deviceId) {