From 2b50367d7f424d7fd262ba76cd01a2aff87c2d52 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 27 Aug 2020 13:15:08 -0400 Subject: [PATCH] Put message persisters behind feature flags. --- .../textsecuregcm/WhisperServerService.java | 4 +- .../storage/MessagePersister.java | 66 +++++++++++-------- .../storage/RedisClusterMessagePersister.java | 28 ++++---- .../RedisClusterMessagePersisterTest.java | 8 ++- 4 files changed, 63 insertions(+), 43 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f97b3dd7b..a73f6c642 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -332,8 +332,8 @@ public class WhisperServerService extends Application queuesToPersist = getQueuesToPersist(); - queueCountHistogram.update(queuesToPersist.size()); + if (!featureFlagsManager.isFeatureFlagActive(DISABLE_PERSISTENCE_FLAG)) { + try { + List queuesToPersist = getQueuesToPersist(); + queueCountHistogram.update(queuesToPersist.size()); - for (byte[] queue : queuesToPersist) { - Key key = Key.fromUserMessageQueue(queue); + for (byte[] queue : queuesToPersist) { + Key key = Key.fromUserMessageQueue(queue); - persistQueue(jedisPool, key); - notifyClients(accountsManager, pubSubManager, pushSender, key); + persistQueue(jedisPool, key); + notifyClients(accountsManager, pubSubManager, pushSender, key); + } + + if (queuesToPersist.isEmpty()) { + //noinspection BusyWait + Thread.sleep(10_000); + } + } catch (Throwable t) { + logger.error("Exception while persisting: ", t); } - - if (queuesToPersist.isEmpty()) { - //noinspection BusyWait + } else { + try { Thread.sleep(10_000); + } catch (final InterruptedException ignored) { } - } catch (Throwable t) { - logger.error("Exception while persisting: ", t); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java index c199b8923..270f837c0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersister.java @@ -30,6 +30,7 @@ public class RedisClusterMessagePersister implements Managed { private final PubSubManager pubSubManager; private final PushSender pushSender; private final AccountsManager accountsManager; + private final FeatureFlagsManager featureFlagsManager; private final Duration persistDelay; @@ -46,16 +47,18 @@ public class RedisClusterMessagePersister implements Managed { static final int QUEUE_BATCH_LIMIT = 100; static final int MESSAGE_BATCH_LIMIT = 100; + static final String ENABLE_PERSISTENCE_FLAG = "enable-cluster-persister"; + private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagePersister.class); - public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final Messages messagesDatabase, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final Duration persistDelay) { - this.messagesCache = messagesCache; - this.messagesDatabase = messagesDatabase; - this.pubSubManager = pubSubManager; - this.pushSender = pushSender; - this.accountsManager = accountsManager; - - this.persistDelay = persistDelay; + public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final Messages messagesDatabase, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) { + this.messagesCache = messagesCache; + this.messagesDatabase = messagesDatabase; + this.pubSubManager = pubSubManager; + this.pushSender = pushSender; + this.accountsManager = accountsManager; + this.featureFlagsManager = featureFlagsManager; + this.persistDelay = persistDelay; } @VisibleForTesting @@ -69,7 +72,10 @@ public class RedisClusterMessagePersister implements Managed { workerThread = new Thread(() -> { while (running) { - persistNextQueues(Instant.now()); + if (featureFlagsManager.isFeatureFlagActive(ENABLE_PERSISTENCE_FLAG)) { + persistNextQueues(Instant.now()); + } + Util.sleep(100); } }); @@ -130,7 +136,7 @@ public class RedisClusterMessagePersister implements Managed { messagesCache.lockQueueForPersistence(queue); try { - /* int messageCount = 0; + int messageCount = 0; List messages; do { @@ -146,7 +152,7 @@ public class RedisClusterMessagePersister implements Managed { } } while (!messages.isEmpty()); - queueSizeHistogram.update(messageCount); */ + queueSizeHistogram.update(messageCount); } finally { messagesCache.unlockQueueForPersistence(queue); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java index 4ae56d558..25bd30ac3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisClusterMessagePersisterTest.java @@ -4,7 +4,6 @@ import com.google.protobuf.ByteString; import io.lettuce.core.cluster.SlotHash; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.push.PushSender; @@ -29,7 +28,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@Ignore public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { private ExecutorService notificationExecutorService; @@ -50,6 +48,9 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { public void setUp() throws Exception { super.setUp(); + final FeatureFlagsManager featureFlagsManager = mock(FeatureFlagsManager.class); + when(featureFlagsManager.isFeatureFlagActive(RedisClusterMessagePersister.ENABLE_PERSISTENCE_FLAG)).thenReturn(true); + messagesDatabase = mock(Messages.class); accountsManager = mock(AccountsManager.class); pubSubManager = mock(PubSubManager.class); @@ -61,7 +62,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { notificationExecutorService = Executors.newSingleThreadExecutor(); messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService); - messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY); + messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, pubSubManager, mock(PushSender.class), accountsManager, featureFlagsManager, PERSIST_DELAY); } @Override @@ -136,6 +137,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest { verify(messagesDatabase, times(queueCount * messagesPerQueue)).store(any(UUID.class), any(MessageProtos.Envelope.class), anyString(), anyLong()); } + @SuppressWarnings("SameParameterValue") private static String generateRandomQueueNameForSlot(final int slot) { final UUID uuid = UUID.randomUUID();