From 100955a7db5aaf1016134b93d4a40689d8161eaa Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 1 Oct 2024 15:31:52 -0400 Subject: [PATCH] Migrate WebSocket duration instrumentation to `OpenWebSocketCounter` --- .../metrics/OpenWebSocketCounter.java | 70 +++++++++++++++---- .../AuthenticatedConnectListener.java | 65 ++--------------- .../ProvisioningConnectListener.java | 3 +- .../websocket/WebSocketConnectionTest.java | 4 +- 4 files changed, 65 insertions(+), 77 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/OpenWebSocketCounter.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/OpenWebSocketCounter.java index dcc25d6e0..163f2e872 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/OpenWebSocketCounter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/OpenWebSocketCounter.java @@ -3,6 +3,7 @@ package org.whispersystems.textsecuregcm.metrics; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; @@ -11,6 +12,7 @@ import java.util.Arrays; import java.util.EnumMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; public class OpenWebSocketCounter { @@ -18,14 +20,17 @@ public class OpenWebSocketCounter { private final Map openWebsocketsByClientPlatform; private final AtomicInteger openWebsocketsFromUnknownPlatforms; - public OpenWebSocketCounter(final String openWebSocketGaugeName) { - this(openWebSocketGaugeName, Tags.empty()); + private final Map durationTimersByClientPlatform; + private final Timer durationTimerForUnknownPlatforms; + + public OpenWebSocketCounter(final String openWebSocketGaugeName, final String durationTimerName) { + this(openWebSocketGaugeName, durationTimerName, Tags.empty()); } - public OpenWebSocketCounter(final String openWebSocketGaugeName, final Tags tags) { + public OpenWebSocketCounter(final String openWebSocketGaugeName, final String durationTimerName, final Tags tags) { openWebsocketsByClientPlatform = Arrays.stream(ClientPlatform.values()) .collect(Collectors.toMap( - clientPlatform -> clientPlatform, + Function.identity(), clientPlatform -> buildGauge(openWebSocketGaugeName, clientPlatform.name().toLowerCase(), tags), (a, b) -> { throw new AssertionError("Duplicate client platform enumeration key"); @@ -34,6 +39,18 @@ public class OpenWebSocketCounter { )); openWebsocketsFromUnknownPlatforms = buildGauge(openWebSocketGaugeName, "unknown", tags); + + durationTimersByClientPlatform = Arrays.stream(ClientPlatform.values()) + .collect(Collectors.toMap( + clientPlatform -> clientPlatform, + clientPlatform -> buildTimer(durationTimerName, clientPlatform.name().toLowerCase(), tags), + (a, b) -> { + throw new AssertionError("Duplicate client platform enumeration key"); + }, + () -> new EnumMap<>(ClientPlatform.class) + )); + + durationTimerForUnknownPlatforms = buildTimer(durationTimerName, "unknown", tags); } private static AtomicInteger buildGauge(final String gaugeName, final String clientPlatformName, final Tags tags) { @@ -42,18 +59,45 @@ public class OpenWebSocketCounter { new AtomicInteger(0)); } + private static Timer buildTimer(final String timerName, final String clientPlatformName, final Tags tags) { + return Timer.builder(timerName) + .publishPercentileHistogram(true) + .tags(tags.and(Tag.of(UserAgentTagUtil.PLATFORM_TAG, clientPlatformName))) + .register(Metrics.globalRegistry); + } + public void countOpenWebSocket(final WebSocketSessionContext context) { - final AtomicInteger openWebSocketCounter = getOpenWebsocketCounter(context.getClient().getUserAgent()); + final Timer.Sample sample = Timer.start(); + + // We have to jump through some hoops here to have something "effectively final" for the close listener, but + // assignable from a `catch` block. + final AtomicInteger openWebSocketCounter; + final Timer durationTimer; + + { + AtomicInteger calculatedOpenWebSocketCounter; + Timer calculatedDurationTimer; + + try { + final ClientPlatform clientPlatform = + UserAgentUtil.parseUserAgentString(context.getClient().getUserAgent()).getPlatform(); + + calculatedOpenWebSocketCounter = openWebsocketsByClientPlatform.get(clientPlatform); + calculatedDurationTimer = durationTimersByClientPlatform.get(clientPlatform); + } catch (final UnrecognizedUserAgentException e) { + calculatedOpenWebSocketCounter = openWebsocketsFromUnknownPlatforms; + calculatedDurationTimer = durationTimerForUnknownPlatforms; + } + + openWebSocketCounter = calculatedOpenWebSocketCounter; + durationTimer = calculatedDurationTimer; + } openWebSocketCounter.incrementAndGet(); - context.addWebsocketClosedListener((context1, statusCode, reason) -> openWebSocketCounter.decrementAndGet()); - } - private AtomicInteger getOpenWebsocketCounter(final String userAgentString) { - try { - return openWebsocketsByClientPlatform.get(UserAgentUtil.parseUserAgentString(userAgentString).getPlatform()); - } catch (final UnrecognizedUserAgentException e) { - return openWebsocketsFromUnknownPlatforms; - } + context.addWebsocketClosedListener((context1, statusCode, reason) -> { + sample.stop(durationTimer); + openWebSocketCounter.decrementAndGet(); + }); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index 1e791f384..68375d341 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -7,11 +7,7 @@ package org.whispersystems.textsecuregcm.websocket; import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; -import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; -import io.micrometer.core.instrument.Timer; -import java.util.EnumMap; -import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -22,7 +18,6 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.OpenWebSocketCounter; -import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; @@ -30,9 +25,6 @@ import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.MessagesManager; -import org.whispersystems.textsecuregcm.util.ua.ClientPlatform; -import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; -import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil; import org.whispersystems.websocket.session.WebSocketSessionContext; import org.whispersystems.websocket.setup.WebSocketConnectListener; import reactor.core.scheduler.Scheduler; @@ -63,11 +55,6 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final OpenWebSocketCounter openAuthenticatedWebSocketCounter; private final OpenWebSocketCounter openUnauthenticatedWebSocketCounter; - private final Map durationTimersByClientPlatform; - private final Map unauthenticatedDurationTimersByClientPlatform; - private final Timer durationTimerForUnknownPlatforms; - private final Timer unauthenticatedDurationTimerForUnknownPlatforms; - public AuthenticatedConnectListener(ReceiptSender receiptSender, MessagesManager messagesManager, MessageMetrics messageMetrics, @@ -89,41 +76,17 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { this.clientReleaseManager = clientReleaseManager; this.messageDeliveryLoopMonitor = messageDeliveryLoopMonitor; - durationTimersByClientPlatform = new EnumMap<>(ClientPlatform.class); - unauthenticatedDurationTimersByClientPlatform = new EnumMap<>(ClientPlatform.class); + openAuthenticatedWebSocketCounter = + new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, CONNECTED_DURATION_TIMER_NAME, Tags.of(AUTHENTICATED_TAG_NAME, "true")); - final Tags authenticatedTag = Tags.of(AUTHENTICATED_TAG_NAME, "true"); - final Tags unauthenticatedTag = Tags.of(AUTHENTICATED_TAG_NAME, "false"); - - openAuthenticatedWebSocketCounter = new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, authenticatedTag); - openUnauthenticatedWebSocketCounter = new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, unauthenticatedTag); - - for (final ClientPlatform clientPlatform : ClientPlatform.values()) { - final Tags clientPlatformTag = Tags.of(UserAgentTagUtil.PLATFORM_TAG, clientPlatform.name().toLowerCase()); - - durationTimersByClientPlatform.put(clientPlatform, - Metrics.timer(CONNECTED_DURATION_TIMER_NAME, clientPlatformTag.and(authenticatedTag))); - - unauthenticatedDurationTimersByClientPlatform.put(clientPlatform, - Metrics.timer(CONNECTED_DURATION_TIMER_NAME, clientPlatformTag.and(unauthenticatedTag))); - } - - final Tags unrecognizedPlatform = Tags.of(UserAgentTagUtil.PLATFORM_TAG, "unrecognized"); - - durationTimerForUnknownPlatforms = Metrics.timer(CONNECTED_DURATION_TIMER_NAME, - unrecognizedPlatform.and(authenticatedTag)); - - unauthenticatedDurationTimerForUnknownPlatforms = Metrics.timer(CONNECTED_DURATION_TIMER_NAME, - unrecognizedPlatform.and(unauthenticatedTag)); + openUnauthenticatedWebSocketCounter = + new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, CONNECTED_DURATION_TIMER_NAME, Tags.of(AUTHENTICATED_TAG_NAME, "false")); } @Override public void onWebSocketConnect(WebSocketSessionContext context) { final boolean authenticated = (context.getAuthenticated() != null); - final String userAgent = context.getClient().getUserAgent(); - final Timer connectionTimer = getConnectionTimer(userAgent, authenticated); - final OpenWebSocketCounter openWebSocketCounter = authenticated ? openAuthenticatedWebSocketCounter : openUnauthenticatedWebSocketCounter; @@ -131,7 +94,6 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { if (authenticated) { final AuthenticatedDevice auth = context.getAuthenticated(AuthenticatedDevice.class); - final Timer.Sample sample = Timer.start(); final WebSocketConnection connection = new WebSocketConnection(receiptSender, messagesManager, messageMetrics, @@ -147,8 +109,6 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { final AtomicReference> renewPresenceFutureReference = new AtomicReference<>(); context.addWebsocketClosedListener((closingContext, statusCode, reason) -> { - sample.stop(connectionTimer); - final ScheduledFuture renewPresenceFuture = renewPresenceFutureReference.get(); if (renewPresenceFuture != null) { @@ -197,23 +157,6 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { log.warn("Failed to initialize websocket", e); context.getClient().close(1011, "Unexpected error initializing connection"); } - } else { - final Timer.Sample sample = Timer.start(); - context.addWebsocketClosedListener((context1, statusCode, reason) -> sample.stop(connectionTimer)); - } - } - - private Timer getConnectionTimer(final String userAgentString, - final boolean authenticated) { - try { - final ClientPlatform platform = UserAgentUtil.parseUserAgentString(userAgentString).getPlatform(); - return authenticated - ? durationTimersByClientPlatform.get(platform) - : unauthenticatedDurationTimersByClientPlatform.get(platform); - } catch (final UnrecognizedUserAgentException e) { - return authenticated - ? durationTimerForUnknownPlatforms - : unauthenticatedDurationTimerForUnknownPlatforms; } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/ProvisioningConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/ProvisioningConnectListener.java index 9ca3f43d3..1226596f1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/ProvisioningConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/ProvisioningConnectListener.java @@ -42,7 +42,8 @@ public class ProvisioningConnectListener implements WebSocketConnectListener { public ProvisioningConnectListener(final ProvisioningManager provisioningManager) { this.provisioningManager = provisioningManager; - this.openWebSocketCounter = new OpenWebSocketCounter(MetricsUtil.name(getClass(), "openWebsockets")); + this.openWebSocketCounter = new OpenWebSocketCounter(MetricsUtil.name(getClass(), "openWebsockets"), + MetricsUtil.name(getClass(), "sessionDuration")); } @Override diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index a25e397b4..08c83f60e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -142,7 +142,7 @@ class WebSocketConnectionTest { // authenticated - valid user connectListener.onWebSocketConnect(sessionContext); - verify(sessionContext, times(2)).addWebsocketClosedListener( + verify(sessionContext, times(1)).addWebsocketClosedListener( any(WebSocketSessionContext.WebSocketEventListener.class)); // unauthenticated @@ -152,7 +152,7 @@ class WebSocketConnectionTest { assertFalse(account.invalidCredentialsProvided()); connectListener.onWebSocketConnect(sessionContext); - verify(sessionContext, times(4)).addWebsocketClosedListener( + verify(sessionContext, times(2)).addWebsocketClosedListener( any(WebSocketSessionContext.WebSocketEventListener.class)); verifyNoMoreInteractions(messagesManager);