Improve two `@Disabled` flaky tests
This commit is contained in:
parent
d7bf815bd5
commit
fb39b2edaf
|
@ -7,12 +7,11 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.atMost;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -46,7 +45,6 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
|
@ -64,10 +62,10 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
|||
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
||||
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.test.StepVerifier;
|
||||
import reactor.test.publisher.TestPublisher;
|
||||
|
||||
class MessagesCacheTest {
|
||||
|
||||
|
@ -586,14 +584,14 @@ class MessagesCacheTest {
|
|||
// is subscribed. Rather, we should be fetching in pages to satisfy downstream requests, so that memory usage
|
||||
// is limited to few pages of messages
|
||||
|
||||
// we use a combination of Flux.just() and Sinks to control when data is “fetched” from the cache. The initial
|
||||
// Flux.just()s are pages that are readily available, on demand. By design, there are more of these pages than
|
||||
// the initial prefetch. The sinks allow us to create extra demand but defer producing values to satisfy the demand
|
||||
// until later on.
|
||||
// we use a combination of Flux.just() and TestPublishers to control when data is “fetched” and emitted from the
|
||||
// cache. The initial Flux.just()s are pages that are readily available, on demand. By design, there are more of
|
||||
// these pages than the initial prefetch. The publishers allow us to create extra demand but defer producing
|
||||
// values to satisfy the demand until later on.
|
||||
|
||||
final AtomicReference<FluxSink<Object>> page4Sink = new AtomicReference<>();
|
||||
final AtomicReference<FluxSink<Object>> page56Sink = new AtomicReference<>();
|
||||
final AtomicReference<FluxSink<Object>> emptyFinalPageSink = new AtomicReference<>();
|
||||
final TestPublisher<Object> page4Publisher = TestPublisher.create();
|
||||
final TestPublisher<Object> page56Publisher = TestPublisher.create();
|
||||
final TestPublisher<Object> emptyFinalPagePublisher = TestPublisher.create();
|
||||
|
||||
final Deque<List<byte[]>> pages = new ArrayDeque<>();
|
||||
pages.add(generatePage());
|
||||
|
@ -608,9 +606,9 @@ class MessagesCacheTest {
|
|||
.thenReturn(Flux.just(pages.pop()))
|
||||
.thenReturn(Flux.just(pages.pop()))
|
||||
.thenReturn(Flux.just(pages.pop()))
|
||||
.thenReturn(Flux.create(sink -> page4Sink.compareAndSet(null, sink)))
|
||||
.thenReturn(Flux.create(sink -> page56Sink.compareAndSet(null, sink)))
|
||||
.thenReturn(Flux.create(sink -> emptyFinalPageSink.compareAndSet(null, sink)))
|
||||
.thenReturn(Flux.from(page4Publisher))
|
||||
.thenReturn(Flux.from(page56Publisher))
|
||||
.thenReturn(Flux.from(emptyFinalPagePublisher))
|
||||
.thenReturn(Flux.empty());
|
||||
|
||||
final Flux<?> allMessages = messagesCache.getAllMessages(UUID.randomUUID(), 1L);
|
||||
|
@ -635,9 +633,9 @@ class MessagesCacheTest {
|
|||
.thenRequest(halfPage) // page 0.5 requested
|
||||
.expectNextCount(halfPage) // page 0.5 produced
|
||||
// page 0.5 produced, 1.5 remain, so no additional interactions with the cache cluster
|
||||
.then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.get())).evalsha(any(),
|
||||
.then(() -> verify(reactiveCommands, atMost(expectedReactiveCommandInvocations.get())).evalsha(any(),
|
||||
any(), any(), any()))
|
||||
.then(() -> assertNull(page4Sink.get(), "page 4 should not have been fetched yet"))
|
||||
.then(page4Publisher::assertWasNotRequested)
|
||||
.thenRequest(page) // page 1.5 requested
|
||||
.expectNextCount(page) // page 1.5 produced
|
||||
|
||||
|
@ -647,26 +645,26 @@ class MessagesCacheTest {
|
|||
// also NB: times() checks cumulative calls, hence addAndGet
|
||||
.then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(1))).evalsha(any(),
|
||||
any(), any(), any()))
|
||||
.then(() -> assertNotNull(page4Sink.get(), "page 4 should have been fetched"))
|
||||
.then(page4Publisher::assertWasSubscribed)
|
||||
.thenRequest(page + halfPage) // page 3 requested
|
||||
.expectNextCount(page + halfPage) // page 1.5–3 produced
|
||||
|
||||
.thenRequest(halfPage) // page 3.5 requested
|
||||
.then(() -> assertNull(page56Sink.get(), "page 5 should not have been fetched yet"))
|
||||
.then(() -> page4Sink.get().next(pages.pop()).complete())
|
||||
.then(page56Publisher::assertWasNotRequested)
|
||||
.then(() -> page4Publisher.emit(pages.pop()))
|
||||
.expectNextCount(halfPage) // page 3.5 produced
|
||||
.then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(1))).evalsha(any(),
|
||||
any(), any(), any()))
|
||||
.then(() -> assertNotNull(page56Sink.get(), "page 5 should have been fetched"))
|
||||
.then(page56Publisher::assertWasSubscribed)
|
||||
|
||||
.thenRequest(page) // page 4.5 requested
|
||||
.expectNextCount(halfPage) // page 4 produced
|
||||
|
||||
.thenRequest(page * 4) // request more demand than we will ultimately satisfy
|
||||
|
||||
.then(() -> page56Sink.get().next(pages.pop()).next(pages.pop()).complete())
|
||||
.then(() -> page56Publisher.next(pages.pop()).next(pages.pop()).complete())
|
||||
.expectNextCount(page + page) // page 5 and 6 produced
|
||||
.then(() -> emptyFinalPageSink.get().complete())
|
||||
.then(emptyFinalPagePublisher::complete)
|
||||
// confirm that cache calls increased by 2: one for page 5-and-6 (we got a two-fer in next(pop()).next(pop()),
|
||||
// and one for the final, empty page
|
||||
.then(() -> verify(reactiveCommands, times(expectedReactiveCommandInvocations.addAndGet(2))).evalsha(any(),
|
||||
|
|
|
@ -44,13 +44,10 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Stream;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
|
@ -68,10 +65,10 @@ import org.whispersystems.websocket.auth.WebSocketAuthenticator.AuthenticationRe
|
|||
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
||||
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.test.StepVerifier;
|
||||
import reactor.test.publisher.TestPublisher;
|
||||
|
||||
class WebSocketConnectionTest {
|
||||
|
||||
|
@ -744,7 +741,6 @@ class WebSocketConnectionTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Disabled("This test is flaky")
|
||||
void testReactivePublisherLimitRate() {
|
||||
final UUID accountUuid = UUID.randomUUID();
|
||||
|
||||
|
@ -754,18 +750,10 @@ class WebSocketConnectionTest {
|
|||
when(account.getNumber()).thenReturn("+14152222222");
|
||||
when(account.getUuid()).thenReturn(accountUuid);
|
||||
|
||||
final int totalMessages = 10;
|
||||
final AtomicReference<FluxSink<Envelope>> sink = new AtomicReference<>();
|
||||
final int totalMessages = 1000;
|
||||
|
||||
final AtomicLong maxRequest = new AtomicLong(-1);
|
||||
final Flux<Envelope> flux = Flux.create(s -> {
|
||||
sink.set(s);
|
||||
s.onRequest(n -> {
|
||||
if (maxRequest.get() < n) {
|
||||
maxRequest.set(n);
|
||||
}
|
||||
});
|
||||
});
|
||||
final TestPublisher<Envelope> testPublisher = TestPublisher.createCold();
|
||||
final Flux<Envelope> flux = Flux.from(testPublisher);
|
||||
|
||||
when(messagesManager.getMessagesForDeviceReactive(eq(accountUuid), eq(deviceId), anyBoolean()))
|
||||
.thenReturn(flux);
|
||||
|
@ -790,16 +778,16 @@ class WebSocketConnectionTest {
|
|||
.thenRequest(totalMessages * 2)
|
||||
.then(() -> {
|
||||
for (long i = 0; i < totalMessages; i++) {
|
||||
sink.get().next(createMessage(UUID.randomUUID(), accountUuid, 1111 * i + 1, "message " + i));
|
||||
testPublisher.next(createMessage(UUID.randomUUID(), accountUuid, 1111 * i + 1, "message " + i));
|
||||
}
|
||||
sink.get().complete();
|
||||
testPublisher.complete();
|
||||
})
|
||||
.expectNextCount(totalMessages)
|
||||
.expectComplete()
|
||||
.log()
|
||||
.verify();
|
||||
|
||||
assertEquals(WebSocketConnection.MESSAGE_PUBLISHER_LIMIT_RATE, maxRequest.get());
|
||||
testPublisher.assertMaxRequested(WebSocketConnection.MESSAGE_PUBLISHER_LIMIT_RATE);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue