Pause only if we're running low on queues to persist.
This commit is contained in:
parent
2ba36ee04c
commit
65e585e122
|
@ -58,8 +58,12 @@ public class MessagePersister implements Managed {
|
||||||
this.workerThread = new Thread(() -> {
|
this.workerThread = new Thread(() -> {
|
||||||
while (running) {
|
while (running) {
|
||||||
try {
|
try {
|
||||||
persistNextQueues(Instant.now());
|
final int queuesPersisted = persistNextQueues(Instant.now());
|
||||||
Util.sleep(100);
|
queueCountHistogram.update(queuesPersisted);
|
||||||
|
|
||||||
|
if (queuesPersisted == 0) {
|
||||||
|
Util.sleep(100);
|
||||||
|
}
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
logger.warn("Failed to persist queues", t);
|
logger.warn("Failed to persist queues", t);
|
||||||
}
|
}
|
||||||
|
@ -92,7 +96,7 @@ public class MessagePersister implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void persistNextQueues(final Instant currentTime) {
|
int persistNextQueues(final Instant currentTime) {
|
||||||
final int slot = messagesCache.getNextSlotToPersist();
|
final int slot = messagesCache.getNextSlotToPersist();
|
||||||
|
|
||||||
List<String> queuesToPersist;
|
List<String> queuesToPersist;
|
||||||
|
@ -118,7 +122,7 @@ public class MessagePersister implements Managed {
|
||||||
queuesPersisted += queuesToPersist.size();
|
queuesPersisted += queuesToPersist.size();
|
||||||
} while (queuesToPersist.size() >= QUEUE_BATCH_LIMIT);
|
} while (queuesToPersist.size() >= QUEUE_BATCH_LIMIT);
|
||||||
|
|
||||||
queueCountHistogram.update(queuesPersisted);
|
return queuesPersisted;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
Loading…
Reference in New Issue