From 30474e3a2bf3a6afdebcebf410db33d0a745cacc Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 23 Sep 2020 18:25:18 -0400 Subject: [PATCH] Add a test for message ordering. --- .../WebSocketConnectionIntegrationTest.java | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 31e38ee6a..ba9862bca 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -1,6 +1,7 @@ package org.whispersystems.textsecuregcm.websocket; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.junit.EmbeddedPostgresRules; import com.opentable.db.postgres.junit.PreparedDbRule; @@ -10,6 +11,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; @@ -26,6 +28,8 @@ import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -34,6 +38,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; @@ -98,16 +104,24 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest @Test(timeout = 15_000) public void testProcessStoredMessages() throws InterruptedException { final int persistedMessageCount = 207; - final int cachedMessageCount = 173; + final int cachedMessageCount = 173; + + final List expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); for (int i = 0; i < persistedMessageCount; i++) { final UUID messageGuid = UUID.randomUUID(); - messages.store(messageGuid, generateRandomMessage(messageGuid), account.getNumber(), device.getId()); + final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid); + + messages.store(messageGuid, envelope, account.getNumber(), device.getId()); + expectedMessages.add(envelope.toBuilder().clearServerGuid().build()); } for (int i = 0; i < cachedMessageCount; i++) { final UUID messageGuid = UUID.randomUUID(); - messagesCache.insert(messageGuid, account.getUuid(), device.getId(), generateRandomMessage(messageGuid)); + final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid); + + messagesCache.insert(messageGuid, account.getUuid(), device.getId(), envelope); + expectedMessages.add(envelope.toBuilder().clearServerGuid().build()); } final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class); @@ -133,8 +147,24 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest } } - verify(webSocketClient, times(persistedMessageCount + cachedMessageCount)).sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), any()); + final ArgumentCaptor> messageBodyCaptor = ArgumentCaptor.forClass(Optional.class); + + verify(webSocketClient, times(persistedMessageCount + cachedMessageCount)).sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), messageBodyCaptor.capture()); verify(webSocketClient).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), eq(Optional.empty())); + + final List sentMessages = new ArrayList<>(); + + for (final Optional maybeMessageBody : messageBodyCaptor.getAllValues()) { + maybeMessageBody.ifPresent(messageBytes -> { + try { + sentMessages.add(MessageProtos.Envelope.parseFrom(messageBytes)); + } catch (final InvalidProtocolBufferException e) { + fail("Could not parse sent message"); + } + }); + } + + assertEquals(expectedMessages, sentMessages); } @Test(timeout = 15_000)