Clear would-be-persisted messages from the cache cluster, but don't store them to the database.
This commit is contained in:
parent
7bf8650d59
commit
76389bd584
|
@ -128,18 +128,18 @@ public class RedisClusterMessagePersister implements Managed {
|
||||||
int messageCount = 0;
|
int messageCount = 0;
|
||||||
List<MessageProtos.Envelope> messages;
|
List<MessageProtos.Envelope> messages;
|
||||||
|
|
||||||
/* do {
|
do {
|
||||||
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
||||||
|
|
||||||
for (final MessageProtos.Envelope message : messages) {
|
for (final MessageProtos.Envelope message : messages) {
|
||||||
final UUID uuid = UUID.fromString(message.getServerGuid());
|
final UUID uuid = UUID.fromString(message.getServerGuid());
|
||||||
|
|
||||||
messagesDatabase.store(uuid, message, accountNumber, deviceId);
|
// messagesDatabase.store(uuid, message, accountNumber, deviceId);
|
||||||
messagesCache.remove(accountNumber, accountUuid, deviceId, uuid);
|
messagesCache.remove(accountNumber, accountUuid, deviceId, uuid);
|
||||||
|
|
||||||
messageCount++;
|
messageCount++;
|
||||||
}
|
}
|
||||||
} while (messages.size() == MESSAGE_BATCH_LIMIT); */
|
} while (messages.size() == MESSAGE_BATCH_LIMIT);
|
||||||
|
|
||||||
queueSizeHistogram.update(messageCount);
|
queueSizeHistogram.update(messageCount);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -3,7 +3,6 @@ package org.whispersystems.textsecuregcm.storage;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||||
|
@ -28,7 +27,6 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@Ignore
|
|
||||||
public class RedisClusterMessagePersisterTest {
|
public class RedisClusterMessagePersisterTest {
|
||||||
|
|
||||||
private RedisClusterMessagesCache messagesCache;
|
private RedisClusterMessagesCache messagesCache;
|
||||||
|
@ -143,7 +141,7 @@ public class RedisClusterMessagePersisterTest {
|
||||||
|
|
||||||
verify(messagesCache).lockQueueForPersistence(queueName);
|
verify(messagesCache).lockQueueForPersistence(queueName);
|
||||||
verify(messagesCache).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT);
|
verify(messagesCache).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT);
|
||||||
verify(messagesDatabase).store(UUID.fromString(message.getServerGuid()), message, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID);
|
// verify(messagesDatabase).store(UUID.fromString(message.getServerGuid()), message, DESTINATION_ACCOUNT_NUMBER, DESTINATION_DEVICE_ID);
|
||||||
verify(messagesCache).remove(DESTINATION_ACCOUNT_NUMBER, DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, UUID.fromString(message.getServerGuid()));
|
verify(messagesCache).remove(DESTINATION_ACCOUNT_NUMBER, DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, UUID.fromString(message.getServerGuid()));
|
||||||
verify(messagesCache).unlockQueueForPersistence(queueName);
|
verify(messagesCache).unlockQueueForPersistence(queueName);
|
||||||
}
|
}
|
||||||
|
@ -169,7 +167,7 @@ public class RedisClusterMessagePersisterTest {
|
||||||
|
|
||||||
verify(messagesCache).lockQueueForPersistence(queueName);
|
verify(messagesCache).lockQueueForPersistence(queueName);
|
||||||
verify(messagesCache, times(4)).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT);
|
verify(messagesCache, times(4)).getMessagesToPersist(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID, RedisClusterMessagePersister.MESSAGE_BATCH_LIMIT);
|
||||||
verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID));
|
// verify(messagesDatabase, times(messageCount)).store(any(UUID.class), any(MessageProtos.Envelope.class), eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_DEVICE_ID));
|
||||||
verify(messagesCache, times(messageCount)).remove(eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_ACCOUNT_UUID), eq(DESTINATION_DEVICE_ID), any(UUID.class));
|
verify(messagesCache, times(messageCount)).remove(eq(DESTINATION_ACCOUNT_NUMBER), eq(DESTINATION_ACCOUNT_UUID), eq(DESTINATION_DEVICE_ID), any(UUID.class));
|
||||||
verify(messagesCache).unlockQueueForPersistence(queueName);
|
verify(messagesCache).unlockQueueForPersistence(queueName);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue