Add a test for message ordering.

This commit is contained in:
Jon Chambers 2020-09-23 18:25:18 -04:00 committed by Jon Chambers
parent 460bd98f1b
commit 30474e3a2b
1 changed files with 34 additions and 4 deletions

View File

@ -1,6 +1,7 @@
package org.whispersystems.textsecuregcm.websocket;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.opentable.db.postgres.embedded.LiquibasePreparer;
import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
import com.opentable.db.postgres.junit.PreparedDbRule;
@ -10,6 +11,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
@ -26,6 +28,8 @@ import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@ -34,6 +38,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
@ -98,16 +104,24 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
@Test(timeout = 15_000)
public void testProcessStoredMessages() throws InterruptedException {
final int persistedMessageCount = 207;
final int cachedMessageCount = 173;
final int cachedMessageCount = 173;
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
for (int i = 0; i < persistedMessageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
messages.store(messageGuid, generateRandomMessage(messageGuid), account.getNumber(), device.getId());
final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid);
messages.store(messageGuid, envelope, account.getNumber(), device.getId());
expectedMessages.add(envelope.toBuilder().clearServerGuid().build());
}
for (int i = 0; i < cachedMessageCount; i++) {
final UUID messageGuid = UUID.randomUUID();
messagesCache.insert(messageGuid, account.getUuid(), device.getId(), generateRandomMessage(messageGuid));
final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid);
messagesCache.insert(messageGuid, account.getUuid(), device.getId(), envelope);
expectedMessages.add(envelope.toBuilder().clearServerGuid().build());
}
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
@ -133,8 +147,24 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
}
}
verify(webSocketClient, times(persistedMessageCount + cachedMessageCount)).sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), any());
final ArgumentCaptor<Optional<byte[]>> messageBodyCaptor = ArgumentCaptor.forClass(Optional.class);
verify(webSocketClient, times(persistedMessageCount + cachedMessageCount)).sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), messageBodyCaptor.capture());
verify(webSocketClient).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), eq(Optional.empty()));
final List<MessageProtos.Envelope> sentMessages = new ArrayList<>();
for (final Optional<byte[]> maybeMessageBody : messageBodyCaptor.getAllValues()) {
maybeMessageBody.ifPresent(messageBytes -> {
try {
sentMessages.add(MessageProtos.Envelope.parseFrom(messageBytes));
} catch (final InvalidProtocolBufferException e) {
fail("Could not parse sent message");
}
});
}
assertEquals(expectedMessages, sentMessages);
}
@Test(timeout = 15_000)