Use a single cluster instance in MessagesCache

This commit is contained in:
Chris Eager 2024-04-12 13:31:17 -05:00 committed by Chris Eager
parent b734d58ab7
commit a302275187
8 changed files with 37 additions and 47 deletions

View File

@ -566,8 +566,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, recurringJobExecutor,
keyspaceNotificationDispatchExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster,
keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionAsyncExecutor, clock);
MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor,
messageDeliveryScheduler, messageDeletionAsyncExecutor, clock);
ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases,
recurringJobExecutor,
config.getClientReleaseConfiguration().refreshInterval(),

View File

@ -60,7 +60,7 @@ import reactor.core.scheduler.Schedulers;
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
private final FaultTolerantRedisCluster readDeleteCluster;
private final FaultTolerantRedisCluster redisCluster;
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
private final Clock clock;
@ -110,12 +110,12 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster,
final ExecutorService notificationExecutorService, final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService, final Clock clock) throws IOException {
public MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService,
final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock)
throws IOException {
this.readDeleteCluster = readDeleteCluster;
this.pubSubConnection = readDeleteCluster.createPubSubConnection();
this.redisCluster = redisCluster;
this.pubSubConnection = redisCluster.createPubSubConnection();
this.clock = clock;
this.notificationExecutorService = notificationExecutorService;
@ -123,13 +123,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
this.messageDeletionExecutorService = messageDeletionExecutorService;
this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion");
this.insertScript = ClusterLuaScript.fromResource(insertCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
this.removeByGuidScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_item_by_guid.lua",
this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua",
ScriptOutputType.MULTI);
this.getItemsScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
this.removeQueueScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/remove_queue.lua",
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.MULTI);
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua",
ScriptOutputType.STATUS);
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(readDeleteCluster, "lua/get_queues_to_persist.lua",
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua",
ScriptOutputType.MULTI);
}
@ -209,7 +209,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
public boolean hasMessages(final UUID destinationUuid, final byte destinationDevice) {
return readDeleteCluster.withBinaryCluster(
return redisCluster.withBinaryCluster(
connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0);
}
@ -324,7 +324,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final byte destinationDevice,
final int limit) {
return getMessagesTimer.record(() -> {
final List<ScoredValue<byte[]>> scoredMessages = readDeleteCluster.withBinaryCluster(
final List<ScoredValue<byte[]>> scoredMessages = redisCluster.withBinaryCluster(
connection -> connection.sync()
.zrangeWithScores(getMessageQueueKey(accountUuid, destinationDevice), 0, limit));
final List<MessageProtos.Envelope> envelopes = new ArrayList<>(scoredMessages.size());
@ -360,7 +360,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
int getNextSlotToPersist() {
return (int) (readDeleteCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY))
return (int) (redisCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY))
% SlotHash.SLOT_COUNT);
}
@ -373,23 +373,23 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
void addQueueToPersist(final UUID accountUuid, final byte deviceId) {
readDeleteCluster.useBinaryCluster(connection -> connection.sync()
redisCluster.useBinaryCluster(connection -> connection.sync()
.zadd(getQueueIndexKey(accountUuid, deviceId), ZAddArgs.Builder.nx(), System.currentTimeMillis(),
getMessageQueueKey(accountUuid, deviceId)));
}
void lockQueueForPersistence(final UUID accountUuid, final byte deviceId) {
readDeleteCluster.useBinaryCluster(
redisCluster.useBinaryCluster(
connection -> connection.sync().setex(getPersistInProgressKey(accountUuid, deviceId), 30, LOCK_VALUE));
}
void unlockQueueForPersistence(final UUID accountUuid, final byte deviceId) {
readDeleteCluster.useBinaryCluster(
redisCluster.useBinaryCluster(
connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId)));
}
boolean lockAccountForMessagePersisterCleanup(final UUID accountUuid) {
return readDeleteCluster.withBinaryCluster(
return redisCluster.withBinaryCluster(
connection -> "OK".equals(
connection.sync().set(
getUnlinkInProgressKey(accountUuid),
@ -398,7 +398,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
void unlockAccountForMessagePersisterCleanup(final UUID accountUuid) {
readDeleteCluster.useBinaryCluster(
redisCluster.useBinaryCluster(
connection -> connection.sync().del(getUnlinkInProgressKey(accountUuid)));
}

View File

@ -173,10 +173,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
configuration.getDynamoDbTables().getMessages().getTableName(),
configuration.getDynamoDbTables().getMessages().getExpiration(),
messageDeletionExecutor);
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster",
configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster(
"message_read_delete_cluster",
FaultTolerantRedisCluster messagesCluster = new FaultTolerantRedisCluster("messages",
configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster clientPresenceCluster = new FaultTolerantRedisCluster("client_presence",
configuration.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder);
@ -189,8 +186,8 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration());
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster,
Executors.newSingleThreadScheduledExecutor(), keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor,
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient,
configuration.getDynamoDbTables().getReportMessage().getTableName(),

View File

@ -163,10 +163,7 @@ record CommandDependencies(
configuration.getDynamoDbTables().getMessages().getTableName(),
configuration.getDynamoDbTables().getMessages().getExpiration(),
messageDeletionExecutor);
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster",
configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster(
"message_read_delete_cluster",
FaultTolerantRedisCluster messagesCluster = new FaultTolerantRedisCluster("messages",
configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster clientPresenceCluster = new FaultTolerantRedisCluster("client_presence",
configuration.getClientPresenceClusterConfiguration(), redisClientResourcesBuilder);
@ -180,8 +177,8 @@ record CommandDependencies(
storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration());
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster,
recurringJobExecutor, keyspaceNotificationDispatchExecutor);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster,
keyspaceNotificationDispatchExecutor, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor,
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient,
configuration.getDynamoDbTables().getReportMessage().getTableName(),

View File

@ -34,7 +34,6 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfigurati
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -79,8 +78,7 @@ class MessagePersisterIntegrationTest {
final AccountsManager accountsManager = mock(AccountsManager.class);
notificationExecutorService = Executors.newSingleThreadExecutor();
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService,
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService,
messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC());
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
messageDeletionExecutorService);

View File

@ -98,9 +98,8 @@ class MessagePersisterTest {
sharedExecutorService = Executors.newSingleThreadExecutor();
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler,
sharedExecutorService, Clock.systemUTC());
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService,
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, clientPresenceManager,
keysManager, dynamicConfigurationManager, PERSIST_DELAY, 1, MoreExecutors.newDirectExecutorService());

View File

@ -98,8 +98,8 @@ class MessagesCacheTest {
sharedExecutorService = Executors.newSingleThreadExecutor();
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService,
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
messagesCache.start();
}
@ -272,7 +272,7 @@ class MessagesCacheTest {
}
final MessagesCache messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, cacheClock);
sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, cacheClock);
final List<MessageProtos.Envelope> actualMessages = Flux.from(
messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID))
@ -561,8 +561,8 @@ class MessagesCacheTest {
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(mockCluster, mockCluster, mock(ExecutorService.class),
messageDeliveryScheduler, Executors.newSingleThreadExecutor(), Clock.systemUTC());
messagesCache = new MessagesCache(mockCluster, mock(ExecutorService.class), messageDeliveryScheduler,
Executors.newSingleThreadExecutor(), Clock.systemUTC());
}
@AfterEach

View File

@ -57,7 +57,6 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import reactor.core.scheduler.Scheduler;
@ -90,8 +89,8 @@ class WebSocketConnectionIntegrationTest {
sharedExecutorService = Executors.newSingleThreadExecutor();
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService,
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(),
DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(7),
sharedExecutorService);