Timeout `sendNextMessagePage` after 5 minutes

This commit is contained in:
Chris Eager 2021-12-08 15:22:38 -08:00 committed by Chris Eager
parent 278b4e810d
commit 3e777df86c
2 changed files with 42 additions and 11 deletions

View File

@ -88,6 +88,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private static final long RETRY_DELAY_MILLIS = 1_000;
private static final int RETRY_DELAY_JITTER_MILLIS = 500;
private static final int DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS = 5 * 60 * 1000;
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final ReceiptSender receiptSender;
@ -96,6 +98,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final AuthenticatedAccount auth;
private final Device device;
private final WebSocketClient client;
private final int sendFuturesTimeoutMillis;
private final ScheduledExecutorService retrySchedulingExecutor;
private final boolean isDesktopClient;
@ -123,11 +128,31 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
Device device,
WebSocketClient client,
ScheduledExecutorService retrySchedulingExecutor) {
this(receiptSender,
messagesManager,
auth,
device,
client,
DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS,
retrySchedulingExecutor);
}
@VisibleForTesting
WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager,
AuthenticatedAccount auth,
Device device,
WebSocketClient client,
int sendFuturesTimeoutMillis,
ScheduledExecutorService retrySchedulingExecutor) {
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.auth = auth;
this.device = device;
this.client = client;
this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis;
this.retrySchedulingExecutor = retrySchedulingExecutor;
Optional<ClientPlatform> maybePlatform;
@ -329,17 +354,20 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
}
}
CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
if (cause == null) {
if (messages.hasMore()) {
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
} else {
queueClearedFuture.complete(null);
}
} else {
queueClearedFuture.completeExceptionally(cause);
}
});
// Set a large, non-zero timeout, to prevent any failure to acknowledge receipt from blocking indefinitely
CompletableFuture.allOf(sendFutures)
.orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)
.whenComplete((v, cause) -> {
if (cause == null) {
if (messages.hasMore()) {
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
} else {
queueClearedFuture.complete(null);
}
} else {
queueClearedFuture.completeExceptionally(cause);
}
});
} catch (final Exception e) {
queueClearedFuture.completeExceptionally(e);
}

View File

@ -66,6 +66,8 @@ class WebSocketConnectionIntegrationTest {
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private static final int SEND_FUTURES_TIMEOUT_MILLIS = 100;
private ExecutorService executorService;
private MessagesDynamoDb messagesDynamoDb;
private MessagesCache messagesCache;
@ -102,6 +104,7 @@ class WebSocketConnectionIntegrationTest {
new AuthenticatedAccount(() -> new Pair<>(account, device)),
device,
webSocketClient,
SEND_FUTURES_TIMEOUT_MILLIS,
retrySchedulingExecutor);
}