Count open provisioning WebSockets
This commit is contained in:
parent
764b200289
commit
581e61a85b
|
@ -6,19 +6,30 @@
|
||||||
package org.whispersystems.textsecuregcm.websocket;
|
package org.whispersystems.textsecuregcm.websocket;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import io.micrometer.core.instrument.Tags;
|
||||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
|
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||||
import org.whispersystems.textsecuregcm.entities.ProvisioningMessage;
|
import org.whispersystems.textsecuregcm.entities.ProvisioningMessage;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||||
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
|
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
||||||
import org.whispersystems.textsecuregcm.util.HeaderUtils;
|
import org.whispersystems.textsecuregcm.util.HeaderUtils;
|
||||||
|
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
|
||||||
|
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
|
||||||
|
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
|
||||||
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
||||||
import org.whispersystems.websocket.setup.WebSocketConnectListener;
|
import org.whispersystems.websocket.setup.WebSocketConnectListener;
|
||||||
import java.security.SecureRandom;
|
import java.security.SecureRandom;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
|
import java.util.EnumMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A "provisioning WebSocket" provides a mechanism for sending a caller-defined provisioning message from the primary
|
* A "provisioning WebSocket" provides a mechanism for sending a caller-defined provisioning message from the primary
|
||||||
|
@ -38,8 +49,25 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
|
||||||
|
|
||||||
private final ProvisioningManager provisioningManager;
|
private final ProvisioningManager provisioningManager;
|
||||||
|
|
||||||
|
private final Map<ClientPlatform, AtomicInteger> openWebsocketsByClientPlatform;
|
||||||
|
private final AtomicInteger openWebsocketsFromUnknownPlatforms;
|
||||||
|
|
||||||
|
private static final String OPEN_WEBSOCKET_GAUGE_NAME = name(ProvisioningConnectListener.class, "openWebsockets");
|
||||||
|
|
||||||
public ProvisioningConnectListener(final ProvisioningManager provisioningManager) {
|
public ProvisioningConnectListener(final ProvisioningManager provisioningManager) {
|
||||||
this.provisioningManager = provisioningManager;
|
this.provisioningManager = provisioningManager;
|
||||||
|
|
||||||
|
openWebsocketsByClientPlatform = new EnumMap<>(ClientPlatform.class);
|
||||||
|
|
||||||
|
Arrays.stream(ClientPlatform.values())
|
||||||
|
.forEach(clientPlatform -> openWebsocketsByClientPlatform.put(clientPlatform,
|
||||||
|
Metrics.gauge(OPEN_WEBSOCKET_GAUGE_NAME,
|
||||||
|
Tags.of(UserAgentTagUtil.PLATFORM_TAG, clientPlatform.name().toLowerCase()),
|
||||||
|
new AtomicInteger(0))));
|
||||||
|
|
||||||
|
openWebsocketsFromUnknownPlatforms = Metrics.gauge(OPEN_WEBSOCKET_GAUGE_NAME,
|
||||||
|
Tags.of(UserAgentTagUtil.PLATFORM_TAG, "unrecognized"),
|
||||||
|
new AtomicInteger(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -47,6 +75,11 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
|
||||||
final String provisioningAddress = generateProvisioningAddress();
|
final String provisioningAddress = generateProvisioningAddress();
|
||||||
context.addWebsocketClosedListener((context1, statusCode, reason) -> provisioningManager.removeListener(provisioningAddress));
|
context.addWebsocketClosedListener((context1, statusCode, reason) -> provisioningManager.removeListener(provisioningAddress));
|
||||||
|
|
||||||
|
getOpenWebsocketCounter(context.getClient().getUserAgent()).incrementAndGet();
|
||||||
|
|
||||||
|
context.addWebsocketClosedListener((context1, statusCode, reason) ->
|
||||||
|
getOpenWebsocketCounter(context.getClient().getUserAgent()).decrementAndGet());
|
||||||
|
|
||||||
provisioningManager.addListener(provisioningAddress, message -> {
|
provisioningManager.addListener(provisioningAddress, message -> {
|
||||||
assert message.getType() == PubSubProtos.PubSubMessage.Type.DELIVER;
|
assert message.getType() == PubSubProtos.PubSubMessage.Type.DELIVER;
|
||||||
|
|
||||||
|
@ -69,4 +102,12 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
|
||||||
|
|
||||||
return Base64.getUrlEncoder().encodeToString(provisioningAddress);
|
return Base64.getUrlEncoder().encodeToString(provisioningAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private AtomicInteger getOpenWebsocketCounter(final String userAgentString) {
|
||||||
|
try {
|
||||||
|
return openWebsocketsByClientPlatform.get(UserAgentUtil.parseUserAgentString(userAgentString).getPlatform());
|
||||||
|
} catch (final UnrecognizedUserAgentException e) {
|
||||||
|
return openWebsocketsFromUnknownPlatforms;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue