diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index c42a07130..2d5070cd8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -326,7 +326,7 @@ public class WhisperServerService extends Application persistQueuesFuture; + private final Thread workerThread; + private volatile boolean running; private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); @@ -48,12 +49,22 @@ public class MessagePersister implements Managed { private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class); - public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final ScheduledExecutorService scheduledExecutorService, final Duration persistDelay) { + public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) { this.messagesCache = messagesCache; this.messagesManager = messagesManager; this.accountsManager = accountsManager; this.persistDelay = persistDelay; - this.scheduledExecutorService = scheduledExecutorService; + + this.workerThread = new Thread(() -> { + while (running) { + try { + persistNextQueues(Instant.now()); + Util.sleep(100); + } catch (final Throwable t) { + logger.warn("Failed to persist queues", t); + } + } + }); } @VisibleForTesting @@ -63,23 +74,18 @@ public class MessagePersister implements Managed { @Override public void start() { - if (persistQueuesFuture != null) { - persistQueuesFuture.cancel(false); - } - - persistQueuesFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { - try { - persistNextQueues(Instant.now()); - } catch (final Exception e) { - logger.warn("Failed to persist queues", e); - } - }, 0, 100, TimeUnit.MILLISECONDS); + running = true; + workerThread.start(); } @Override public void stop() { - if (persistQueuesFuture != null) { - persistQueuesFuture.cancel(false); + running = false; + + try { + workerThread.join(); + } catch (final InterruptedException e) { + logger.warn("Interrupted while waiting for worker thread to complete current operation"); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index d7aaaf9a3..478bb8cc2 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -24,8 +24,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,11 +38,11 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest { @Rule public PreparedDbRule db = EmbeddedPostgresRules.preparedDatabase(LiquibasePreparer.forClasspathLocation("messagedb.xml")); - private ScheduledExecutorService scheduledExecutorService; - private MessagesCache messagesCache; - private MessagesManager messagesManager; - private MessagePersister messagePersister; - private Account account; + private ExecutorService notificationExecutorService; + private MessagesCache messagesCache; + private MessagesManager messagesManager; + private MessagePersister messagePersister; + private Account account; private static final Duration PERSIST_DELAY = Duration.ofMinutes(10); @@ -59,10 +59,10 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest { final Messages messages = new Messages(new FaultTolerantDatabase("messages-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration())); final AccountsManager accountsManager = mock(AccountsManager.class); - scheduledExecutorService = Executors.newScheduledThreadPool(4); - messagesCache = new MessagesCache(getRedisCluster(), scheduledExecutorService); - messagesManager = new MessagesManager(messages, messagesCache, mock(PushLatencyManager.class)); - messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, scheduledExecutorService, PERSIST_DELAY); + notificationExecutorService = Executors.newSingleThreadExecutor(); + messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService); + messagesManager = new MessagesManager(messages, messagesCache, mock(PushLatencyManager.class)); + messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY); account = mock(Account.class); @@ -80,8 +80,8 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest { public void tearDown() throws Exception { super.tearDown(); - scheduledExecutorService.shutdown(); - scheduledExecutorService.awaitTermination(15, TimeUnit.SECONDS); + notificationExecutorService.shutdown(); + notificationExecutorService.awaitTermination(15, TimeUnit.SECONDS); } @Test(timeout = 15_000) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 13185b8bd..192d6548a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -18,7 +18,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -36,7 +35,6 @@ import static org.mockito.Mockito.when; public class MessagePersisterTest extends AbstractRedisClusterTest { private ExecutorService notificationExecutorService; - private ScheduledExecutorService scheduledExecutorService; private MessagesCache messagesCache; private Messages messagesDatabase; private MessagePersister messagePersister; @@ -66,9 +64,8 @@ public class MessagePersisterTest extends AbstractRedisClusterTest { when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER); notificationExecutorService = Executors.newSingleThreadExecutor(); - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService); - messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, scheduledExecutorService, PERSIST_DELAY); + messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY); doAnswer(invocation -> { final String destination = invocation.getArgument(0, String.class); @@ -92,9 +89,6 @@ public class MessagePersisterTest extends AbstractRedisClusterTest { notificationExecutorService.shutdown(); notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS); - - scheduledExecutorService.shutdown(); - scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS); } @Test