editorconfig formatting
This commit is contained in:
parent
36050f580e
commit
4cfcdb0c96
|
@ -28,151 +28,156 @@ import org.whispersystems.textsecuregcm.util.Util;
|
||||||
|
|
||||||
public class MessagePersister implements Managed {
|
public class MessagePersister implements Managed {
|
||||||
|
|
||||||
private final MessagesCache messagesCache;
|
private final MessagesCache messagesCache;
|
||||||
private final MessagesManager messagesManager;
|
private final MessagesManager messagesManager;
|
||||||
private final AccountsManager accountsManager;
|
private final AccountsManager accountsManager;
|
||||||
|
|
||||||
private final Duration persistDelay;
|
private final Duration persistDelay;
|
||||||
|
|
||||||
private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT];
|
private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT];
|
||||||
private volatile boolean running;
|
private volatile boolean running;
|
||||||
|
|
||||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
||||||
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
||||||
private final Meter persistQueueExceptionMeter = metricRegistry.meter(name(MessagePersister.class, "persistQueueException"));
|
private final Meter persistQueueExceptionMeter = metricRegistry.meter(
|
||||||
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
name(MessagePersister.class, "persistQueueException"));
|
||||||
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
||||||
|
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
||||||
|
|
||||||
static final int QUEUE_BATCH_LIMIT = 100;
|
static final int QUEUE_BATCH_LIMIT = 100;
|
||||||
static final int MESSAGE_BATCH_LIMIT = 100;
|
static final int MESSAGE_BATCH_LIMIT = 100;
|
||||||
|
|
||||||
private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis();
|
private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis();
|
||||||
|
|
||||||
private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER";
|
private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER";
|
||||||
private static final int WORKER_THREAD_COUNT = 4;
|
private static final int WORKER_THREAD_COUNT = 4;
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
||||||
|
|
||||||
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager, final Duration persistDelay) {
|
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager,
|
||||||
this.messagesCache = messagesCache;
|
final AccountsManager accountsManager,
|
||||||
this.messagesManager = messagesManager;
|
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
|
||||||
this.accountsManager = accountsManager;
|
final Duration persistDelay) {
|
||||||
this.persistDelay = persistDelay;
|
this.messagesCache = messagesCache;
|
||||||
|
this.messagesManager = messagesManager;
|
||||||
|
this.accountsManager = accountsManager;
|
||||||
|
this.persistDelay = persistDelay;
|
||||||
|
|
||||||
for (int i = 0; i < workerThreads.length; i++) {
|
for (int i = 0; i < workerThreads.length; i++) {
|
||||||
workerThreads[i] = new Thread(() -> {
|
workerThreads[i] = new Thread(() -> {
|
||||||
while (running) {
|
while (running) {
|
||||||
if (dynamicConfigurationManager.getConfiguration().getActiveFeatureFlags().contains(DISABLE_PERSISTER_FEATURE_FLAG)) {
|
if (dynamicConfigurationManager.getConfiguration().getActiveFeatureFlags()
|
||||||
Util.sleep(1000);
|
.contains(DISABLE_PERSISTER_FEATURE_FLAG)) {
|
||||||
} else {
|
Util.sleep(1000);
|
||||||
try {
|
} else {
|
||||||
final int queuesPersisted = persistNextQueues(Instant.now());
|
|
||||||
queueCountHistogram.update(queuesPersisted);
|
|
||||||
|
|
||||||
if (queuesPersisted == 0) {
|
|
||||||
Util.sleep(100);
|
|
||||||
}
|
|
||||||
} catch (final Throwable t) {
|
|
||||||
logger.warn("Failed to persist queues", t);
|
|
||||||
Util.sleep(EXCEPTION_PAUSE_MILLIS);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, "MessagePersisterWorker-" + i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
Duration getPersistDelay() {
|
|
||||||
return persistDelay;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
running = true;
|
|
||||||
|
|
||||||
for (final Thread workerThread : workerThreads) {
|
|
||||||
workerThread.start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() {
|
|
||||||
running = false;
|
|
||||||
|
|
||||||
for (final Thread workerThread : workerThreads) {
|
|
||||||
try {
|
try {
|
||||||
workerThread.join();
|
final int queuesPersisted = persistNextQueues(Instant.now());
|
||||||
} catch (final InterruptedException e) {
|
queueCountHistogram.update(queuesPersisted);
|
||||||
logger.warn("Interrupted while waiting for worker thread to complete current operation");
|
|
||||||
|
if (queuesPersisted == 0) {
|
||||||
|
Util.sleep(100);
|
||||||
|
}
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
logger.warn("Failed to persist queues", t);
|
||||||
|
Util.sleep(EXCEPTION_PAUSE_MILLIS);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}, "MessagePersisterWorker-" + i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Duration getPersistDelay() {
|
||||||
|
return persistDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
running = true;
|
||||||
|
|
||||||
|
for (final Thread workerThread : workerThreads) {
|
||||||
|
workerThread.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
running = false;
|
||||||
|
|
||||||
|
for (final Thread workerThread : workerThreads) {
|
||||||
|
try {
|
||||||
|
workerThread.join();
|
||||||
|
} catch (final InterruptedException e) {
|
||||||
|
logger.warn("Interrupted while waiting for worker thread to complete current operation");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int persistNextQueues(final Instant currentTime) {
|
||||||
|
final int slot = messagesCache.getNextSlotToPersist();
|
||||||
|
|
||||||
|
List<String> queuesToPersist;
|
||||||
|
int queuesPersisted = 0;
|
||||||
|
|
||||||
|
do {
|
||||||
|
try (final Timer.Context ignored = getQueuesTimer.time()) {
|
||||||
|
queuesToPersist = messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final String queue : queuesToPersist) {
|
||||||
|
final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue);
|
||||||
|
final long deviceId = MessagesCache.getDeviceIdFromQueueName(queue);
|
||||||
|
|
||||||
|
try {
|
||||||
|
persistQueue(accountUuid, deviceId);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
persistQueueExceptionMeter.mark();
|
||||||
|
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
|
||||||
|
|
||||||
|
messagesCache.addQueueToPersist(accountUuid, deviceId);
|
||||||
|
|
||||||
|
Util.sleep(EXCEPTION_PAUSE_MILLIS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
queuesPersisted += queuesToPersist.size();
|
||||||
|
} while (queuesToPersist.size() >= QUEUE_BATCH_LIMIT);
|
||||||
|
|
||||||
|
return queuesPersisted;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void persistQueue(final UUID accountUuid, final long deviceId) {
|
||||||
|
final Optional<Account> maybeAccount = accountsManager.getByAccountIdentifier(accountUuid);
|
||||||
|
|
||||||
|
if (maybeAccount.isEmpty()) {
|
||||||
|
logger.error("No account record found for account {}", accountUuid);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
try (final Timer.Context ignored = persistQueueTimer.time()) {
|
||||||
int persistNextQueues(final Instant currentTime) {
|
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
|
||||||
final int slot = messagesCache.getNextSlotToPersist();
|
|
||||||
|
|
||||||
List<String> queuesToPersist;
|
try {
|
||||||
int queuesPersisted = 0;
|
int messageCount = 0;
|
||||||
|
List<MessageProtos.Envelope> messages;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
try (final Timer.Context ignored = getQueuesTimer.time()) {
|
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
||||||
queuesToPersist = messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final String queue : queuesToPersist) {
|
messagesManager.persistMessages(accountUuid, deviceId, messages);
|
||||||
final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue);
|
messageCount += messages.size();
|
||||||
final long deviceId = MessagesCache.getDeviceIdFromQueueName(queue);
|
|
||||||
|
|
||||||
try {
|
|
||||||
persistQueue(accountUuid, deviceId);
|
|
||||||
} catch (final Exception e) {
|
|
||||||
persistQueueExceptionMeter.mark();
|
|
||||||
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
|
|
||||||
|
|
||||||
messagesCache.addQueueToPersist(accountUuid, deviceId);
|
} while (!messages.isEmpty());
|
||||||
|
|
||||||
Util.sleep(EXCEPTION_PAUSE_MILLIS);
|
queueSizeHistogram.update(messageCount);
|
||||||
}
|
} finally {
|
||||||
}
|
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
|
||||||
|
}
|
||||||
queuesPersisted += queuesToPersist.size();
|
|
||||||
} while (queuesToPersist.size() >= QUEUE_BATCH_LIMIT);
|
|
||||||
|
|
||||||
return queuesPersisted;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
void persistQueue(final UUID accountUuid, final long deviceId) {
|
|
||||||
final Optional<Account> maybeAccount = accountsManager.getByAccountIdentifier(accountUuid);
|
|
||||||
|
|
||||||
if (maybeAccount.isEmpty()) {
|
|
||||||
logger.error("No account record found for account {}", accountUuid);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try (final Timer.Context ignored = persistQueueTimer.time()) {
|
|
||||||
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
|
|
||||||
|
|
||||||
try {
|
|
||||||
int messageCount = 0;
|
|
||||||
List<MessageProtos.Envelope> messages;
|
|
||||||
|
|
||||||
do {
|
|
||||||
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
|
||||||
|
|
||||||
messagesManager.persistMessages(accountUuid, deviceId, messages);
|
|
||||||
messageCount += messages.size();
|
|
||||||
|
|
||||||
|
|
||||||
} while (!messages.isEmpty());
|
|
||||||
|
|
||||||
queueSizeHistogram.update(messageCount);
|
|
||||||
} finally {
|
|
||||||
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue