Retry attempts to get messages after a delay; close connections after a finite number of retries.

This commit is contained in:
Jon Chambers 2021-03-18 19:06:07 -04:00 committed by Jon Chambers
parent 7509520883
commit 089b6b1644
5 changed files with 126 additions and 39 deletions

View File

@ -315,6 +315,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(2).build(); ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(2).build();
ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build(); ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build();
ScheduledExecutorService retrySchedulingExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "retry-%d")).threads(2).build();
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build(); ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build();
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build(); ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build();
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build(); ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
@ -449,7 +450,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
/// ///
WebSocketEnvironment<Account> webSocketEnvironment = new WebSocketEnvironment<>(environment, config.getWebSocketConfiguration(), 90000); WebSocketEnvironment<Account> webSocketEnvironment = new WebSocketEnvironment<>(environment, config.getWebSocketConfiguration(), 90000);
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator)); webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator));
webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(receiptSender, messagesManager, messageSender, apnFallbackManager, clientPresenceManager)); webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(receiptSender, messagesManager, messageSender, apnFallbackManager, clientPresenceManager, retrySchedulingExecutor));
webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET)); webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
webSocketEnvironment.jersey().register(new KeepAliveController(clientPresenceManager)); webSocketEnvironment.jersey().register(new KeepAliveController(clientPresenceManager));
webSocketEnvironment.jersey().register(messageController); webSocketEnvironment.jersey().register(messageController);

View File

@ -23,6 +23,8 @@ import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.websocket.session.WebSocketSessionContext; import org.whispersystems.websocket.session.WebSocketSessionContext;
import org.whispersystems.websocket.setup.WebSocketConnectListener; import org.whispersystems.websocket.setup.WebSocketConnectListener;
import java.util.concurrent.ScheduledExecutorService;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
public class AuthenticatedConnectListener implements WebSocketConnectListener { public class AuthenticatedConnectListener implements WebSocketConnectListener {
@ -39,17 +41,20 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final MessageSender messageSender; private final MessageSender messageSender;
private final ApnFallbackManager apnFallbackManager; private final ApnFallbackManager apnFallbackManager;
private final ClientPresenceManager clientPresenceManager; private final ClientPresenceManager clientPresenceManager;
private final ScheduledExecutorService retrySchedulingExecutor;
public AuthenticatedConnectListener(ReceiptSender receiptSender, public AuthenticatedConnectListener(ReceiptSender receiptSender,
MessagesManager messagesManager, MessagesManager messagesManager,
final MessageSender messageSender, ApnFallbackManager apnFallbackManager, final MessageSender messageSender, ApnFallbackManager apnFallbackManager,
ClientPresenceManager clientPresenceManager) ClientPresenceManager clientPresenceManager,
ScheduledExecutorService retrySchedulingExecutor)
{ {
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.messageSender = messageSender; this.messageSender = messageSender;
this.apnFallbackManager = apnFallbackManager; this.apnFallbackManager = apnFallbackManager;
this.clientPresenceManager = clientPresenceManager; this.clientPresenceManager = clientPresenceManager;
this.retrySchedulingExecutor = retrySchedulingExecutor;
} }
@Override @Override
@ -60,7 +65,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
final Timer.Context timer = durationTimer.time(); final Timer.Context timer = durationTimer.time();
final WebSocketConnection connection = new WebSocketConnection(receiptSender, final WebSocketConnection connection = new WebSocketConnection(receiptSender,
messagesManager, account, device, messagesManager, account, device,
context.getClient()); context.getClient(),
retrySchedulingExecutor);
openWebsocketCounter.inc(); openWebsocketCounter.inc();
RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, device)); RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, device));
@ -71,6 +77,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
openWebsocketCounter.dec(); openWebsocketCounter.dec();
timer.stop(); timer.stop();
connection.stop();
RedisOperation.unchecked(() -> clientPresenceManager.clearPresence(account.getUuid(), device.getId())); RedisOperation.unchecked(() -> clientPresenceManager.clearPresence(account.getUuid(), device.getId()));
RedisOperation.unchecked(() -> { RedisOperation.unchecked(() -> {
messagesManager.removeMessageAvailabilityListener(connection); messagesManager.removeMessageAvailabilityListener(connection);

View File

@ -20,11 +20,15 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
@ -79,6 +83,11 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
@VisibleForTesting @VisibleForTesting
static final int MAX_DESKTOP_MESSAGE_SIZE = 1024 * 1024; static final int MAX_DESKTOP_MESSAGE_SIZE = 1024 * 1024;
@VisibleForTesting
static final int MAX_CONSECUTIVE_RETRIES = 5;
private static final long RETRY_DELAY_MILLIS = 1_000;
private static final int RETRY_DELAY_JITTER_MILLIS = 500;
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class); private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final ReceiptSender receiptSender; private final ReceiptSender receiptSender;
@ -87,6 +96,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final Account account; private final Account account;
private final Device device; private final Device device;
private final WebSocketClient client; private final WebSocketClient client;
private final ScheduledExecutorService retrySchedulingExecutor;
private final boolean isDesktopClient; private final boolean isDesktopClient;
@ -95,6 +105,10 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false); private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
private final LongAdder sentMessageCounter = new LongAdder(); private final LongAdder sentMessageCounter = new LongAdder();
private final AtomicLong queueDrainStartTime = new AtomicLong(); private final AtomicLong queueDrainStartTime = new AtomicLong();
private final AtomicInteger consecutiveRetries = new AtomicInteger();
private final AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
private final Random random = new Random();
private enum StoredMessageState { private enum StoredMessageState {
EMPTY, EMPTY,
@ -103,16 +117,18 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
} }
public WebSocketConnection(ReceiptSender receiptSender, public WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager, MessagesManager messagesManager,
Account account, Account account,
Device device, Device device,
WebSocketClient client) WebSocketClient client,
ScheduledExecutorService retrySchedulingExecutor)
{ {
this.receiptSender = receiptSender; this.receiptSender = receiptSender;
this.messagesManager = messagesManager; this.messagesManager = messagesManager;
this.account = account; this.account = account;
this.device = device; this.device = device;
this.client = client; this.client = client;
this.retrySchedulingExecutor = retrySchedulingExecutor;
Optional<ClientPlatform> maybePlatform; Optional<ClientPlatform> maybePlatform;
@ -131,6 +147,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
} }
public void stop() { public void stop() {
final ScheduledFuture<?> future = retryFuture.get();
if (future != null) {
future.cancel(false);
}
client.close(1000, "OK"); client.close(1000, "OK");
} }
@ -203,24 +225,40 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture); sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture);
queueClearedFuture.whenComplete((v, cause) -> { queueClearedFuture.whenComplete((v, cause) -> {
if (cause == null && sentInitialQueueEmptyMessage.compareAndSet(false, true)) { if (cause == null) {
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent())); consecutiveRetries.set(0);
final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum()); if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS); final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()));
final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
if (drainDuration > SLOW_DRAIN_THRESHOLD) { Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum());
Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment(); Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS);
if (drainDuration > SLOW_DRAIN_THRESHOLD) {
Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment();
}
client.sendRequest("PUT", "/api/v1/queue/empty",
Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
} }
} else {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); storedMessageState.compareAndSet(StoredMessageState.EMPTY, state);
} }
processStoredMessagesSemaphore.release(); processStoredMessagesSemaphore.release();
if (cause == null && storedMessageState.get() != StoredMessageState.EMPTY) { if (cause == null) {
processStoredMessages(); if (storedMessageState.get() != StoredMessageState.EMPTY) {
processStoredMessages();
}
} else {
if (consecutiveRetries.incrementAndGet() > MAX_CONSECUTIVE_RETRIES) {
client.close(1011, "Failed to retrieve messages");
} else {
final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS);
retryFuture.set(retrySchedulingExecutor.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS));
}
} }
}); });
} }

View File

@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
@ -62,12 +63,9 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
private Device device; private Device device;
private WebSocketClient webSocketClient; private WebSocketClient webSocketClient;
private WebSocketConnection webSocketConnection; private WebSocketConnection webSocketConnection;
private ScheduledExecutorService retrySchedulingExecutor;
private long serialTimestamp = System.currentTimeMillis(); private long serialTimestamp = System.currentTimeMillis();
@Before
public void setupAccountsDao() {
}
@Before @Before
@Override @Override
@ -80,6 +78,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
account = mock(Account.class); account = mock(Account.class);
device = mock(Device.class); device = mock(Device.class);
webSocketClient = mock(WebSocketClient.class); webSocketClient = mock(WebSocketClient.class);
retrySchedulingExecutor = Executors.newSingleThreadScheduledExecutor();
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID()); when(account.getUuid()).thenReturn(UUID.randomUUID());
@ -90,7 +89,8 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class)), new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class)),
account, account,
device, device,
webSocketClient); webSocketClient,
retrySchedulingExecutor);
} }
@After @After
@ -99,6 +99,9 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
executorService.shutdown(); executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.SECONDS); executorService.awaitTermination(2, TimeUnit.SECONDS);
retrySchedulingExecutor.shutdown();
retrySchedulingExecutor.awaitTermination(2, TimeUnit.SECONDS);
super.tearDown(); super.tearDown();
} }

View File

@ -35,8 +35,11 @@ import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.lettuce.core.RedisException;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.junit.Before; import org.junit.Before;
@ -75,6 +78,7 @@ public class WebSocketConnectionTest {
private UpgradeRequest upgradeRequest; private UpgradeRequest upgradeRequest;
private ReceiptSender receiptSender; private ReceiptSender receiptSender;
private ApnFallbackManager apnFallbackManager; private ApnFallbackManager apnFallbackManager;
private ScheduledExecutorService retrySchedulingExecutor;
@Before @Before
public void setup() { public void setup() {
@ -85,13 +89,15 @@ public class WebSocketConnectionTest {
upgradeRequest = mock(UpgradeRequest.class); upgradeRequest = mock(UpgradeRequest.class);
receiptSender = mock(ReceiptSender.class); receiptSender = mock(ReceiptSender.class);
apnFallbackManager = mock(ApnFallbackManager.class); apnFallbackManager = mock(ApnFallbackManager.class);
retrySchedulingExecutor = mock(ScheduledExecutorService.class);
} }
@Test @Test
public void testCredentials() throws Exception { public void testCredentials() throws Exception {
MessagesManager storedMessages = mock(MessagesManager.class); MessagesManager storedMessages = mock(MessagesManager.class);
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, storedMessages, mock(MessageSender.class), apnFallbackManager, mock(ClientPresenceManager.class)); AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(receiptSender, storedMessages, mock(MessageSender.class), apnFallbackManager, mock(ClientPresenceManager.class),
retrySchedulingExecutor);
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
@ -178,7 +184,7 @@ public class WebSocketConnectionTest {
}); });
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages,
account, device, client); account, device, client, retrySchedulingExecutor);
connection.start(); connection.start();
verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()); verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any());
@ -203,7 +209,7 @@ public class WebSocketConnectionTest {
public void testOnlineSend() throws Exception { public void testOnlineSend() throws Exception {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, retrySchedulingExecutor);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -330,7 +336,7 @@ public class WebSocketConnectionTest {
}); });
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages,
account, device, client); account, device, client, retrySchedulingExecutor);
connection.start(); connection.start();
@ -353,7 +359,7 @@ public class WebSocketConnectionTest {
public void testProcessStoredMessageConcurrency() throws InterruptedException { public void testProcessStoredMessageConcurrency() throws InterruptedException {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, retrySchedulingExecutor);
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID()); when(account.getUuid()).thenReturn(UUID.randomUUID());
@ -414,7 +420,7 @@ public class WebSocketConnectionTest {
public void testProcessStoredMessagesMultiplePages() throws InterruptedException { public void testProcessStoredMessagesMultiplePages() throws InterruptedException {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, retrySchedulingExecutor);
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID()); when(account.getUuid()).thenReturn(UUID.randomUUID());
@ -457,7 +463,7 @@ public class WebSocketConnectionTest {
public void testProcessStoredMessagesContainsSenderUuid() throws InterruptedException { public void testProcessStoredMessagesContainsSenderUuid() throws InterruptedException {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, retrySchedulingExecutor);
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID()); when(account.getUuid()).thenReturn(UUID.randomUUID());
@ -507,7 +513,7 @@ public class WebSocketConnectionTest {
public void testProcessStoredMessagesSingleEmptyCall() { public void testProcessStoredMessagesSingleEmptyCall() {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, retrySchedulingExecutor);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -536,7 +542,7 @@ public class WebSocketConnectionTest {
public void testRequeryOnStateMismatch() throws InterruptedException { public void testRequeryOnStateMismatch() throws InterruptedException {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, retrySchedulingExecutor);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
@ -583,7 +589,7 @@ public class WebSocketConnectionTest {
public void testProcessCachedMessagesOnly() { public void testProcessCachedMessagesOnly() {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, retrySchedulingExecutor);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -615,7 +621,7 @@ public class WebSocketConnectionTest {
public void testProcessDatabaseMessagesAfterPersist() { public void testProcessDatabaseMessagesAfterPersist() {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, account, device, client, retrySchedulingExecutor);
final UUID accountUuid = UUID.randomUUID(); final UUID accountUuid = UUID.randomUUID();
@ -693,7 +699,7 @@ public class WebSocketConnectionTest {
} }
}); });
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, account, device, client); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, account, device, client, retrySchedulingExecutor);
connection.start(); connection.start();
verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()); verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any());
@ -766,7 +772,7 @@ public class WebSocketConnectionTest {
} }
}); });
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, account, device, client); WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, account, device, client, retrySchedulingExecutor);
connection.start(); connection.start();
verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any()); verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), ArgumentMatchers.nullable(List.class), ArgumentMatchers.<Optional<byte[]>>any());
@ -785,6 +791,37 @@ public class WebSocketConnectionTest {
verify(client).close(anyInt(), anyString()); verify(client).close(anyInt(), anyString());
} }
@Test
public void testRetrieveMessageException() {
MessagesManager storedMessages = mock(MessagesManager.class);
UUID accountUuid = UUID.randomUUID();
when(device.getId()).thenReturn(2L);
when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device));
when(account.getNumber()).thenReturn("+14152222222");
when(account.getUuid()).thenReturn(accountUuid);
String userAgent = "Signal-Android/4.68.3";
when(storedMessages.getMessagesForDevice(account.getUuid(), device.getId(), userAgent, false))
.thenThrow(new RedisException("OH NO"));
when(retrySchedulingExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer((Answer<ScheduledFuture<?>>) invocation -> {
invocation.getArgument(0, Runnable.class).run();
return mock(ScheduledFuture.class);
});
final WebSocketClient client = mock(WebSocketClient.class);
WebSocketConnection connection = new WebSocketConnection(receiptSender, storedMessages, account, device, client, retrySchedulingExecutor);
connection.start();
verify(retrySchedulingExecutor, times(WebSocketConnection.MAX_CONSECUTIVE_RETRIES)).schedule(any(Runnable.class), anyLong(), any());
verify(client).close(eq(1011), anyString());
}
private OutgoingMessageEntity createMessage(long id, boolean cached, String sender, UUID senderUuid, long timestamp, boolean receipt, String content) { private OutgoingMessageEntity createMessage(long id, boolean cached, String sender, UUID senderUuid, long timestamp, boolean receipt, String content) {
return new OutgoingMessageEntity(id, cached, UUID.randomUUID(), receipt ? Envelope.Type.RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE, return new OutgoingMessageEntity(id, cached, UUID.randomUUID(), receipt ? Envelope.Type.RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE,
null, timestamp, sender, senderUuid, 1, content.getBytes(), null, 0); null, timestamp, sender, senderUuid, 1, content.getBytes(), null, 0);