enforce provisioning websocket timeouts
This commit is contained in:
parent
8c2f3c839f
commit
c2e3ab832c
|
@ -61,16 +61,14 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
|
||||||
public void onWebSocketConnect(WebSocketSessionContext context) {
|
public void onWebSocketConnect(WebSocketSessionContext context) {
|
||||||
openWebSocketCounter.countOpenWebSocket(context);
|
openWebSocketCounter.countOpenWebSocket(context);
|
||||||
|
|
||||||
final Optional<ScheduledFuture<?>> maybeTimeoutFuture = context.getClient().supportsProvisioningSocketTimeouts()
|
final ScheduledFuture<?> timeoutFuture = timeoutExecutor.schedule(() ->
|
||||||
? Optional.of(timeoutExecutor.schedule(() ->
|
context.getClient().close(1000, "Timeout"), timeout.toSeconds(), TimeUnit.SECONDS);
|
||||||
context.getClient().close(1000, "Timeout"), timeout.toSeconds(), TimeUnit.SECONDS))
|
|
||||||
: Optional.empty();
|
|
||||||
|
|
||||||
final String provisioningAddress = generateProvisioningAddress();
|
final String provisioningAddress = generateProvisioningAddress();
|
||||||
|
|
||||||
context.addWebsocketClosedListener((context1, statusCode, reason) -> {
|
context.addWebsocketClosedListener((context1, statusCode, reason) -> {
|
||||||
provisioningManager.removeListener(provisioningAddress);
|
provisioningManager.removeListener(provisioningAddress);
|
||||||
maybeTimeoutFuture.ifPresent(future -> future.cancel(false));
|
timeoutFuture.cancel(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
provisioningManager.addListener(provisioningAddress, message -> {
|
provisioningManager.addListener(provisioningAddress, message -> {
|
||||||
|
|
|
@ -133,7 +133,6 @@ public class ProvisioningTimeoutIntegrationTest {
|
||||||
.thenReturn(mock(ScheduledFuture.class));
|
.thenReturn(mock(ScheduledFuture.class));
|
||||||
|
|
||||||
final ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
|
final ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
|
||||||
upgradeRequest.setHeader(WebsocketHeaders.X_SIGNAL_WEBSOCKET_TIMEOUT_HEADER, "");
|
|
||||||
try (Session ignored = client.connect(testWebsocketListener,
|
try (Session ignored = client.connect(testWebsocketListener,
|
||||||
URI.create(String.format("ws://127.0.0.1:%d/websocket", DROPWIZARD_APP_EXTENSION.getLocalPort())),
|
URI.create(String.format("ws://127.0.0.1:%d/websocket", DROPWIZARD_APP_EXTENSION.getLocalPort())),
|
||||||
upgradeRequest).join()) {
|
upgradeRequest).join()) {
|
||||||
|
@ -151,22 +150,6 @@ public class ProvisioningTimeoutIntegrationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void websocketTimeoutNoHeader() throws IOException {
|
|
||||||
final TestProvisioningListener testWebsocketListener = new TestProvisioningListener();
|
|
||||||
|
|
||||||
final ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
|
|
||||||
try (Session ignored = client.connect(testWebsocketListener,
|
|
||||||
URI.create(String.format("ws://127.0.0.1:%d/websocket", DROPWIZARD_APP_EXTENSION.getLocalPort())),
|
|
||||||
upgradeRequest).join()) {
|
|
||||||
assertThat(testWebsocketListener.closeFuture()).isNotDone();
|
|
||||||
|
|
||||||
final TestApplication testApplication = DROPWIZARD_APP_EXTENSION.getApplication();
|
|
||||||
verify(testApplication.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
|
|
||||||
assertThat(testWebsocketListener.closeFuture()).isNotDone();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void websocketTimeoutCancelled() throws IOException {
|
public void websocketTimeoutCancelled() throws IOException {
|
||||||
final TestProvisioningListener testWebsocketListener = new TestProvisioningListener();
|
final TestProvisioningListener testWebsocketListener = new TestProvisioningListener();
|
||||||
|
@ -176,7 +159,6 @@ public class ProvisioningTimeoutIntegrationTest {
|
||||||
doReturn(scheduled).when(testApplication.scheduler).schedule(any(Runnable.class), anyLong(), any());
|
doReturn(scheduled).when(testApplication.scheduler).schedule(any(Runnable.class), anyLong(), any());
|
||||||
|
|
||||||
final ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
|
final ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
|
||||||
upgradeRequest.setHeader(WebsocketHeaders.X_SIGNAL_WEBSOCKET_TIMEOUT_HEADER, "");
|
|
||||||
final Session session = client.connect(testWebsocketListener,
|
final Session session = client.connect(testWebsocketListener,
|
||||||
URI.create(String.format("ws://127.0.0.1:%d/websocket", DROPWIZARD_APP_EXTENSION.getLocalPort())),
|
URI.create(String.format("ws://127.0.0.1:%d/websocket", DROPWIZARD_APP_EXTENSION.getLocalPort())),
|
||||||
upgradeRequest).join();
|
upgradeRequest).join();
|
||||||
|
|
|
@ -47,6 +47,8 @@ class ProvisioningConnectListenerTest {
|
||||||
void onWebSocketConnect() {
|
void onWebSocketConnect() {
|
||||||
final WebSocketClient webSocketClient = mock(WebSocketClient.class);
|
final WebSocketClient webSocketClient = mock(WebSocketClient.class);
|
||||||
final WebSocketSessionContext context = new WebSocketSessionContext(webSocketClient);
|
final WebSocketSessionContext context = new WebSocketSessionContext(webSocketClient);
|
||||||
|
final ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
|
||||||
|
doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any());
|
||||||
|
|
||||||
provisioningConnectListener.onWebSocketConnect(context);
|
provisioningConnectListener.onWebSocketConnect(context);
|
||||||
context.notifyClosed(1000, "Test");
|
context.notifyClosed(1000, "Test");
|
||||||
|
@ -81,7 +83,6 @@ class ProvisioningConnectListenerTest {
|
||||||
final WebSocketClient webSocketClient = mock(WebSocketClient.class);
|
final WebSocketClient webSocketClient = mock(WebSocketClient.class);
|
||||||
final WebSocketSessionContext context = new WebSocketSessionContext(webSocketClient);
|
final WebSocketSessionContext context = new WebSocketSessionContext(webSocketClient);
|
||||||
|
|
||||||
when(webSocketClient.supportsProvisioningSocketTimeouts()).thenReturn(true);
|
|
||||||
final ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
|
final ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
|
||||||
doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any());
|
doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any());
|
||||||
|
|
||||||
|
@ -99,7 +100,6 @@ class ProvisioningConnectListenerTest {
|
||||||
final WebSocketClient webSocketClient = mock(WebSocketClient.class);
|
final WebSocketClient webSocketClient = mock(WebSocketClient.class);
|
||||||
final WebSocketSessionContext context = new WebSocketSessionContext(webSocketClient);
|
final WebSocketSessionContext context = new WebSocketSessionContext(webSocketClient);
|
||||||
|
|
||||||
when(webSocketClient.supportsProvisioningSocketTimeouts()).thenReturn(true);
|
|
||||||
final ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
|
final ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
|
||||||
doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any());
|
doReturn(scheduledFuture).when(scheduledExecutorService).schedule(any(Runnable.class), anyLong(), any());
|
||||||
|
|
||||||
|
@ -111,13 +111,4 @@ class ProvisioningConnectListenerTest {
|
||||||
verify(scheduledFuture).cancel(false);
|
verify(scheduledFuture).cancel(false);
|
||||||
verify(webSocketClient, never()).close(anyInt(), any());
|
verify(webSocketClient, never()).close(anyInt(), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
void skipsTimeoutIfUnsupported() {
|
|
||||||
final WebSocketClient webSocketClient = mock(WebSocketClient.class);
|
|
||||||
final WebSocketSessionContext context = new WebSocketSessionContext(webSocketClient);
|
|
||||||
provisioningConnectListener.onWebSocketConnect(context);
|
|
||||||
verify(scheduledExecutorService, never())
|
|
||||||
.schedule(any(Runnable.class), eq(TIMEOUT.getSeconds()), eq(TimeUnit.SECONDS));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,10 +102,6 @@ public class WebSocketClient {
|
||||||
return WebsocketHeaders.parseReceiveStoriesHeader(value);
|
return WebsocketHeaders.parseReceiveStoriesHeader(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean supportsProvisioningSocketTimeouts() {
|
|
||||||
return session.getUpgradeRequest().getHeader(WebsocketHeaders.X_SIGNAL_WEBSOCKET_TIMEOUT_HEADER) != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private long generateRequestId() {
|
private long generateRequestId() {
|
||||||
return Math.abs(SECURE_RANDOM.nextLong());
|
return Math.abs(SECURE_RANDOM.nextLong());
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ package org.whispersystems.websocket;
|
||||||
*/
|
*/
|
||||||
public class WebsocketHeaders {
|
public class WebsocketHeaders {
|
||||||
public final static String X_SIGNAL_RECEIVE_STORIES = "X-Signal-Receive-Stories";
|
public final static String X_SIGNAL_RECEIVE_STORIES = "X-Signal-Receive-Stories";
|
||||||
public static final String X_SIGNAL_WEBSOCKET_TIMEOUT_HEADER = "X-Signal-Websocket-Timeout";
|
|
||||||
|
|
||||||
public static boolean parseReceiveStoriesHeader(String s) {
|
public static boolean parseReceiveStoriesHeader(String s) {
|
||||||
return "true".equals(s);
|
return "true".equals(s);
|
||||||
|
|
Loading…
Reference in New Issue