Parallelize message persisters.

This commit is contained in:
Jon Chambers 2021-01-12 11:46:56 -05:00 committed by Jon Chambers
parent ff0bdcd0c2
commit ad30786f4a
1 changed files with 27 additions and 20 deletions

View File

@ -34,8 +34,8 @@ public class MessagePersister implements Managed {
private final Duration persistDelay; private final Duration persistDelay;
private final Thread workerThread; private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT];
private volatile boolean running; private volatile boolean running;
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
@ -48,6 +48,8 @@ public class MessagePersister implements Managed {
static final int QUEUE_BATCH_LIMIT = 100; static final int QUEUE_BATCH_LIMIT = 100;
static final int MESSAGE_BATCH_LIMIT = 100; static final int MESSAGE_BATCH_LIMIT = 100;
private static final int WORKER_THREAD_COUNT = 4;
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) { public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) {
@ -56,22 +58,22 @@ public class MessagePersister implements Managed {
this.accountsManager = accountsManager; this.accountsManager = accountsManager;
this.persistDelay = persistDelay; this.persistDelay = persistDelay;
this.workerThread = new Thread(() -> { for (int i = 0; i < workerThreads.length; i++) {
while (running) { workerThreads[i] = new Thread(() -> {
try { while (running) {
final int queuesPersisted = persistNextQueues(Instant.now()); try {
queueCountHistogram.update(queuesPersisted); final int queuesPersisted = persistNextQueues(Instant.now());
queueCountHistogram.update(queuesPersisted);
if (queuesPersisted == 0) { if (queuesPersisted == 0) {
Util.sleep(100); Util.sleep(100);
}
} catch (final Throwable t) {
logger.warn("Failed to persist queues", t);
} }
} catch (final Throwable t) {
logger.warn("Failed to persist queues", t);
} }
} }, "MessagePersisterWorker-" + i);
}, "MessagePersisterWorker"); }
metricRegistry.gauge(name(getClass(), "workerThreadRunning"), () -> () -> workerThread.isAlive() ? 1 : 0);
} }
@VisibleForTesting @VisibleForTesting
@ -82,17 +84,22 @@ public class MessagePersister implements Managed {
@Override @Override
public void start() { public void start() {
running = true; running = true;
workerThread.start();
for (final Thread workerThread : workerThreads) {
workerThread.start();
}
} }
@Override @Override
public void stop() { public void stop() {
running = false; running = false;
try { for (final Thread workerThread : workerThreads) {
workerThread.join(); try {
} catch (final InterruptedException e) { workerThread.join();
logger.warn("Interrupted while waiting for worker thread to complete current operation"); } catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for worker thread to complete current operation");
}
} }
} }