Generalize the "watch for websockets that need to be refreshed" listener
This commit is contained in:
parent
41735ed40e
commit
49ccbba2e3
|
@ -62,7 +62,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.dispatch.DispatchManager;
|
import org.whispersystems.dispatch.DispatchManager;
|
||||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||||
import org.whispersystems.textsecuregcm.auth.AuthEnablementApplicationEventListener;
|
import org.whispersystems.textsecuregcm.auth.WebsocketRefreshApplicationEventListener;
|
||||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
||||||
import org.whispersystems.textsecuregcm.auth.CertificateGenerator;
|
import org.whispersystems.textsecuregcm.auth.CertificateGenerator;
|
||||||
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccountAuthenticator;
|
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccountAuthenticator;
|
||||||
|
@ -614,7 +614,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
DisabledPermittedAuthenticatedAccount.class, disabledPermittedAccountAuthFilter)));
|
DisabledPermittedAuthenticatedAccount.class, disabledPermittedAccountAuthFilter)));
|
||||||
environment.jersey().register(new PolymorphicAuthValueFactoryProvider.Binder<>(
|
environment.jersey().register(new PolymorphicAuthValueFactoryProvider.Binder<>(
|
||||||
ImmutableSet.of(AuthenticatedAccount.class, DisabledPermittedAuthenticatedAccount.class)));
|
ImmutableSet.of(AuthenticatedAccount.class, DisabledPermittedAuthenticatedAccount.class)));
|
||||||
environment.jersey().register(new AuthEnablementApplicationEventListener(clientPresenceManager));
|
environment.jersey().register(new WebsocketRefreshApplicationEventListener(clientPresenceManager));
|
||||||
environment.jersey().register(new TimestampResponseFilter());
|
environment.jersey().register(new TimestampResponseFilter());
|
||||||
environment.jersey().register(new VoiceVerificationController(config.getVoiceVerificationConfiguration().getUrl(),
|
environment.jersey().register(new VoiceVerificationController(config.getVoiceVerificationConfiguration().getUrl(),
|
||||||
config.getVoiceVerificationConfiguration().getLocales()));
|
config.getVoiceVerificationConfiguration().getLocales()));
|
||||||
|
@ -626,7 +626,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
webSocketEnvironment.setConnectListener(
|
webSocketEnvironment.setConnectListener(
|
||||||
new AuthenticatedConnectListener(receiptSender, messagesManager, messageSender, apnFallbackManager,
|
new AuthenticatedConnectListener(receiptSender, messagesManager, messageSender, apnFallbackManager,
|
||||||
clientPresenceManager, retrySchedulingExecutor));
|
clientPresenceManager, retrySchedulingExecutor));
|
||||||
webSocketEnvironment.jersey().register(new AuthEnablementApplicationEventListener(clientPresenceManager));
|
webSocketEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(clientPresenceManager));
|
||||||
webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET));
|
webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET));
|
||||||
webSocketEnvironment.jersey().register(MultiRecipientMessageProvider.class);
|
webSocketEnvironment.jersey().register(MultiRecipientMessageProvider.class);
|
||||||
webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
|
webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
|
||||||
|
@ -668,7 +668,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
|
|
||||||
WebSocketEnvironment<AuthenticatedAccount> provisioningEnvironment = new WebSocketEnvironment<>(environment,
|
WebSocketEnvironment<AuthenticatedAccount> provisioningEnvironment = new WebSocketEnvironment<>(environment,
|
||||||
webSocketEnvironment.getRequestLog(), 60000);
|
webSocketEnvironment.getRequestLog(), 60000);
|
||||||
provisioningEnvironment.jersey().register(new AuthEnablementApplicationEventListener(clientPresenceManager));
|
provisioningEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(clientPresenceManager));
|
||||||
provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(pubSubManager));
|
provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(pubSubManager));
|
||||||
provisioningEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
|
provisioningEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
|
||||||
provisioningEnvironment.jersey().register(new KeepAliveController(clientPresenceManager));
|
provisioningEnvironment.jersey().register(new KeepAliveController(clientPresenceManager));
|
||||||
|
|
|
@ -0,0 +1,127 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2021 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.auth;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import javax.ws.rs.core.SecurityContext;
|
||||||
|
import org.glassfish.jersey.server.ContainerRequest;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
|
import org.whispersystems.textsecuregcm.util.Pair;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This {@link WebsocketRefreshRequirementProvider} observes intra-request changes in {@link Account#isEnabled()} and
|
||||||
|
* {@link Device#isEnabled()}.
|
||||||
|
* <p>
|
||||||
|
* If a change in {@link Account#isEnabled()} is observed, then any active WebSocket connections for the account must be
|
||||||
|
* closed, in order for clients to get a refreshed {@link io.dropwizard.auth.Auth} object.
|
||||||
|
* <p>
|
||||||
|
* If a change in {@link Device#isEnabled()} is observed, including deletion of the {@link Device}, then any active
|
||||||
|
* WebSocket connections for the device must be closed and re-authenticated.
|
||||||
|
*
|
||||||
|
* @see AuthenticatedAccount
|
||||||
|
* @see DisabledPermittedAuthenticatedAccount
|
||||||
|
*/
|
||||||
|
public class AuthEnablementRefreshRequirementProvider implements WebsocketRefreshRequirementProvider {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(AuthEnablementRefreshRequirementProvider.class);
|
||||||
|
|
||||||
|
private static final String ACCOUNT_ENABLED = AuthEnablementRefreshRequirementProvider.class.getName() + ".accountEnabled";
|
||||||
|
private static final String DEVICES_ENABLED = AuthEnablementRefreshRequirementProvider.class.getName() + ".devicesEnabled";
|
||||||
|
|
||||||
|
private Optional<Account> findAccount(final ContainerRequest containerRequest) {
|
||||||
|
return Optional.ofNullable(containerRequest.getSecurityContext())
|
||||||
|
.map(SecurityContext::getUserPrincipal)
|
||||||
|
.map(principal -> {
|
||||||
|
if (principal instanceof AccountAndAuthenticatedDeviceHolder) {
|
||||||
|
return ((AccountAndAuthenticatedDeviceHolder) principal).getAccount();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Map<Long, Boolean> buildDevicesEnabledMap(final Account account) {
|
||||||
|
return account.getDevices().stream()
|
||||||
|
.collect(() -> new HashMap<>(account.getDevices().size()),
|
||||||
|
(map, device) -> map.put(device.getId(), device.isEnabled()), HashMap::putAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleRequestStart(final ContainerRequest request) {
|
||||||
|
// The authenticated principal, if any, will be available after filters have run.
|
||||||
|
// Now that the account is known, capture a snapshot of `isEnabled` for the account and its devices,
|
||||||
|
// before carrying out the request’s business logic.
|
||||||
|
findAccount(request)
|
||||||
|
.ifPresent(
|
||||||
|
account -> {
|
||||||
|
request.setProperty(ACCOUNT_ENABLED, account.isEnabled());
|
||||||
|
request.setProperty(DEVICES_ENABLED, buildDevicesEnabledMap(account));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Pair<UUID, Long>> handleRequestFinished(final ContainerRequest request) {
|
||||||
|
// Now that the request is finished, check whether `isEnabled` changed for any of the devices, or the account
|
||||||
|
// as a whole. If the value did change, the affected device(s) must disconnect and reauthenticate.
|
||||||
|
// If a device was removed, it must also disconnect.
|
||||||
|
if (request.getProperty(ACCOUNT_ENABLED) != null &&
|
||||||
|
request.getProperty(DEVICES_ENABLED) != null) {
|
||||||
|
|
||||||
|
final boolean accountInitiallyEnabled = (boolean) request.getProperty(ACCOUNT_ENABLED);
|
||||||
|
@SuppressWarnings("unchecked") final Map<Long, Boolean> initialDevicesEnabled =
|
||||||
|
(Map<Long, Boolean>) request.getProperty(DEVICES_ENABLED);
|
||||||
|
|
||||||
|
return findAccount(request).map(account -> {
|
||||||
|
final Set<Long> deviceIdsToDisplace;
|
||||||
|
|
||||||
|
if (account.isEnabled() != accountInitiallyEnabled) {
|
||||||
|
// the @Auth for all active connections must change when account.isEnabled() changes
|
||||||
|
deviceIdsToDisplace = account.getDevices().stream()
|
||||||
|
.map(Device::getId).collect(Collectors.toSet());
|
||||||
|
|
||||||
|
deviceIdsToDisplace.addAll(initialDevicesEnabled.keySet());
|
||||||
|
|
||||||
|
} else if (!initialDevicesEnabled.isEmpty()) {
|
||||||
|
|
||||||
|
deviceIdsToDisplace = new HashSet<>();
|
||||||
|
final Map<Long, Boolean> currentDevicesEnabled = buildDevicesEnabledMap(account);
|
||||||
|
|
||||||
|
initialDevicesEnabled.forEach((deviceId, enabled) -> {
|
||||||
|
// `null` indicates the device was removed from the account. Any active presence should be removed.
|
||||||
|
final boolean enabledMatches = Objects.equals(enabled,
|
||||||
|
currentDevicesEnabled.getOrDefault(deviceId, null));
|
||||||
|
|
||||||
|
if (!enabledMatches) {
|
||||||
|
deviceIdsToDisplace.add(deviceId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
deviceIdsToDisplace = Collections.emptySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
return deviceIdsToDisplace.stream().map(deviceId -> new Pair<>(account.getUuid(), deviceId))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}).orElseGet(() -> {
|
||||||
|
logger.error("Request had account, but it is no longer present");
|
||||||
|
return Collections.emptyList();
|
||||||
|
});
|
||||||
|
} else
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,152 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.auth;
|
|
||||||
|
|
||||||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import io.micrometer.core.instrument.Counter;
|
|
||||||
import io.micrometer.core.instrument.Metrics;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import javax.ws.rs.core.SecurityContext;
|
|
||||||
import org.glassfish.jersey.server.ContainerRequest;
|
|
||||||
import org.glassfish.jersey.server.monitoring.RequestEvent;
|
|
||||||
import org.glassfish.jersey.server.monitoring.RequestEvent.Type;
|
|
||||||
import org.glassfish.jersey.server.monitoring.RequestEventListener;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This {@link RequestEventListener} observes intra-request changes in {@link Account#isEnabled()} and {@link
|
|
||||||
* Device#isEnabled()}.
|
|
||||||
* <p>
|
|
||||||
* If a change in {@link Account#isEnabled()} is observed, then any active WebSocket connections for the account must be
|
|
||||||
* closed, in order for clients to get a refreshed {@link io.dropwizard.auth.Auth} object.
|
|
||||||
* <p>
|
|
||||||
* If a change in {@link Device#isEnabled()} is observed, including deletion of the {@link Device}, then any active
|
|
||||||
* WebSocket connections for the device must be closed and re-authenticated.
|
|
||||||
*
|
|
||||||
* @see AuthenticatedAccount
|
|
||||||
* @see DisabledPermittedAuthenticatedAccount
|
|
||||||
*/
|
|
||||||
public class AuthEnablementRequestEventListener implements RequestEventListener {
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(AuthEnablementRequestEventListener.class);
|
|
||||||
|
|
||||||
private static final String ACCOUNT_ENABLED = AuthEnablementRequestEventListener.class.getName() + ".accountEnabled";
|
|
||||||
private static final String DEVICES_ENABLED = AuthEnablementRequestEventListener.class.getName() + ".devicesEnabled";
|
|
||||||
|
|
||||||
private static final Counter DISPLACED_ACCOUNTS = Metrics.counter(
|
|
||||||
name(AuthEnablementRequestEventListener.class, "displacedAccounts"));
|
|
||||||
private static final Counter DISPLACED_DEVICES = Metrics.counter(
|
|
||||||
name(AuthEnablementRequestEventListener.class, "displacedDevices"));
|
|
||||||
|
|
||||||
private final ClientPresenceManager clientPresenceManager;
|
|
||||||
|
|
||||||
public AuthEnablementRequestEventListener(final ClientPresenceManager clientPresenceManager) {
|
|
||||||
this.clientPresenceManager = clientPresenceManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onEvent(final RequestEvent event) {
|
|
||||||
|
|
||||||
if (event.getType() == Type.REQUEST_FILTERED) {
|
|
||||||
// The authenticated principal, if any, will be available after filters have run.
|
|
||||||
// Now that the account is known, capture a snapshot of `isEnabled` for the account and its devices,
|
|
||||||
// before carrying out the request’s business logic.
|
|
||||||
findAccount(event.getContainerRequest())
|
|
||||||
.ifPresent(
|
|
||||||
account -> {
|
|
||||||
event.getContainerRequest().setProperty(ACCOUNT_ENABLED, account.isEnabled());
|
|
||||||
event.getContainerRequest().setProperty(DEVICES_ENABLED, buildDevicesEnabledMap(account));
|
|
||||||
});
|
|
||||||
|
|
||||||
} else if (event.getType() == Type.FINISHED) {
|
|
||||||
// Now that the request is finished, check whether `isEnabled` changed for any of the devices, or the account
|
|
||||||
// as a whole. If the value did change, the affected device(s) must disconnect and reauthenticate.
|
|
||||||
// If a device was removed, it must also disconnect.
|
|
||||||
if (event.getContainerRequest().getProperty(ACCOUNT_ENABLED) != null &&
|
|
||||||
event.getContainerRequest().getProperty(DEVICES_ENABLED) != null) {
|
|
||||||
|
|
||||||
final boolean accountInitiallyEnabled = (boolean) event.getContainerRequest().getProperty(ACCOUNT_ENABLED);
|
|
||||||
@SuppressWarnings("unchecked") final Map<Long, Boolean> initialDevicesEnabled = (Map<Long, Boolean>) event.getContainerRequest()
|
|
||||||
.getProperty(DEVICES_ENABLED);
|
|
||||||
|
|
||||||
findAccount(event.getContainerRequest()).ifPresentOrElse(account -> {
|
|
||||||
final Set<Long> deviceIdsToDisplace;
|
|
||||||
|
|
||||||
if (account.isEnabled() != accountInitiallyEnabled) {
|
|
||||||
// the @Auth for all active connections must change when account.isEnabled() changes
|
|
||||||
deviceIdsToDisplace = account.getDevices().stream()
|
|
||||||
.map(Device::getId).collect(Collectors.toSet());
|
|
||||||
|
|
||||||
deviceIdsToDisplace.addAll(initialDevicesEnabled.keySet());
|
|
||||||
|
|
||||||
DISPLACED_ACCOUNTS.increment();
|
|
||||||
|
|
||||||
} else if (!initialDevicesEnabled.isEmpty()) {
|
|
||||||
|
|
||||||
deviceIdsToDisplace = new HashSet<>();
|
|
||||||
final Map<Long, Boolean> currentDevicesEnabled = buildDevicesEnabledMap(account);
|
|
||||||
|
|
||||||
initialDevicesEnabled.forEach((deviceId, enabled) -> {
|
|
||||||
// `null` indicates the device was removed from the account. Any active presence should be removed.
|
|
||||||
final boolean enabledMatches = Objects.equals(enabled,
|
|
||||||
currentDevicesEnabled.getOrDefault(deviceId, null));
|
|
||||||
|
|
||||||
if (!enabledMatches) {
|
|
||||||
deviceIdsToDisplace.add(deviceId);
|
|
||||||
|
|
||||||
DISPLACED_DEVICES.increment();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
deviceIdsToDisplace = Collections.emptySet();
|
|
||||||
}
|
|
||||||
|
|
||||||
deviceIdsToDisplace.forEach(deviceId -> {
|
|
||||||
try {
|
|
||||||
// displacing presence will cause a reauthorization for the device’s active connections
|
|
||||||
clientPresenceManager.displacePresence(account.getUuid(), deviceId);
|
|
||||||
} catch (final Exception e) {
|
|
||||||
logger.error("Could not displace device presence", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
},
|
|
||||||
() -> logger.error("Request had account, but it is no longer present")
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Optional<Account> findAccount(final ContainerRequest containerRequest) {
|
|
||||||
return Optional.ofNullable(containerRequest.getSecurityContext())
|
|
||||||
.map(SecurityContext::getUserPrincipal)
|
|
||||||
.map(principal -> {
|
|
||||||
if (principal instanceof AccountAndAuthenticatedDeviceHolder) {
|
|
||||||
return ((AccountAndAuthenticatedDeviceHolder) principal).getAccount();
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
Map<Long, Boolean> buildDevicesEnabledMap(final Account account) {
|
|
||||||
return account.getDevices().stream()
|
|
||||||
.collect(() -> new HashMap<>(account.getDevices().size()),
|
|
||||||
(map, device) -> map.put(device.getId(), device.isEnabled()), HashMap::putAll);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -12,14 +12,15 @@ import org.glassfish.jersey.server.monitoring.RequestEventListener;
|
||||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delegates request events to a listener that handles auth-enablement changes
|
* Delegates request events to a listener that watches for intra-request changes that require websocket refreshes
|
||||||
*/
|
*/
|
||||||
public class AuthEnablementApplicationEventListener implements ApplicationEventListener {
|
public class WebsocketRefreshApplicationEventListener implements ApplicationEventListener {
|
||||||
|
|
||||||
private final AuthEnablementRequestEventListener authEnablementRequestEventListener;
|
private final WebsocketRefreshRequestEventListener websocketRefreshRequestEventListener;
|
||||||
|
|
||||||
public AuthEnablementApplicationEventListener(final ClientPresenceManager clientPresenceManager) {
|
public WebsocketRefreshApplicationEventListener(final ClientPresenceManager clientPresenceManager) {
|
||||||
this.authEnablementRequestEventListener = new AuthEnablementRequestEventListener(clientPresenceManager);
|
this.websocketRefreshRequestEventListener = new WebsocketRefreshRequestEventListener(clientPresenceManager,
|
||||||
|
new AuthEnablementRefreshRequirementProvider());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -28,6 +29,6 @@ public class AuthEnablementApplicationEventListener implements ApplicationEventL
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RequestEventListener onRequest(final RequestEvent requestEvent) {
|
public RequestEventListener onRequest(final RequestEvent requestEvent) {
|
||||||
return authEnablementRequestEventListener;
|
return websocketRefreshRequestEventListener;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2013-2021 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.auth;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import io.micrometer.core.instrument.Counter;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import org.glassfish.jersey.server.monitoring.RequestEvent;
|
||||||
|
import org.glassfish.jersey.server.monitoring.RequestEvent.Type;
|
||||||
|
import org.glassfish.jersey.server.monitoring.RequestEventListener;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||||
|
|
||||||
|
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||||
|
|
||||||
|
public class WebsocketRefreshRequestEventListener implements RequestEventListener {
|
||||||
|
|
||||||
|
private final ClientPresenceManager clientPresenceManager;
|
||||||
|
private final WebsocketRefreshRequirementProvider[] providers;
|
||||||
|
|
||||||
|
private static final Counter DISPLACED_ACCOUNTS = Metrics.counter(
|
||||||
|
name(WebsocketRefreshRequestEventListener.class, "displacedAccounts"));
|
||||||
|
|
||||||
|
private static final Counter DISPLACED_DEVICES = Metrics.counter(
|
||||||
|
name(WebsocketRefreshRequestEventListener.class, "displacedDevices"));
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(WebsocketRefreshRequestEventListener.class);
|
||||||
|
|
||||||
|
public WebsocketRefreshRequestEventListener(
|
||||||
|
final ClientPresenceManager clientPresenceManager,
|
||||||
|
final WebsocketRefreshRequirementProvider... providers) {
|
||||||
|
|
||||||
|
this.clientPresenceManager = clientPresenceManager;
|
||||||
|
this.providers = providers;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onEvent(final RequestEvent event) {
|
||||||
|
if (event.getType() == Type.REQUEST_FILTERED) {
|
||||||
|
for (final WebsocketRefreshRequirementProvider provider : providers) {
|
||||||
|
provider.handleRequestStart(event.getContainerRequest());
|
||||||
|
}
|
||||||
|
} else if (event.getType() == Type.FINISHED) {
|
||||||
|
final AtomicInteger displacedDevices = new AtomicInteger(0);
|
||||||
|
|
||||||
|
Arrays.stream(providers)
|
||||||
|
.flatMap(provider -> provider.handleRequestFinished(event.getContainerRequest()).stream())
|
||||||
|
.distinct()
|
||||||
|
.forEach(pair -> {
|
||||||
|
try {
|
||||||
|
displacedDevices.incrementAndGet();
|
||||||
|
clientPresenceManager.displacePresence(pair.first(), pair.second());
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.error("Could not displace device presence", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (displacedDevices.get() > 0) {
|
||||||
|
DISPLACED_ACCOUNTS.increment();
|
||||||
|
DISPLACED_DEVICES.increment(displacedDevices.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2013-2021 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.auth;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.glassfish.jersey.server.ContainerRequest;
|
||||||
|
import org.whispersystems.textsecuregcm.util.Pair;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A websocket refresh requirement provider watches for intra-request changes (e.g. to authentication status) that
|
||||||
|
* require a websocket refresh.
|
||||||
|
*/
|
||||||
|
public interface WebsocketRefreshRequirementProvider {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Processes a request after filters have run and the request has been mapped to a destination controller.
|
||||||
|
*
|
||||||
|
* @param request the request to observe
|
||||||
|
*/
|
||||||
|
void handleRequestStart(ContainerRequest request);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Processes a request after all normal request handling has been completed.
|
||||||
|
*
|
||||||
|
* @param request the request to observe
|
||||||
|
* @return a list of pairs of account UUID/device ID pairs identifying websockets that need to be refreshed as a
|
||||||
|
* result of the observed request
|
||||||
|
*/
|
||||||
|
List<Pair<UUID, Long>> handleRequestFinished(ContainerRequest request);
|
||||||
|
}
|
|
@ -85,7 +85,7 @@ import org.whispersystems.websocket.messages.protobuf.SubProtocol;
|
||||||
import org.whispersystems.websocket.session.WebSocketSessionContextValueFactoryProvider;
|
import org.whispersystems.websocket.session.WebSocketSessionContextValueFactoryProvider;
|
||||||
|
|
||||||
@ExtendWith(DropwizardExtensionsSupport.class)
|
@ExtendWith(DropwizardExtensionsSupport.class)
|
||||||
class AuthEnablementRequestEventListenerTest {
|
class AuthEnablementRefreshRequirementProviderTest {
|
||||||
|
|
||||||
private final ApplicationEventListener applicationEventListener = mock(ApplicationEventListener.class);
|
private final ApplicationEventListener applicationEventListener = mock(ApplicationEventListener.class);
|
||||||
|
|
||||||
|
@ -109,12 +109,14 @@ class AuthEnablementRequestEventListenerTest {
|
||||||
|
|
||||||
private ClientPresenceManager clientPresenceManager;
|
private ClientPresenceManager clientPresenceManager;
|
||||||
|
|
||||||
private AuthEnablementRequestEventListener listener;
|
private WebsocketRefreshRequestEventListener listener;
|
||||||
|
private AuthEnablementRefreshRequirementProvider provider;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setup() {
|
void setup() {
|
||||||
clientPresenceManager = mock(ClientPresenceManager.class);
|
clientPresenceManager = mock(ClientPresenceManager.class);
|
||||||
listener = new AuthEnablementRequestEventListener(clientPresenceManager);
|
provider = new AuthEnablementRefreshRequirementProvider();
|
||||||
|
listener = new WebsocketRefreshRequestEventListener(clientPresenceManager, provider);
|
||||||
when(applicationEventListener.onRequest(any())).thenReturn(listener);
|
when(applicationEventListener.onRequest(any())).thenReturn(listener);
|
||||||
|
|
||||||
final UUID uuid = UUID.randomUUID();
|
final UUID uuid = UUID.randomUUID();
|
||||||
|
@ -146,7 +148,7 @@ class AuthEnablementRequestEventListenerTest {
|
||||||
devices.add(device);
|
devices.add(device);
|
||||||
});
|
});
|
||||||
|
|
||||||
final Map<Long, Boolean> devicesEnabled = listener.buildDevicesEnabledMap(account);
|
final Map<Long, Boolean> devicesEnabled = provider.buildDevicesEnabledMap(account);
|
||||||
|
|
||||||
assertEquals(4, devicesEnabled.size());
|
assertEquals(4, devicesEnabled.size());
|
||||||
|
|
||||||
|
@ -372,7 +374,7 @@ class AuthEnablementRequestEventListenerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("org.whispersystems.textsecuregcm.auth.AuthEnablementRequestEventListenerTest#testAccountEnabledChanged")
|
@MethodSource("org.whispersystems.textsecuregcm.auth.AuthEnablementRefreshRequirementProviderTest#testAccountEnabledChanged")
|
||||||
void testAccountEnabledChangedWebSocket(final long authenticatedDeviceId, final boolean initialEnabled,
|
void testAccountEnabledChangedWebSocket(final long authenticatedDeviceId, final boolean initialEnabled,
|
||||||
final boolean finalEnabled) throws Exception {
|
final boolean finalEnabled) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue