Allow message persisters to be disabled by a feature flag.
This commit is contained in:
parent
ba1e100b42
commit
0a23ce870a
|
@ -344,7 +344,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
||||||
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
||||||
|
|
||||||
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, featureFlagsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
||||||
|
|
||||||
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = new ArrayList<>();
|
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = new ArrayList<>();
|
||||||
accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager, directoryQueue));
|
accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager, directoryQueue));
|
||||||
|
|
|
@ -31,6 +31,7 @@ public class MessagePersister implements Managed {
|
||||||
private final MessagesCache messagesCache;
|
private final MessagesCache messagesCache;
|
||||||
private final MessagesManager messagesManager;
|
private final MessagesManager messagesManager;
|
||||||
private final AccountsManager accountsManager;
|
private final AccountsManager accountsManager;
|
||||||
|
private final FeatureFlagsManager featureFlagsManager;
|
||||||
|
|
||||||
private final Duration persistDelay;
|
private final Duration persistDelay;
|
||||||
|
|
||||||
|
@ -52,15 +53,19 @@ public class MessagePersister implements Managed {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
||||||
|
|
||||||
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final Duration persistDelay) {
|
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) {
|
||||||
this.messagesCache = messagesCache;
|
this.messagesCache = messagesCache;
|
||||||
this.messagesManager = messagesManager;
|
this.messagesManager = messagesManager;
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
|
this.featureFlagsManager = featureFlagsManager;
|
||||||
this.persistDelay = persistDelay;
|
this.persistDelay = persistDelay;
|
||||||
|
|
||||||
for (int i = 0; i < workerThreads.length; i++) {
|
for (int i = 0; i < workerThreads.length; i++) {
|
||||||
workerThreads[i] = new Thread(() -> {
|
workerThreads[i] = new Thread(() -> {
|
||||||
while (running) {
|
while (running) {
|
||||||
|
if (featureFlagsManager.isFeatureFlagActive("DISABLE_MESSAGE_PERSISTER")) {
|
||||||
|
Util.sleep(1000);
|
||||||
|
} else {
|
||||||
try {
|
try {
|
||||||
final int queuesPersisted = persistNextQueues(Instant.now());
|
final int queuesPersisted = persistNextQueues(Instant.now());
|
||||||
queueCountHistogram.update(queuesPersisted);
|
queueCountHistogram.update(queuesPersisted);
|
||||||
|
@ -72,6 +77,7 @@ public class MessagePersister implements Managed {
|
||||||
logger.warn("Failed to persist queues", t);
|
logger.warn("Failed to persist queues", t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}, "MessagePersisterWorker-" + i);
|
}, "MessagePersisterWorker-" + i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
|
||||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
|
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
|
||||||
messagesManager = new MessagesManager(messages, messagesCache, mock(PushLatencyManager.class));
|
messagesManager = new MessagesManager(messages, messagesCache, mock(PushLatencyManager.class));
|
||||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY);
|
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, mock(FeatureFlagsManager.class), PERSIST_DELAY);
|
||||||
|
|
||||||
account = mock(Account.class);
|
account = mock(Account.class);
|
||||||
|
|
||||||
|
|
|
@ -68,7 +68,7 @@ public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
|
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
|
||||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, PERSIST_DELAY);
|
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, mock(FeatureFlagsManager.class), PERSIST_DELAY);
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
final String destination = invocation.getArgument(0, String.class);
|
final String destination = invocation.getArgument(0, String.class);
|
||||||
|
|
Loading…
Reference in New Issue