diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 4b153fd50..88c76ed2c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -88,6 +88,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final long RETRY_DELAY_MILLIS = 1_000; private static final int RETRY_DELAY_JITTER_MILLIS = 500; + private static final int DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS = 5 * 60 * 1000; + private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class); private final ReceiptSender receiptSender; @@ -96,6 +98,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private final AuthenticatedAccount auth; private final Device device; private final WebSocketClient client; + + private final int sendFuturesTimeoutMillis; + private final ScheduledExecutorService retrySchedulingExecutor; private final boolean isDesktopClient; @@ -123,11 +128,31 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac Device device, WebSocketClient client, ScheduledExecutorService retrySchedulingExecutor) { + + this(receiptSender, + messagesManager, + auth, + device, + client, + DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS, + retrySchedulingExecutor); + } + + @VisibleForTesting + WebSocketConnection(ReceiptSender receiptSender, + MessagesManager messagesManager, + AuthenticatedAccount auth, + Device device, + WebSocketClient client, + int sendFuturesTimeoutMillis, + ScheduledExecutorService retrySchedulingExecutor) { + this.receiptSender = receiptSender; this.messagesManager = messagesManager; this.auth = auth; this.device = device; this.client = client; + this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis; this.retrySchedulingExecutor = retrySchedulingExecutor; Optional maybePlatform; @@ -329,17 +354,20 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac } } - CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> { - if (cause == null) { - if (messages.hasMore()) { - sendNextMessagePage(cachedMessagesOnly, queueClearedFuture); - } else { - queueClearedFuture.complete(null); - } - } else { - queueClearedFuture.completeExceptionally(cause); - } - }); + // Set a large, non-zero timeout, to prevent any failure to acknowledge receipt from blocking indefinitely + CompletableFuture.allOf(sendFutures) + .orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS) + .whenComplete((v, cause) -> { + if (cause == null) { + if (messages.hasMore()) { + sendNextMessagePage(cachedMessagesOnly, queueClearedFuture); + } else { + queueClearedFuture.complete(null); + } + } else { + queueClearedFuture.completeExceptionally(cause); + } + }); } catch (final Exception e) { queueClearedFuture.completeExceptionally(e); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index a505c5fbe..08b5de92a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -66,6 +66,8 @@ class WebSocketConnectionIntegrationTest { @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); + private static final int SEND_FUTURES_TIMEOUT_MILLIS = 100; + private ExecutorService executorService; private MessagesDynamoDb messagesDynamoDb; private MessagesCache messagesCache; @@ -102,6 +104,7 @@ class WebSocketConnectionIntegrationTest { new AuthenticatedAccount(() -> new Pair<>(account, device)), device, webSocketClient, + SEND_FUTURES_TIMEOUT_MILLIS, retrySchedulingExecutor); }