Rename RedisClusterMessagesCache and related classes to just MessagesCache.
This commit is contained in:
parent
18ecd748dd
commit
8356264fe0
|
@ -135,8 +135,8 @@ import org.whispersystems.textsecuregcm.storage.Profiles;
|
||||||
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
|
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor;
|
import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor;
|
||||||
import org.whispersystems.textsecuregcm.storage.RedisClusterMessagePersister;
|
import org.whispersystems.textsecuregcm.storage.MessagePersister;
|
||||||
import org.whispersystems.textsecuregcm.storage.RedisClusterMessagesCache;
|
import org.whispersystems.textsecuregcm.storage.MessagesCache;
|
||||||
import org.whispersystems.textsecuregcm.storage.RegistrationLockVersionCounter;
|
import org.whispersystems.textsecuregcm.storage.RegistrationLockVersionCounter;
|
||||||
import org.whispersystems.textsecuregcm.storage.RemoteConfigs;
|
import org.whispersystems.textsecuregcm.storage.RemoteConfigs;
|
||||||
import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager;
|
import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager;
|
||||||
|
@ -290,9 +290,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster);
|
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster);
|
||||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||||
RedisClusterMessagesCache clusterMessagesCache = new RedisClusterMessagesCache(messagesCacheCluster, keyspaceNotificationDispatchExecutor);
|
MessagesCache messagesCache = new MessagesCache(messagesCacheCluster, keyspaceNotificationDispatchExecutor);
|
||||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
||||||
MessagesManager messagesManager = new MessagesManager(messages, clusterMessagesCache, pushLatencyManager);
|
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, pushLatencyManager);
|
||||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||||
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, recurringJobExecutor);
|
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, recurringJobExecutor);
|
||||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||||
|
@ -321,7 +321,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
|
||||||
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
||||||
|
|
||||||
RedisClusterMessagePersister clusterMessagePersister = new RedisClusterMessagePersister(clusterMessagesCache, messagesManager, pubSubManager, pushSender, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
MessagePersister clusterMessagePersister = new MessagePersister(messagesCache, messagesManager, pubSubManager, pushSender, accountsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
||||||
|
|
||||||
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
||||||
|
|
||||||
|
@ -341,7 +341,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
environment.lifecycle().manage(pushSender);
|
environment.lifecycle().manage(pushSender);
|
||||||
environment.lifecycle().manage(accountDatabaseCrawler);
|
environment.lifecycle().manage(accountDatabaseCrawler);
|
||||||
environment.lifecycle().manage(remoteConfigsManager);
|
environment.lifecycle().manage(remoteConfigsManager);
|
||||||
environment.lifecycle().manage(clusterMessagesCache);
|
environment.lifecycle().manage(messagesCache);
|
||||||
environment.lifecycle().manage(clusterMessagePersister);
|
environment.lifecycle().manage(clusterMessagePersister);
|
||||||
environment.lifecycle().manage(clientPresenceManager);
|
environment.lifecycle().manage(clientPresenceManager);
|
||||||
environment.lifecycle().manage(featureFlagsManager);
|
environment.lifecycle().manage(featureFlagsManager);
|
||||||
|
|
|
@ -23,13 +23,13 @@ import java.util.UUID;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
public class RedisClusterMessagePersister implements Managed {
|
public class MessagePersister implements Managed {
|
||||||
|
|
||||||
private final RedisClusterMessagesCache messagesCache;
|
private final MessagesCache messagesCache;
|
||||||
private final MessagesManager messagesManager;
|
private final MessagesManager messagesManager;
|
||||||
private final PubSubManager pubSubManager;
|
private final PubSubManager pubSubManager;
|
||||||
private final PushSender pushSender;
|
private final PushSender pushSender;
|
||||||
private final AccountsManager accountsManager;
|
private final AccountsManager accountsManager;
|
||||||
|
|
||||||
private final Duration persistDelay;
|
private final Duration persistDelay;
|
||||||
|
|
||||||
|
@ -37,20 +37,20 @@ public class RedisClusterMessagePersister implements Managed {
|
||||||
private Thread workerThread;
|
private Thread workerThread;
|
||||||
|
|
||||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
private final Timer getQueuesTimer = metricRegistry.timer(name(RedisClusterMessagePersister.class, "getQueues"));
|
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
||||||
private final Timer persistQueueTimer = metricRegistry.timer(name(RedisClusterMessagePersister.class, "persistQueue"));
|
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
||||||
private final Timer notifySubscribersTimer = metricRegistry.timer(name(RedisClusterMessagePersister.class, "notifySubscribers"));
|
private final Timer notifySubscribersTimer = metricRegistry.timer(name(MessagePersister.class, "notifySubscribers"));
|
||||||
private final Histogram queueCountHistogram = metricRegistry.histogram(name(RedisClusterMessagePersister.class, "queueCount"));
|
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
||||||
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(RedisClusterMessagePersister.class, "queueSize"));
|
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
||||||
|
|
||||||
static final int QUEUE_BATCH_LIMIT = 100;
|
static final int QUEUE_BATCH_LIMIT = 100;
|
||||||
static final int MESSAGE_BATCH_LIMIT = 100;
|
static final int MESSAGE_BATCH_LIMIT = 100;
|
||||||
|
|
||||||
static final String ENABLE_PERSISTENCE_FLAG = "enable-cluster-persister";
|
static final String ENABLE_PERSISTENCE_FLAG = "enable-cluster-persister";
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagePersister.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
|
||||||
|
|
||||||
public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final Duration persistDelay) {
|
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final Duration persistDelay) {
|
||||||
this.messagesCache = messagesCache;
|
this.messagesCache = messagesCache;
|
||||||
this.messagesManager = messagesManager;
|
this.messagesManager = messagesManager;
|
||||||
this.pubSubManager = pubSubManager;
|
this.pubSubManager = pubSubManager;
|
||||||
|
@ -102,7 +102,7 @@ public class RedisClusterMessagePersister implements Managed {
|
||||||
|
|
||||||
for (final String queue : queuesToPersist) {
|
for (final String queue : queuesToPersist) {
|
||||||
persistQueue(queue);
|
persistQueue(queue);
|
||||||
notifyClients(RedisClusterMessagesCache.getAccountUuidFromQueueName(queue), RedisClusterMessagesCache.getDeviceIdFromQueueName(queue));
|
notifyClients(MessagesCache.getAccountUuidFromQueueName(queue), MessagesCache.getDeviceIdFromQueueName(queue));
|
||||||
}
|
}
|
||||||
|
|
||||||
queuesPersisted += queuesToPersist.size();
|
queuesPersisted += queuesToPersist.size();
|
||||||
|
@ -113,8 +113,8 @@ public class RedisClusterMessagePersister implements Managed {
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void persistQueue(final String queue) {
|
void persistQueue(final String queue) {
|
||||||
final UUID accountUuid = RedisClusterMessagesCache.getAccountUuidFromQueueName(queue);
|
final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue);
|
||||||
final long deviceId = RedisClusterMessagesCache.getDeviceIdFromQueueName(queue);
|
final long deviceId = MessagesCache.getDeviceIdFromQueueName(queue);
|
||||||
|
|
||||||
final Optional<Account> maybeAccount = accountsManager.get(accountUuid);
|
final Optional<Account> maybeAccount = accountsManager.get(accountUuid);
|
||||||
|
|
|
@ -37,7 +37,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
|
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
|
||||||
|
|
||||||
private final FaultTolerantRedisCluster redisCluster;
|
private final FaultTolerantRedisCluster redisCluster;
|
||||||
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
|
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
|
||||||
|
@ -55,12 +55,12 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
|
||||||
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
|
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
|
||||||
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
|
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
|
||||||
|
|
||||||
private final Timer insertTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "insert"));
|
private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert"));
|
||||||
private final Timer getMessagesTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "get"));
|
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
|
||||||
private final Timer clearQueueTimer = Metrics.timer(name(RedisClusterMessagesCache.class, "clear"));
|
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
|
||||||
private final Counter pubSubMessageCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "pubSubMessage"));
|
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
|
||||||
private final Counter newMessageNotificationCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "newMessageNotification"));
|
private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"));
|
||||||
private final Counter queuePersistedNotificationCounter = Metrics.counter(name(RedisClusterMessagesCache.class, "queuePersisted"));
|
private final Counter queuePersistedNotificationCounter = Metrics.counter(name(MessagesCache.class, "queuePersisted"));
|
||||||
|
|
||||||
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
|
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
|
||||||
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
|
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
|
||||||
|
@ -68,16 +68,16 @@ public class RedisClusterMessagesCache extends RedisClusterPubSubAdapter<String,
|
||||||
private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::";
|
private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::";
|
||||||
private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::";
|
private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::";
|
||||||
|
|
||||||
private static final String REMOVE_TIMER_NAME = name(RedisClusterMessagesCache.class, "remove");
|
private static final String REMOVE_TIMER_NAME = name(MessagesCache.class, "remove");
|
||||||
|
|
||||||
private static final String REMOVE_METHOD_TAG = "method";
|
private static final String REMOVE_METHOD_TAG = "method";
|
||||||
private static final String REMOVE_METHOD_ID = "id";
|
private static final String REMOVE_METHOD_ID = "id";
|
||||||
private static final String REMOVE_METHOD_SENDER = "sender";
|
private static final String REMOVE_METHOD_SENDER = "sender";
|
||||||
private static final String REMOVE_METHOD_UUID = "uuid";
|
private static final String REMOVE_METHOD_UUID = "uuid";
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagesCache.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
|
||||||
|
|
||||||
public RedisClusterMessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService) throws IOException {
|
public MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService) throws IOException {
|
||||||
|
|
||||||
this.redisCluster = redisCluster;
|
this.redisCluster = redisCluster;
|
||||||
this.pubSubConnection = redisCluster.createPubSubConnection();
|
this.pubSubConnection = redisCluster.createPubSubConnection();
|
|
@ -27,18 +27,18 @@ public class MessagesManager {
|
||||||
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
|
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
|
||||||
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
||||||
|
|
||||||
private final Messages messages;
|
private final Messages messages;
|
||||||
private final RedisClusterMessagesCache clusterMessagesCache;
|
private final MessagesCache messagesCache;
|
||||||
private final PushLatencyManager pushLatencyManager;
|
private final PushLatencyManager pushLatencyManager;
|
||||||
|
|
||||||
public MessagesManager(Messages messages, RedisClusterMessagesCache clusterMessagesCache, PushLatencyManager pushLatencyManager) {
|
public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) {
|
||||||
this.messages = messages;
|
this.messages = messages;
|
||||||
this.clusterMessagesCache = clusterMessagesCache;
|
this.messagesCache = messagesCache;
|
||||||
this.pushLatencyManager = pushLatencyManager;
|
this.pushLatencyManager = pushLatencyManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
public void insert(String destination, UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||||
clusterMessagesCache.insert(UUID.randomUUID(), destination, destinationUuid, destinationDevice, message);
|
messagesCache.insert(UUID.randomUUID(), destination, destinationUuid, destinationDevice, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||||
|
@ -47,25 +47,25 @@ public class MessagesManager {
|
||||||
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
||||||
|
|
||||||
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
||||||
messages.addAll(clusterMessagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
messages.addAll(messagesCache.get(destination, destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear(String destination, UUID destinationUuid) {
|
public void clear(String destination, UUID destinationUuid) {
|
||||||
this.clusterMessagesCache.clear(destination, destinationUuid);
|
this.messagesCache.clear(destination, destinationUuid);
|
||||||
this.messages.clear(destination);
|
this.messages.clear(destination);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear(String destination, UUID destinationUuid, long deviceId) {
|
public void clear(String destination, UUID destinationUuid, long deviceId) {
|
||||||
this.clusterMessagesCache.clear(destination, destinationUuid, deviceId);
|
this.messagesCache.clear(destination, destinationUuid, deviceId);
|
||||||
this.messages.clear(destination, deviceId);
|
this.messages.clear(destination, deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
|
||||||
{
|
{
|
||||||
Optional<OutgoingMessageEntity> removed = clusterMessagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destination, destinationUuid, destinationDevice, source, timestamp);
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
|
||||||
|
@ -78,7 +78,7 @@ public class MessagesManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
|
||||||
Optional<OutgoingMessageEntity> removed = clusterMessagesCache.remove(destination, destinationUuid, deviceId, guid);
|
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destination, destinationUuid, deviceId, guid);
|
||||||
|
|
||||||
if (!removed.isPresent()) {
|
if (!removed.isPresent()) {
|
||||||
removed = this.messages.remove(destination, guid);
|
removed = this.messages.remove(destination, guid);
|
||||||
|
@ -92,7 +92,7 @@ public class MessagesManager {
|
||||||
|
|
||||||
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
|
||||||
if (cached) {
|
if (cached) {
|
||||||
clusterMessagesCache.remove(destination, destinationUuid, deviceId, id);
|
messagesCache.remove(destination, destinationUuid, deviceId, id);
|
||||||
cacheHitByIdMeter.mark();
|
cacheHitByIdMeter.mark();
|
||||||
} else {
|
} else {
|
||||||
this.messages.remove(destination, id);
|
this.messages.remove(destination, id);
|
||||||
|
@ -102,14 +102,14 @@ public class MessagesManager {
|
||||||
|
|
||||||
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) {
|
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) {
|
||||||
messages.store(messageGuid, envelope, destination, deviceId);
|
messages.store(messageGuid, envelope, destination, deviceId);
|
||||||
clusterMessagesCache.remove(destination, destinationUuid, deviceId, messageGuid);
|
messagesCache.remove(destination, destinationUuid, deviceId, messageGuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||||
clusterMessagesCache.addMessageAvailabilityListener(destinationUuid, deviceId, listener);
|
messagesCache.addMessageAvailabilityListener(destinationUuid, deviceId, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
|
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
|
||||||
clusterMessagesCache.removeMessageAvailabilityListener(listener);
|
messagesCache.removeMessageAvailabilityListener(listener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,13 +29,13 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
public class MessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
private ExecutorService notificationExecutorService;
|
private ExecutorService notificationExecutorService;
|
||||||
private RedisClusterMessagesCache messagesCache;
|
private MessagesCache messagesCache;
|
||||||
private Messages messagesDatabase;
|
private Messages messagesDatabase;
|
||||||
private PubSubManager pubSubManager;
|
private PubSubManager pubSubManager;
|
||||||
private RedisClusterMessagePersister messagePersister;
|
private MessagePersister messagePersister;
|
||||||
private AccountsManager accountsManager;
|
private AccountsManager accountsManager;
|
||||||
|
|
||||||
private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID();
|
private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID();
|
||||||
|
@ -51,7 +51,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
final MessagesManager messagesManager = mock(MessagesManager.class);
|
final MessagesManager messagesManager = mock(MessagesManager.class);
|
||||||
final FeatureFlagsManager featureFlagsManager = mock(FeatureFlagsManager.class);
|
final FeatureFlagsManager featureFlagsManager = mock(FeatureFlagsManager.class);
|
||||||
when(featureFlagsManager.isFeatureFlagActive(RedisClusterMessagePersister.ENABLE_PERSISTENCE_FLAG)).thenReturn(true);
|
when(featureFlagsManager.isFeatureFlagActive(MessagePersister.ENABLE_PERSISTENCE_FLAG)).thenReturn(true);
|
||||||
|
|
||||||
messagesDatabase = mock(Messages.class);
|
messagesDatabase = mock(Messages.class);
|
||||||
accountsManager = mock(AccountsManager.class);
|
accountsManager = mock(AccountsManager.class);
|
||||||
|
@ -63,8 +63,8 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER);
|
when(account.getNumber()).thenReturn(DESTINATION_ACCOUNT_NUMBER);
|
||||||
|
|
||||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
|
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
|
||||||
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY);
|
messagePersister = new MessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, PERSIST_DELAY);
|
||||||
|
|
||||||
doAnswer(invocation -> {
|
doAnswer(invocation -> {
|
||||||
final String destination = invocation.getArgument(0, String.class);
|
final String destination = invocation.getArgument(0, String.class);
|
||||||
|
@ -97,8 +97,8 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistNextQueuesSingleQueue() {
|
public void testPersistNextQueuesSingleQueue() {
|
||||||
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
|
final String queueName = new String(MessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
|
||||||
final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
|
final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
|
||||||
final Instant now = Instant.now();
|
final Instant now = Instant.now();
|
||||||
|
|
||||||
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now);
|
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now);
|
||||||
|
@ -111,8 +111,8 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistNextQueuesSingleQueueTooSoon() {
|
public void testPersistNextQueuesSingleQueueTooSoon() {
|
||||||
final String queueName = new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
|
final String queueName = new String(MessagesCache.getMessageQueueKey(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8);
|
||||||
final int messageCount = (RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
|
final int messageCount = (MessagePersister.MESSAGE_BATCH_LIMIT * 3) + 7;
|
||||||
final Instant now = Instant.now();
|
final Instant now = Instant.now();
|
||||||
|
|
||||||
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now);
|
insertMessages(DESTINATION_ACCOUNT_UUID, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID, messageCount, now);
|
||||||
|
@ -126,14 +126,14 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
@Test
|
@Test
|
||||||
public void testPersistNextQueuesMultiplePages() {
|
public void testPersistNextQueuesMultiplePages() {
|
||||||
final int slot = 7;
|
final int slot = 7;
|
||||||
final int queueCount = (RedisClusterMessagePersister.QUEUE_BATCH_LIMIT * 3) + 7;
|
final int queueCount = (MessagePersister.QUEUE_BATCH_LIMIT * 3) + 7;
|
||||||
final int messagesPerQueue = 10;
|
final int messagesPerQueue = 10;
|
||||||
final Instant now = Instant.now();
|
final Instant now = Instant.now();
|
||||||
|
|
||||||
for (int i = 0; i < queueCount; i++) {
|
for (int i = 0; i < queueCount; i++) {
|
||||||
final String queueName = generateRandomQueueNameForSlot(slot);
|
final String queueName = generateRandomQueueNameForSlot(slot);
|
||||||
final UUID accountUuid = RedisClusterMessagesCache.getAccountUuidFromQueueName(queueName);
|
final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queueName);
|
||||||
final long deviceId = RedisClusterMessagesCache.getDeviceIdFromQueueName(queueName);
|
final long deviceId = MessagesCache.getDeviceIdFromQueueName(queueName);
|
||||||
final String accountNumber = "+1" + RandomStringUtils.randomNumeric(10);
|
final String accountNumber = "+1" + RandomStringUtils.randomNumeric(10);
|
||||||
|
|
||||||
final Account account = mock(Account.class);
|
final Account account = mock(Account.class);
|
||||||
|
@ -186,6 +186,6 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setNextSlotToPersist(final int nextSlot) {
|
private void setNextSlotToPersist(final int nextSlot) {
|
||||||
getRedisCluster().useCluster(connection -> connection.sync().set(RedisClusterMessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(nextSlot - 1)));
|
getRedisCluster().useCluster(connection -> connection.sync().set(MessagesCache.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(nextSlot - 1)));
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -29,10 +29,10 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@RunWith(JUnitParamsRunner.class)
|
@RunWith(JUnitParamsRunner.class)
|
||||||
public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
public class MessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
private ExecutorService notificationExecutorService;
|
private ExecutorService notificationExecutorService;
|
||||||
private RedisClusterMessagesCache messagesCache;
|
private MessagesCache messagesCache;
|
||||||
|
|
||||||
private final Random random = new Random();
|
private final Random random = new Random();
|
||||||
private long serialTimestamp = 0;
|
private long serialTimestamp = 0;
|
||||||
|
@ -49,7 +49,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "K$gz"));
|
getRedisCluster().useCluster(connection -> connection.sync().masters().commands().configSet("notify-keyspace-events", "K$gz"));
|
||||||
|
|
||||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||||
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
|
messagesCache = new MessagesCache(getRedisCluster(), notificationExecutorService);
|
||||||
|
|
||||||
messagesCache.start();
|
messagesCache.start();
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId);
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId);
|
||||||
|
|
||||||
assertTrue(maybeRemovedMessage.isPresent());
|
assertTrue(maybeRemovedMessage.isPresent());
|
||||||
assertEquals(RedisClusterMessagesCache.constructEntityFromEnvelope(messageId, message), maybeRemovedMessage.get());
|
assertEquals(MessagesCache.constructEntityFromEnvelope(messageId, message), maybeRemovedMessage.get());
|
||||||
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId));
|
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp());
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp());
|
||||||
|
|
||||||
assertTrue(maybeRemovedMessage.isPresent());
|
assertTrue(maybeRemovedMessage.isPresent());
|
||||||
assertEquals(RedisClusterMessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
|
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
|
||||||
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp()));
|
assertEquals(Optional.empty(), messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message.getSource(), message.getTimestamp()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid);
|
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageGuid);
|
||||||
|
|
||||||
assertTrue(maybeRemovedMessage.isPresent());
|
assertTrue(maybeRemovedMessage.isPresent());
|
||||||
assertEquals(RedisClusterMessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
|
assertEquals(MessagesCache.constructEntityFromEnvelope(0, message), maybeRemovedMessage.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -127,7 +127,7 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
final MessageProtos.Envelope message = generateRandomMessage(messageGuid, sealedSender);
|
||||||
final long messageId = messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
final long messageId = messagesCache.insert(messageGuid, DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, message);
|
||||||
|
|
||||||
expectedMessages.add(RedisClusterMessagesCache.constructEntityFromEnvelope(messageId, message));
|
expectedMessages.add(MessagesCache.constructEntityFromEnvelope(messageId, message));
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(expectedMessages, messagesCache.get(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
assertEquals(expectedMessages, messagesCache.get(DESTINATION_ACCOUNT, DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
|
||||||
|
@ -198,19 +198,19 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
@Test
|
@Test
|
||||||
public void testGetAccountFromQueueName() {
|
public void testGetAccountFromQueueName() {
|
||||||
assertEquals(DESTINATION_UUID,
|
assertEquals(DESTINATION_UUID,
|
||||||
RedisClusterMessagesCache.getAccountUuidFromQueueName(new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8)));
|
MessagesCache.getAccountUuidFromQueueName(new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetDeviceIdFromQueueName() {
|
public void testGetDeviceIdFromQueueName() {
|
||||||
assertEquals(DESTINATION_DEVICE_ID,
|
assertEquals(DESTINATION_DEVICE_ID,
|
||||||
RedisClusterMessagesCache.getDeviceIdFromQueueName(new String(RedisClusterMessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8)));
|
MessagesCache.getDeviceIdFromQueueName(new String(MessagesCache.getMessageQueueKey(DESTINATION_UUID, DESTINATION_DEVICE_ID), StandardCharsets.UTF_8)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetQueueNameFromKeyspaceChannel() {
|
public void testGetQueueNameFromKeyspaceChannel() {
|
||||||
assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7",
|
assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7",
|
||||||
RedisClusterMessagesCache.getQueueNameFromKeyspaceChannel("__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}"));
|
MessagesCache.getQueueNameFromKeyspaceChannel("__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -226,8 +226,8 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
final List<String> queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100);
|
final List<String> queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100);
|
||||||
|
|
||||||
assertEquals(1, queues.size());
|
assertEquals(1, queues.size());
|
||||||
assertEquals(DESTINATION_UUID, RedisClusterMessagesCache.getAccountUuidFromQueueName(queues.get(0)));
|
assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.get(0)));
|
||||||
assertEquals(DESTINATION_DEVICE_ID, RedisClusterMessagesCache.getDeviceIdFromQueueName(queues.get(0)));
|
assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.get(0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 5_000L)
|
@Test(timeout = 5_000L)
|
||||||
|
@ -281,8 +281,8 @@ public class RedisClusterMessagesCacheTest extends AbstractRedisClusterTest {
|
||||||
|
|
||||||
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
|
messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener);
|
||||||
|
|
||||||
messagesCache.lockQueueForPersistence(RedisClusterMessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID));
|
messagesCache.lockQueueForPersistence(MessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID));
|
||||||
messagesCache.unlockQueueForPersistence(RedisClusterMessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID));
|
messagesCache.unlockQueueForPersistence(MessagesCache.getQueueName(DESTINATION_UUID, DESTINATION_DEVICE_ID));
|
||||||
|
|
||||||
synchronized (notified) {
|
synchronized (notified) {
|
||||||
while (!notified.get()) {
|
while (!notified.get()) {
|
Loading…
Reference in New Issue