Mirror persistence operations from the new persister to the old persister.
This commit is contained in:
parent
2b50367d7f
commit
8409986ef5
|
@ -333,7 +333,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
|
||||
|
||||
MessagePersister messagePersister = new MessagePersister(messagesClient, messagesManager, pubSubManager, pushSender, accountsManager, featureFlagsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes(), TimeUnit.MINUTES);
|
||||
RedisClusterMessagePersister clusterMessagePersister = new RedisClusterMessagePersister(clusterMessagesCache, messages, pubSubManager, pushSender, accountsManager, featureFlagsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
||||
RedisClusterMessagePersister clusterMessagePersister = new RedisClusterMessagePersister(clusterMessagesCache, messagesManager, pubSubManager, pushSender, accountsManager, featureFlagsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
|
||||
|
||||
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
||||
|
||||
|
|
|
@ -169,7 +169,7 @@ public class MessagePersister implements Managed, Runnable {
|
|||
|
||||
envelope = envelope.toBuilder().clearServerGuid().build();
|
||||
|
||||
messagesManager.persistMessage(key.getAddress(), destinationUuid, envelope, guid, key.getDeviceId(), score);
|
||||
messagesManager.persistMessage(key.getAddress(), destinationUuid, envelope, guid, key.getDeviceId());
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
logger.error("Error parsing envelope", e);
|
||||
}
|
||||
|
|
|
@ -120,11 +120,9 @@ public class MessagesManager {
|
|||
}
|
||||
}
|
||||
|
||||
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId, long id) {
|
||||
public void persistMessage(String destination, UUID destinationUuid, Envelope envelope, UUID messageGuid, long deviceId) {
|
||||
messages.store(messageGuid, envelope, destination, deviceId);
|
||||
|
||||
final Optional<OutgoingMessageEntity> maybeRemovedMessage = messagesCache.remove(destination, destinationUuid, deviceId, id);
|
||||
removeByIdExperiment.compareSupplierResult(maybeRemovedMessage, () -> clusterMessagesCache.remove(destination, destinationUuid, deviceId, id));
|
||||
delete(destination, destinationUuid, deviceId, messageGuid);
|
||||
}
|
||||
|
||||
public void addMessageAvailabilityListener(final UUID destinationUuid, final long deviceId, final MessageAvailabilityListener listener) {
|
||||
|
|
|
@ -26,7 +26,7 @@ import static com.codahale.metrics.MetricRegistry.name;
|
|||
public class RedisClusterMessagePersister implements Managed {
|
||||
|
||||
private final RedisClusterMessagesCache messagesCache;
|
||||
private final Messages messagesDatabase;
|
||||
private final MessagesManager messagesManager;
|
||||
private final PubSubManager pubSubManager;
|
||||
private final PushSender pushSender;
|
||||
private final AccountsManager accountsManager;
|
||||
|
@ -51,9 +51,9 @@ public class RedisClusterMessagePersister implements Managed {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RedisClusterMessagePersister.class);
|
||||
|
||||
public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final Messages messagesDatabase, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) {
|
||||
public RedisClusterMessagePersister(final RedisClusterMessagesCache messagesCache, final MessagesManager messagesManager, final PubSubManager pubSubManager, final PushSender pushSender, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) {
|
||||
this.messagesCache = messagesCache;
|
||||
this.messagesDatabase = messagesDatabase;
|
||||
this.messagesManager = messagesManager;
|
||||
this.pubSubManager = pubSubManager;
|
||||
this.pushSender = pushSender;
|
||||
this.accountsManager = accountsManager;
|
||||
|
@ -143,11 +143,7 @@ public class RedisClusterMessagePersister implements Managed {
|
|||
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
||||
|
||||
for (final MessageProtos.Envelope message : messages) {
|
||||
final UUID uuid = UUID.fromString(message.getServerGuid());
|
||||
|
||||
messagesDatabase.store(uuid, message, accountNumber, deviceId);
|
||||
messagesCache.remove(accountNumber, accountUuid, deviceId, uuid);
|
||||
|
||||
messagesManager.persistMessage(accountNumber, accountUuid, message, UUID.fromString(message.getServerGuid()), deviceId);
|
||||
messageCount++;
|
||||
}
|
||||
} while (!messages.isEmpty());
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.any;
|
|||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -48,6 +49,7 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
final MessagesManager messagesManager = mock(MessagesManager.class);
|
||||
final FeatureFlagsManager featureFlagsManager = mock(FeatureFlagsManager.class);
|
||||
when(featureFlagsManager.isFeatureFlagActive(RedisClusterMessagePersister.ENABLE_PERSISTENCE_FLAG)).thenReturn(true);
|
||||
|
||||
|
@ -62,7 +64,20 @@ public class RedisClusterMessagePersisterTest extends AbstractRedisClusterTest {
|
|||
|
||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||
messagesCache = new RedisClusterMessagesCache(getRedisCluster(), notificationExecutorService);
|
||||
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesDatabase, pubSubManager, mock(PushSender.class), accountsManager, featureFlagsManager, PERSIST_DELAY);
|
||||
messagePersister = new RedisClusterMessagePersister(messagesCache, messagesManager, pubSubManager, mock(PushSender.class), accountsManager, featureFlagsManager, PERSIST_DELAY);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
final String destination = invocation.getArgument(0, String.class);
|
||||
final UUID destinationUuid = invocation.getArgument(1, UUID.class);
|
||||
final MessageProtos.Envelope message = invocation.getArgument(2, MessageProtos.Envelope.class);
|
||||
final UUID messageGuid = invocation.getArgument(3, UUID.class);
|
||||
final long deviceId = invocation.getArgument(4, Long.class);
|
||||
|
||||
messagesDatabase.store(messageGuid, message, destination, deviceId);
|
||||
messagesCache.remove(destination, destinationUuid, deviceId, messageGuid);
|
||||
|
||||
return null;
|
||||
}).when(messagesManager).persistMessage(anyString(), any(UUID.class), any(MessageProtos.Envelope.class), any(UUID.class), anyLong());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue