From 4cfcdb0c96fc369f0814537ac18f1e680dfade4c Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Wed, 27 Jul 2022 13:36:28 -0500 Subject: [PATCH] editorconfig formatting --- .../storage/MessagePersister.java | 255 +++++++++--------- 1 file changed, 130 insertions(+), 125 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index 9f69feac7..b6c5fcdaf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -28,151 +28,156 @@ import org.whispersystems.textsecuregcm.util.Util; public class MessagePersister implements Managed { - private final MessagesCache messagesCache; - private final MessagesManager messagesManager; - private final AccountsManager accountsManager; + private final MessagesCache messagesCache; + private final MessagesManager messagesManager; + private final AccountsManager accountsManager; - private final Duration persistDelay; + private final Duration persistDelay; - private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT]; - private volatile boolean running; + private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT]; + private volatile boolean running; - private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); - private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue")); - private final Meter persistQueueExceptionMeter = metricRegistry.meter(name(MessagePersister.class, "persistQueueException")); - private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount")); - private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize")); + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); + private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue")); + private final Meter persistQueueExceptionMeter = metricRegistry.meter( + name(MessagePersister.class, "persistQueueException")); + private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount")); + private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize")); - static final int QUEUE_BATCH_LIMIT = 100; - static final int MESSAGE_BATCH_LIMIT = 100; + static final int QUEUE_BATCH_LIMIT = 100; + static final int MESSAGE_BATCH_LIMIT = 100; - private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis(); + private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis(); - private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER"; - private static final int WORKER_THREAD_COUNT = 4; + private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER"; + private static final int WORKER_THREAD_COUNT = 4; - private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); + private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); - public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final DynamicConfigurationManager dynamicConfigurationManager, final Duration persistDelay) { - this.messagesCache = messagesCache; - this.messagesManager = messagesManager; - this.accountsManager = accountsManager; - this.persistDelay = persistDelay; + public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, + final AccountsManager accountsManager, + final DynamicConfigurationManager dynamicConfigurationManager, + final Duration persistDelay) { + this.messagesCache = messagesCache; + this.messagesManager = messagesManager; + this.accountsManager = accountsManager; + this.persistDelay = persistDelay; - for (int i = 0; i < workerThreads.length; i++) { - workerThreads[i] = new Thread(() -> { - while (running) { - if (dynamicConfigurationManager.getConfiguration().getActiveFeatureFlags().contains(DISABLE_PERSISTER_FEATURE_FLAG)) { - Util.sleep(1000); - } else { - try { - final int queuesPersisted = persistNextQueues(Instant.now()); - queueCountHistogram.update(queuesPersisted); - - if (queuesPersisted == 0) { - Util.sleep(100); - } - } catch (final Throwable t) { - logger.warn("Failed to persist queues", t); - Util.sleep(EXCEPTION_PAUSE_MILLIS); - } - } - } - }, "MessagePersisterWorker-" + i); - } - } - - @VisibleForTesting - Duration getPersistDelay() { - return persistDelay; - } - - @Override - public void start() { - running = true; - - for (final Thread workerThread : workerThreads) { - workerThread.start(); - } - } - - @Override - public void stop() { - running = false; - - for (final Thread workerThread : workerThreads) { + for (int i = 0; i < workerThreads.length; i++) { + workerThreads[i] = new Thread(() -> { + while (running) { + if (dynamicConfigurationManager.getConfiguration().getActiveFeatureFlags() + .contains(DISABLE_PERSISTER_FEATURE_FLAG)) { + Util.sleep(1000); + } else { try { - workerThread.join(); - } catch (final InterruptedException e) { - logger.warn("Interrupted while waiting for worker thread to complete current operation"); + final int queuesPersisted = persistNextQueues(Instant.now()); + queueCountHistogram.update(queuesPersisted); + + if (queuesPersisted == 0) { + Util.sleep(100); + } + } catch (final Throwable t) { + logger.warn("Failed to persist queues", t); + Util.sleep(EXCEPTION_PAUSE_MILLIS); } + } } + }, "MessagePersisterWorker-" + i); + } + } + + @VisibleForTesting + Duration getPersistDelay() { + return persistDelay; + } + + @Override + public void start() { + running = true; + + for (final Thread workerThread : workerThreads) { + workerThread.start(); + } + } + + @Override + public void stop() { + running = false; + + for (final Thread workerThread : workerThreads) { + try { + workerThread.join(); + } catch (final InterruptedException e) { + logger.warn("Interrupted while waiting for worker thread to complete current operation"); + } + } + } + + @VisibleForTesting + int persistNextQueues(final Instant currentTime) { + final int slot = messagesCache.getNextSlotToPersist(); + + List queuesToPersist; + int queuesPersisted = 0; + + do { + try (final Timer.Context ignored = getQueuesTimer.time()) { + queuesToPersist = messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT); + } + + for (final String queue : queuesToPersist) { + final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue); + final long deviceId = MessagesCache.getDeviceIdFromQueueName(queue); + + try { + persistQueue(accountUuid, deviceId); + } catch (final Exception e) { + persistQueueExceptionMeter.mark(); + logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e); + + messagesCache.addQueueToPersist(accountUuid, deviceId); + + Util.sleep(EXCEPTION_PAUSE_MILLIS); + } + } + + queuesPersisted += queuesToPersist.size(); + } while (queuesToPersist.size() >= QUEUE_BATCH_LIMIT); + + return queuesPersisted; + } + + @VisibleForTesting + void persistQueue(final UUID accountUuid, final long deviceId) { + final Optional maybeAccount = accountsManager.getByAccountIdentifier(accountUuid); + + if (maybeAccount.isEmpty()) { + logger.error("No account record found for account {}", accountUuid); + return; } - @VisibleForTesting - int persistNextQueues(final Instant currentTime) { - final int slot = messagesCache.getNextSlotToPersist(); + try (final Timer.Context ignored = persistQueueTimer.time()) { + messagesCache.lockQueueForPersistence(accountUuid, deviceId); - List queuesToPersist; - int queuesPersisted = 0; + try { + int messageCount = 0; + List messages; do { - try (final Timer.Context ignored = getQueuesTimer.time()) { - queuesToPersist = messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT); - } + messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); - for (final String queue : queuesToPersist) { - final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue); - final long deviceId = MessagesCache.getDeviceIdFromQueueName(queue); + messagesManager.persistMessages(accountUuid, deviceId, messages); + messageCount += messages.size(); - try { - persistQueue(accountUuid, deviceId); - } catch (final Exception e) { - persistQueueExceptionMeter.mark(); - logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e); - messagesCache.addQueueToPersist(accountUuid, deviceId); + } while (!messages.isEmpty()); - Util.sleep(EXCEPTION_PAUSE_MILLIS); - } - } - - queuesPersisted += queuesToPersist.size(); - } while (queuesToPersist.size() >= QUEUE_BATCH_LIMIT); - - return queuesPersisted; - } - - @VisibleForTesting - void persistQueue(final UUID accountUuid, final long deviceId) { - final Optional maybeAccount = accountsManager.getByAccountIdentifier(accountUuid); - - if (maybeAccount.isEmpty()) { - logger.error("No account record found for account {}", accountUuid); - return; - } - - try (final Timer.Context ignored = persistQueueTimer.time()) { - messagesCache.lockQueueForPersistence(accountUuid, deviceId); - - try { - int messageCount = 0; - List messages; - - do { - messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); - - messagesManager.persistMessages(accountUuid, deviceId, messages); - messageCount += messages.size(); - - - } while (!messages.isEmpty()); - - queueSizeHistogram.update(messageCount); - } finally { - messagesCache.unlockQueueForPersistence(accountUuid, deviceId); - } - } + queueSizeHistogram.update(messageCount); + } finally { + messagesCache.unlockQueueForPersistence(accountUuid, deviceId); + } } + } }