Let Dropwizard manage persister thread lifecycles.

This commit is contained in:
Jon Chambers 2020-09-23 11:19:27 -04:00 committed by Jon Chambers
parent 84e02099a2
commit 599cd766e1
3 changed files with 36 additions and 35 deletions

View File

@ -326,7 +326,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
MessagePersister clusterMessagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
MessagePersister clusterMessagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, recurringJobExecutor, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());

View File

@ -10,13 +10,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
@ -26,10 +28,10 @@ public class MessagePersister implements Managed {
private final MessagesManager messagesManager;
private final AccountsManager accountsManager;
private final Duration persistDelay;
private final Duration persistDelay;
private volatile boolean running = false;
private Thread workerThread;
private final ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> persistQueuesFuture;
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
@ -44,11 +46,12 @@ public class MessagePersister implements Managed {
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) {
this.messagesCache = messagesCache;
this.messagesManager = messagesManager;
this.accountsManager = accountsManager;
this.persistDelay = persistDelay;
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final ScheduledExecutorService scheduledExecutorService, final Duration persistDelay) {
this.messagesCache = messagesCache;
this.messagesManager = messagesManager;
this.accountsManager = accountsManager;
this.persistDelay = persistDelay;
this.scheduledExecutorService = scheduledExecutorService;
}
@VisibleForTesting
@ -58,25 +61,17 @@ public class MessagePersister implements Managed {
@Override
public void start() {
running = true;
if (persistQueuesFuture != null) {
persistQueuesFuture.cancel(false);
}
workerThread = new Thread(() -> {
while (running) {
persistNextQueues(Instant.now());
Util.sleep(100);
}
});
workerThread.start();
persistQueuesFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> this.persistNextQueues(Instant.now()), 0, 100, TimeUnit.MILLISECONDS);
}
@Override
public void stop() throws Exception {
running = false;
if (workerThread != null) {
workerThread.join();
workerThread = null;
public void stop() {
if (persistQueuesFuture != null) {
persistQueuesFuture.cancel(false);
}
}

View File

@ -15,6 +15,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
@ -30,11 +31,12 @@ import static org.mockito.Mockito.when;
public class MessagePersisterTest extends AbstractRedisClusterTest {
private ExecutorService notificationExecutorService;
private MessagesCache messagesCache;
private Messages messagesDatabase;
private MessagePersister messagePersister;
private AccountsManager accountsManager;
private ExecutorService notificationExecutorService;
private ScheduledExecutorService scheduledExecutorService;
private MessagesCache messagesCache;
private Messages messagesDatabase;
private MessagePersister messagePersister;
private AccountsManager accountsManager;
private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID();
private static final String DESTINATION_ACCOUNT_NUMBER = "+18005551234";
@ -60,8 +62,9 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER);
notificationExecutorService = Executors.newSingleThreadExecutor();
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, scheduledExecutorService, PERSIST_DELAY);
doAnswer(invocation -> {
final String destination = invocation.getArgument(0, String.class);
@ -83,6 +86,9 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
notificationExecutorService.shutdown();
notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();
scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
}
@Test
@ -98,7 +104,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
final Instant now = Instant.now();
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now);
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, messageCount, now);
setNextSlotToPersist(SlotHash.getSlot(queueName));
messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay()));
@ -112,7 +118,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
final Instant now = Instant.now();
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now);
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, messageCount, now);
setNextSlotToPersist(SlotHash.getSlot(queueName));
messagePersister.persistNextQueues(now);
@ -138,7 +144,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account));
when(account.getNumber()).thenReturn(accountNumber);
insertMessages(accountUuid, accountNumber, deviceId, messagesPerQueue, now);
insertMessages(accountUuid, deviceId, messagesPerQueue, now);
}
setNextSlotToPersist(slot);
@ -165,7 +171,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
throw new IllegalStateException("Could not find a queue name for slot " + slot);
}
private void insertMessages(final UUID accountUuid, final String accountNumber, final long deviceId, final int messageCount, final Instant firstMessageTimestamp) {
private void insertMessages(final UUID accountUuid, final long deviceId, final int messageCount, final Instant firstMessageTimestamp) {
for (int i = 0; i < messageCount; i++) {
final UUID messageGuid = UUID.randomUUID();