Put message persisters behind feature flags.

This commit is contained in:
Jon Chambers 2020-08-27 13:15:08 -04:00 committed by Jon Chambers
parent cd4b85b0b5
commit 2b50367d7f
4 changed files with 63 additions and 43 deletions

View File

@ -332,8 +332,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
MessagePersister messagePersister = new MessagePersister(messagesClient, messagesManager, pubSubManager, pushSender, accountsManager,config.getMessageCacheConfiguration().getPersistDelayMinutes(), TimeUnit.MINUTES);
RedisClusterMessagePersister clusterMessagePersister = new RedisClusterMessagePersister(clusterMessagesCache, messages, pubSubManager, pushSender, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
MessagePersister messagePersister = new MessagePersister(messagesClient, messagesManager, pubSubManager, pushSender, accountsManager, featureFlagsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes(), TimeUnit.MINUTES);
RedisClusterMessagePersister clusterMessagePersister = new RedisClusterMessagePersister(clusterMessagesCache, messages, pubSubManager, pushSender, accountsManager, featureFlagsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());

View File

@ -49,30 +49,35 @@ public class MessagePersister implements Managed, Runnable {
private final long delayTime;
private final TimeUnit delayTimeUnit;
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
private final PushSender pushSender;
private final AccountsManager accountsManager;
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
private final PushSender pushSender;
private final AccountsManager accountsManager;
private final FeatureFlagsManager featureFlagsManager;
private final LuaScript getQueuesScript;
private boolean finished = false;
private static final String DISABLE_PERSISTENCE_FLAG = "disable-singleton-persister";
public MessagePersister(final ReplicatedJedisPool jedisPool,
final MessagesManager messagesManager,
final PubSubManager pubSubManager,
final PushSender pushSender,
final AccountsManager accountsManager,
final long delayTime,
final TimeUnit delayTimeUnit)
final MessagesManager messagesManager,
final PubSubManager pubSubManager,
final PushSender pushSender,
final AccountsManager accountsManager,
final FeatureFlagsManager featureFlagsManager,
final long delayTime,
final TimeUnit delayTimeUnit)
throws IOException
{
this.jedisPool = jedisPool;
this.jedisPool = jedisPool;
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
this.pushSender = pushSender;
this.accountsManager = accountsManager;
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
this.pushSender = pushSender;
this.accountsManager = accountsManager;
this.featureFlagsManager = featureFlagsManager;
this.delayTime = delayTime;
this.delayTimeUnit = delayTimeUnit;
@ -87,23 +92,30 @@ public class MessagePersister implements Managed, Runnable {
@Override
public void run() {
while (running.get()) {
try {
List<byte[]> queuesToPersist = getQueuesToPersist();
queueCountHistogram.update(queuesToPersist.size());
if (!featureFlagsManager.isFeatureFlagActive(DISABLE_PERSISTENCE_FLAG)) {
try {
List<byte[]> queuesToPersist = getQueuesToPersist();
queueCountHistogram.update(queuesToPersist.size());
for (byte[] queue : queuesToPersist) {
Key key = Key.fromUserMessageQueue(queue);
for (byte[] queue : queuesToPersist) {
Key key = Key.fromUserMessageQueue(queue);
persistQueue(jedisPool, key);
notifyClients(accountsManager, pubSubManager, pushSender, key);
persistQueue(jedisPool, key);
notifyClients(accountsManager, pubSubManager, pushSender, key);
}
if (queuesToPersist.isEmpty()) {
//noinspection BusyWait
Thread.sleep(10_000);
}
} catch (Throwable t) {
logger.error("Exception while persisting: ", t);
}
if (queuesToPersist.isEmpty()) {
//noinspection BusyWait
} else {
try {
Thread.sleep(10_000);
} catch (final InterruptedException ignored) {
}
} catch (Throwable t) {
logger.error("Exception while persisting: ", t);
}
}

View File

@ -30,6 +30,7 @@ public class RedisClusterMessagePersister implements Managed {
private final PubSubManager pubSubManager;
private final PushSender pushSender;
private final AccountsManager accountsManager;
private final FeatureFlagsManager featureFlagsManager;
private final Duration persistDelay;
@ -46,16 +47,18 @@ public class RedisClusterMessagePersister implements Managed {
static final int QUEUE_BATCH_LIMIT = 100;
static final int MESSAGE_BATCH_LIMIT = 100;
static final String ENABLE_PERSISTENCE_FLAG = "enable-cluster-persister";
private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagePersister.class);
public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final Messages messagesDatabase, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final Duration persistDelay) {
this.messagesCache = messagesCache;
this.messagesDatabase = messagesDatabase;
this.pubSubManager = pubSubManager;
this.pushSender = pushSender;
this.accountsManager = accountsManager;
this.persistDelay = persistDelay;
public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final Messages messagesDatabase, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) {
this.messagesCache = messagesCache;
this.messagesDatabase = messagesDatabase;
this.pubSubManager = pubSubManager;
this.pushSender = pushSender;
this.accountsManager = accountsManager;
this.featureFlagsManager = featureFlagsManager;
this.persistDelay = persistDelay;
}
@VisibleForTesting
@ -69,7 +72,10 @@ public class RedisClusterMessagePersister implements Managed {
workerThread = new Thread(() -> {
while (running) {
persistNextQueues(Instant.now());
if (featureFlagsManager.isFeatureFlagActive(ENABLE_PERSISTENCE_FLAG)) {
persistNextQueues(Instant.now());
}
Util.sleep(100);
}
});
@ -130,7 +136,7 @@ public class RedisClusterMessagePersister implements Managed {
messagesCache.lockQueueForPersistence(queue);
try {
/* int messageCount = 0;
int messageCount = 0;
List<MessageProtos.Envelope> messages;
do {
@ -146,7 +152,7 @@ public class RedisClusterMessagePersister implements Managed {
}
} while (!messages.isEmpty());
queueSizeHistogram.update(messageCount); */
queueSizeHistogram.update(messageCount);
} finally {
messagesCache.unlockQueueForPersistence(queue);
}

View File

@ -4,7 +4,6 @@ import com.google.protobuf.ByteString;
import io.lettuce.core.cluster.SlotHash;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.PushSender;
@ -29,7 +28,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Ignore
public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
private ExecutorService notificationExecutorService;
@ -50,6 +48,9 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
public void setUp() throws Exception {
super.setUp();
final FeatureFlagsManager featureFlagsManager = mock(FeatureFlagsManager.class);
when(featureFlagsManager.isFeatureFlagActive(RedisClusterMessagePersister.ENABLE_PERSISTENCE_FLAG)).thenReturn(true);
messagesDatabase = mock(Messages.class);
accountsManager = mock(AccountsManager.class);
pubSubManager = mock(PubSubManager.class);
@ -61,7 +62,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY);
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, pubSubManager, mock(PushSender.class), accountsManager, featureFlagsManager, PERSIST_DELAY);
}
@Override
@ -136,6 +137,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong());
}
@SuppressWarnings("SameParameterValue")
private static String generateRandomQueueNameForSlot(final int slot) {
final UUID uuid = UUID.randomUUID();