Let MessagePersister manage its own worker thread.
This commit is contained in:
parent
010770904f
commit
fc05529574
|
@ -326,7 +326,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
||||||
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
||||||
|
|
||||||
MessagePersister clusterMessagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, recurringJobExecutor, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
||||||
|
|
||||||
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
||||||
|
|
||||||
|
@ -347,7 +347,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
environment.lifecycle().manage(accountDatabaseCrawler);
|
environment.lifecycle().manage(accountDatabaseCrawler);
|
||||||
environment.lifecycle().manage(remoteConfigsManager);
|
environment.lifecycle().manage(remoteConfigsManager);
|
||||||
environment.lifecycle().manage(messagesCache);
|
environment.lifecycle().manage(messagesCache);
|
||||||
environment.lifecycle().manage(clusterMessagePersister);
|
environment.lifecycle().manage(messagePersister);
|
||||||
environment.lifecycle().manage(clientPresenceManager);
|
environment.lifecycle().manage(clientPresenceManager);
|
||||||
environment.lifecycle().manage(featureFlagsManager);
|
environment.lifecycle().manage(featureFlagsManager);
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
import org.whispersystems.textsecuregcm.util.Util;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
@ -31,8 +32,8 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
private final Duration persistDelay;
|
private final Duration persistDelay;
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduledExecutorService;
|
private final Thread workerThread;
|
||||||
private ScheduledFuture<?> persistQueuesFuture;
|
private volatile boolean running;
|
||||||
|
|
||||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
||||||
|
@ -48,12 +49,22 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
||||||
|
|
||||||
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final ScheduledExecutorService scheduledExecutorService, final Duration persistDelay) {
|
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) {
|
||||||
this.messagesCache = messagesCache;
|
this.messagesCache = messagesCache;
|
||||||
this.messagesManager = messagesManager;
|
this.messagesManager = messagesManager;
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
this.persistDelay = persistDelay;
|
this.persistDelay = persistDelay;
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
|
||||||
|
this.workerThread = new Thread(() -> {
|
||||||
|
while (running) {
|
||||||
|
try {
|
||||||
|
persistNextQueues(Instant.now());
|
||||||
|
Util.sleep(100);
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
logger.warn("Failed to persist queues", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -63,23 +74,18 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
if (persistQueuesFuture != null) {
|
running = true;
|
||||||
persistQueuesFuture.cancel(false);
|
workerThread.start();
|
||||||
}
|
|
||||||
|
|
||||||
persistQueuesFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
|
|
||||||
try {
|
|
||||||
persistNextQueues(Instant.now());
|
|
||||||
} catch (final Exception e) {
|
|
||||||
logger.warn("Failed to persist queues", e);
|
|
||||||
}
|
|
||||||
}, 0, 100, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
if (persistQueuesFuture != null) {
|
running = false;
|
||||||
persistQueuesFuture.cancel(false);
|
|
||||||
|
try {
|
||||||
|
workerThread.join();
|
||||||
|
} catch (final InterruptedException e) {
|
||||||
|
logger.warn("Interrupted while waiting for worker thread to complete current operation");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,8 +24,8 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
@ -38,11 +38,11 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
|
||||||
@Rule
|
@Rule
|
||||||
public PreparedDbRule db = EmbeddedPostgresRules.preparedDatabase(LiquibasePreparer.forClasspathLocation("messagedb.xml"));
|
public PreparedDbRule db = EmbeddedPostgresRules.preparedDatabase(LiquibasePreparer.forClasspathLocation("messagedb.xml"));
|
||||||
|
|
||||||
private ScheduledExecutorService scheduledExecutorService;
|
private ExecutorService notificationExecutorService;
|
||||||
private MessagesCache messagesCache;
|
private MessagesCache messagesCache;
|
||||||
private MessagesManager messagesManager;
|
private MessagesManager messagesManager;
|
||||||
private MessagePersister messagePersister;
|
private MessagePersister messagePersister;
|
||||||
private Account account;
|
private Account account;
|
||||||
|
|
||||||
private static final Duration PERSIST_DELAY = Duration.ofMinutes(10);
|
private static final Duration PERSIST_DELAY = Duration.ofMinutes(10);
|
||||||
|
|
||||||
|
@ -59,10 +59,10 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
|
||||||
final Messages messages = new Messages(new FaultTolerantDatabase("messages-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration()));
|
final Messages messages = new Messages(new FaultTolerantDatabase("messages-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration()));
|
||||||
final AccountsManager accountsManager = mock(AccountsManager.class);
|
final AccountsManager accountsManager = mock(AccountsManager.class);
|
||||||
|
|
||||||
scheduledExecutorService = Executors.newScheduledThreadPool(4);
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
messagesCache = new MessagesCache(getRedisCluster(), scheduledExecutorService);
|
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
|
||||||
messagesManager = new MessagesManager(messages, messagesCache, mock(PushLatencyManager.class));
|
messagesManager = new MessagesManager(messages, messagesCache, mock(PushLatencyManager.class));
|
||||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, scheduledExecutorService, PERSIST_DELAY);
|
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY);
|
||||||
|
|
||||||
account = mock(Account.class);
|
account = mock(Account.class);
|
||||||
|
|
||||||
|
@ -80,8 +80,8 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
|
|
||||||
scheduledExecutorService.shutdown();
|
notificationExecutorService.shutdown();
|
||||||
scheduledExecutorService.awaitTermination(15, TimeUnit.SECONDS);
|
notificationExecutorService.awaitTermination(15, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 15_000)
|
@Test(timeout = 15_000)
|
||||||
|
|
|
@ -18,7 +18,6 @@ import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -36,7 +35,6 @@ import static org.mockito.Mockito.when;
|
||||||
public class MessagePersisterTest extends AbstractRedisClusterTest {
|
public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
private ExecutorService notificationExecutorService;
|
private ExecutorService notificationExecutorService;
|
||||||
private ScheduledExecutorService scheduledExecutorService;
|
|
||||||
private MessagesCache messagesCache;
|
private MessagesCache messagesCache;
|
||||||
private Messages messagesDatabase;
|
private Messages messagesDatabase;
|
||||||
private MessagePersister messagePersister;
|
private MessagePersister messagePersister;
|
||||||
|
@ -66,9 +64,8 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER);
|
when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER);
|
||||||
|
|
||||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
|
||||||
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
|
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
|
||||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, scheduledExecutorService, PERSIST_DELAY);
|
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY);
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
final String destination = invocation.getArgument(0, String.class);
|
final String destination = invocation.getArgument(0, String.class);
|
||||||
|
@ -92,9 +89,6 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
notificationExecutorService.shutdown();
|
notificationExecutorService.shutdown();
|
||||||
notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS);
|
notificationExecutorService.awaitTermination(1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
scheduledExecutorService.shutdown();
|
|
||||||
scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue