From 599cd766e1c7bd8f0a9fe953de457c8b5674f885 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 23 Sep 2020 11:19:27 -0400 Subject: [PATCH] Let Dropwizard manage persister thread lifecycles. --- .../textsecuregcm/WhisperServerService.java | 2 +- .../storage/MessagePersister.java | 43 ++++++++----------- .../storage/MessagePersisterTest.java | 26 ++++++----- 3 files changed, 36 insertions(+), 35 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 2610766f7..c42a07130 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -326,7 +326,7 @@ public class WhisperServerService extends Application persistQueuesFuture; private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); @@ -44,11 +46,12 @@ public class MessagePersister implements Managed { private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); - public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) { - this.messagesCache = messagesCache; - this.messagesManager = messagesManager; - this.accountsManager = accountsManager; - this.persistDelay = persistDelay; + public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final ScheduledExecutorService scheduledExecutorService, final Duration persistDelay) { + this.messagesCache = messagesCache; + this.messagesManager = messagesManager; + this.accountsManager = accountsManager; + this.persistDelay = persistDelay; + this.scheduledExecutorService = scheduledExecutorService; } @VisibleForTesting @@ -58,25 +61,17 @@ public class MessagePersister implements Managed { @Override public void start() { - running = true; + if (persistQueuesFuture != null) { + persistQueuesFuture.cancel(false); + } - workerThread = new Thread(() -> { - while (running) { - persistNextQueues(Instant.now()); - Util.sleep(100); - } - }); - - workerThread.start(); + persistQueuesFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> this.persistNextQueues(Instant.now()), 0, 100, TimeUnit.MILLISECONDS); } @Override - public void stop() throws Exception { - running = false; - - if (workerThread != null) { - workerThread.join(); - workerThread = null; + public void stop() { + if (persistQueuesFuture != null) { + persistQueuesFuture.cancel(false); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 149afe116..cf727e198 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -15,6 +15,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; @@ -30,11 +31,12 @@ import static org.mockito.Mockito.when; public class MessagePersisterTest extends AbstractRedisClusterTest { - private ExecutorService notificationExecutorService; - private MessagesCache messagesCache; - private Messages messagesDatabase; - private MessagePersister messagePersister; - private AccountsManager accountsManager; + private ExecutorService notificationExecutorService; + private ScheduledExecutorService scheduledExecutorService; + private MessagesCache messagesCache; + private Messages messagesDatabase; + private MessagePersister messagePersister; + private AccountsManager accountsManager; private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID(); private static final String DESTINATION_ACCOUNT_NUMBER = "+18005551234"; @@ -60,8 +62,9 @@ public class MessagePersisterTest extends AbstractRedisClusterTest { when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER); notificationExecutorService = Executors.newSingleThreadExecutor(); + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService); - messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY); + messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, scheduledExecutorService, PERSIST_DELAY); doAnswer(invocation -> { final String destination = invocation.getArgument(0, String.class); @@ -83,6 +86,9 @@ public class MessagePersisterTest extends AbstractRedisClusterTest { notificationExecutorService.shutdown(); notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS); + + scheduledExecutorService.shutdown(); + scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS); } @Test @@ -98,7 +104,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest { final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; final Instant now = Instant.now(); - insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now); + insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, messageCount, now); setNextSlotToPersist(SlotHash.getSlot(queueName)); messagePersister.persistNextQueues(now.plus(messagePersister.getPersistDelay())); @@ -112,7 +118,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest { final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7; final Instant now = Instant.now(); - insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now); + insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, messageCount, now); setNextSlotToPersist(SlotHash.getSlot(queueName)); messagePersister.persistNextQueues(now); @@ -138,7 +144,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest { when(accountsManager.get(accountUuid)).thenReturn(Optional.of(account)); when(account.getNumber()).thenReturn(accountNumber); - insertMessages(accountUuid, accountNumber, deviceId, messagesPerQueue, now); + insertMessages(accountUuid, deviceId, messagesPerQueue, now); } setNextSlotToPersist(slot); @@ -165,7 +171,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest { throw new IllegalStateException("Could not find a queue name for slot " + slot); } - private void insertMessages(final UUID accountUuid, final String accountNumber, final long deviceId, final int messageCount, final Instant firstMessageTimestamp) { + private void insertMessages(final UUID accountUuid, final long deviceId, final int messageCount, final Instant firstMessageTimestamp) { for (int i = 0; i < messageCount; i++) { final UUID messageGuid = UUID.randomUUID();