diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index e501be180..3dc8db775 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -34,8 +34,8 @@ public class MessagePersister implements Managed { private final Duration persistDelay; - private final Thread workerThread; - private volatile boolean running; + private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT]; + private volatile boolean running; private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); @@ -48,6 +48,8 @@ public class MessagePersister implements Managed { static final int QUEUE_BATCH_LIMIT = 100; static final int MESSAGE_BATCH_LIMIT = 100; + private static final int WORKER_THREAD_COUNT = 4; + private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) { @@ -56,22 +58,22 @@ public class MessagePersister implements Managed { this.accountsManager = accountsManager; this.persistDelay = persistDelay; - this.workerThread = new Thread(() -> { - while (running) { - try { - final int queuesPersisted = persistNextQueues(Instant.now()); - queueCountHistogram.update(queuesPersisted); + for (int i = 0; i < workerThreads.length; i++) { + workerThreads[i] = new Thread(() -> { + while (running) { + try { + final int queuesPersisted = persistNextQueues(Instant.now()); + queueCountHistogram.update(queuesPersisted); - if (queuesPersisted == 0) { - Util.sleep(100); + if (queuesPersisted == 0) { + Util.sleep(100); + } + } catch (final Throwable t) { + logger.warn("Failed to persist queues", t); } - } catch (final Throwable t) { - logger.warn("Failed to persist queues", t); } - } - }, "MessagePersisterWorker"); - - metricRegistry.gauge(name(getClass(), "workerThreadRunning"), () -> () -> workerThread.isAlive() ? 1 : 0); + }, "MessagePersisterWorker-" + i); + } } @VisibleForTesting @@ -82,17 +84,22 @@ public class MessagePersister implements Managed { @Override public void start() { running = true; - workerThread.start(); + + for (final Thread workerThread : workerThreads) { + workerThread.start(); + } } @Override public void stop() { running = false; - try { - workerThread.join(); - } catch (final InterruptedException e) { - logger.warn("Interrupted while waiting for worker thread to complete current operation"); + for (final Thread workerThread : workerThreads) { + try { + workerThread.join(); + } catch (final InterruptedException e) { + logger.warn("Interrupted while waiting for worker thread to complete current operation"); + } } }