Add (failing) test for send message timeouts
This commit is contained in:
parent
346c7cd743
commit
278b4e810d
|
@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
@ -243,8 +242,91 @@ class WebSocketConnectionIntegrationTest {
|
||||||
} catch (InvalidProtocolBufferException e) {
|
} catch (InvalidProtocolBufferException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
})
|
}).toList();
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
assertTrue(expectedMessages.containsAll(sentMessages));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testProcessStoredMessagesSendFutureTimeout() {
|
||||||
|
final int persistedMessageCount = 207;
|
||||||
|
final int cachedMessageCount = 173;
|
||||||
|
|
||||||
|
final List<MessageProtos.Envelope> expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount);
|
||||||
|
|
||||||
|
assertTimeoutPreemptively(Duration.ofSeconds(15), () -> {
|
||||||
|
|
||||||
|
{
|
||||||
|
final List<MessageProtos.Envelope> persistedMessages = new ArrayList<>(persistedMessageCount);
|
||||||
|
|
||||||
|
for (int i = 0; i < persistedMessageCount; i++) {
|
||||||
|
final MessageProtos.Envelope envelope = generateRandomMessage(UUID.randomUUID());
|
||||||
|
persistedMessages.add(envelope);
|
||||||
|
expectedMessages.add(envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
messagesDynamoDb.store(persistedMessages, account.getUuid(), device.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < cachedMessageCount; i++) {
|
||||||
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
|
final MessageProtos.Envelope envelope = generateRandomMessage(messageGuid);
|
||||||
|
messagesCache.insert(messageGuid, account.getUuid(), device.getId(), envelope);
|
||||||
|
|
||||||
|
expectedMessages.add(envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
|
||||||
|
when(successResponse.getStatus()).thenReturn(200);
|
||||||
|
|
||||||
|
final CompletableFuture<WebSocketResponseMessage> neverCompleting = new CompletableFuture<>();
|
||||||
|
|
||||||
|
// for the first message, return a future that never completes
|
||||||
|
when(webSocketClient.sendRequest(eq("PUT"), eq("/api/v1/message"), anyList(), any()))
|
||||||
|
.thenReturn(neverCompleting)
|
||||||
|
.thenReturn(CompletableFuture.completedFuture(successResponse));
|
||||||
|
|
||||||
|
when(webSocketClient.isOpen()).thenReturn(true);
|
||||||
|
|
||||||
|
final AtomicBoolean queueCleared = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
when(webSocketClient.sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), any())).thenAnswer(
|
||||||
|
(Answer<CompletableFuture<WebSocketResponseMessage>>) invocation -> {
|
||||||
|
synchronized (queueCleared) {
|
||||||
|
queueCleared.set(true);
|
||||||
|
queueCleared.notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
return CompletableFuture.completedFuture(successResponse);
|
||||||
|
});
|
||||||
|
|
||||||
|
webSocketConnection.processStoredMessages();
|
||||||
|
|
||||||
|
synchronized (queueCleared) {
|
||||||
|
while (!queueCleared.get()) {
|
||||||
|
queueCleared.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//noinspection unchecked
|
||||||
|
ArgumentCaptor<Optional<byte[]>> messageBodyCaptor = ArgumentCaptor.forClass(Optional.class);
|
||||||
|
|
||||||
|
// We expect all of the messages from both pools to be sent, plus one for the future that times out
|
||||||
|
verify(webSocketClient, atMost(persistedMessageCount + cachedMessageCount + 1)).sendRequest(eq("PUT"),
|
||||||
|
eq("/api/v1/message"), anyList(), messageBodyCaptor.capture());
|
||||||
|
|
||||||
|
verify(webSocketClient).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), anyList(), eq(Optional.empty()));
|
||||||
|
|
||||||
|
final List<MessageProtos.Envelope> sentMessages = messageBodyCaptor.getAllValues().stream()
|
||||||
|
.map(Optional::get)
|
||||||
|
.map(messageBytes -> {
|
||||||
|
try {
|
||||||
|
return Envelope.parseFrom(messageBytes);
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}).toList();
|
||||||
|
|
||||||
assertTrue(expectedMessages.containsAll(sentMessages));
|
assertTrue(expectedMessages.containsAll(sentMessages));
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue