From 0e6242373ebad80043a9785b432598d88ea0a44d Mon Sep 17 00:00:00 2001 From: ravi-signal <99042880+ravi-signal@users.noreply.github.com> Date: Tue, 30 Jan 2024 12:48:07 -0600 Subject: [PATCH] Add a monitor for virtual thread pin events --- .../WhisperServerConfiguration.java | 10 ++ .../textsecuregcm/WhisperServerService.java | 8 ++ .../VirtualThreadConfiguration.java | 9 ++ .../dynamic/DynamicConfiguration.java | 10 +- .../DynamicVirtualThreadConfiguration.java | 10 ++ .../util/VirtualThreadPinEventMonitor.java | 97 +++++++++++++++++ .../VirtualThreadPinEventMonitorTest.java | 101 ++++++++++++++++++ 7 files changed, 244 insertions(+), 1 deletion(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/VirtualThreadConfiguration.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 7b32ce23b..dbca46e6a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -56,6 +56,7 @@ import org.whispersystems.textsecuregcm.configuration.SubscriptionConfiguration; import org.whispersystems.textsecuregcm.configuration.TlsKeyStoreConfiguration; import org.whispersystems.textsecuregcm.configuration.TurnSecretConfiguration; import org.whispersystems.textsecuregcm.configuration.UnidentifiedDeliveryConfiguration; +import org.whispersystems.textsecuregcm.configuration.VirtualThreadConfiguration; import org.whispersystems.textsecuregcm.configuration.ZkConfig; import org.whispersystems.textsecuregcm.limits.RateLimiterConfig; import org.whispersystems.websocket.configuration.WebSocketConfiguration; @@ -316,6 +317,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private LinkDeviceSecretConfiguration linkDevice; + @Valid + @NotNull + @JsonProperty + private VirtualThreadConfiguration virtualThreadConfiguration = new VirtualThreadConfiguration(Duration.ofMillis(1)); + public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() { return tlsKeyStore; } @@ -527,4 +533,8 @@ public class WhisperServerConfiguration extends Configuration { public LinkDeviceSecretConfiguration getLinkDeviceSecretConfiguration() { return linkDevice; } + + public VirtualThreadConfiguration getVirtualThreadConfiguration() { + return virtualThreadConfiguration; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f7287b7d7..f6aa1a83a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -212,6 +212,7 @@ import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; import org.whispersystems.textsecuregcm.util.ManagedAwsCrt; import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.UsernameHashZkProofVerifier; +import org.whispersystems.textsecuregcm.util.VirtualThreadPinEventMonitor; import org.whispersystems.textsecuregcm.util.logging.LoggingUnhandledExceptionMapper; import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; @@ -434,6 +435,8 @@ public class WhisperServerService extends Application dynamicConfigurationManager.getConfiguration().getVirtualThreads().allowedPinEvents(), + config.getVirtualThreadConfiguration().pinEventThreshold()); environment.lifecycle().manage(apnSender); environment.lifecycle().manage(apnPushNotificationScheduler); @@ -638,6 +645,7 @@ public class WhisperServerService extends Application getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -105,4 +108,9 @@ public class DynamicConfiguration { public DynamicRegistrationConfiguration getRegistrationConfiguration() { return registrationConfiguration; } + + public DynamicVirtualThreadConfiguration getVirtualThreads() { + return virtualThreads; + } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java new file mode 100644 index 000000000..045efa5e8 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java @@ -0,0 +1,10 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration.dynamic; + +import java.util.Set; + +public record DynamicVirtualThreadConfiguration(Set allowedPinEvents) {} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java new file mode 100644 index 000000000..2f161a22b --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.util; + +import com.google.common.annotations.VisibleForTesting; +import io.dropwizard.lifecycle.Managed; +import io.micrometer.core.instrument.Metrics; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; +import java.util.function.Supplier; +import jdk.jfr.consumer.RecordedEvent; +import jdk.jfr.consumer.RecordedFrame; +import jdk.jfr.consumer.RecordingStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; + +/** + * Watches for JFR events indicating that a virtual thread was pinned + */ +public class VirtualThreadPinEventMonitor implements Managed { + + private static final Logger logger = LoggerFactory.getLogger(VirtualThreadPinEventMonitor.class); + private static final String PIN_COUNTER_NAME = MetricsUtil.name(VirtualThreadPinEventMonitor.class, "virtualThreadPinned"); + private static final String JFR_THREAD_PINNED_EVENT_NAME = "jdk.VirtualThreadPinned"; + private static final long MAX_JFR_REPOSITORY_SIZE = 1024 * 1024 * 4L; // 4MiB + + private final ExecutorService executorService; + private final Supplier> allowList; + private final Duration pinEventThreshold; + private final RecordingStream recordingStream; + + private BiConsumer pinEventConsumer; + + @VisibleForTesting + VirtualThreadPinEventMonitor( + final ExecutorService executorService, + final Supplier> allowList, + final Duration pinEventThreshold, + final BiConsumer pinEventConsumer) { + this.executorService = executorService; + this.allowList = allowList; + this.pinEventThreshold = pinEventThreshold; + this.pinEventConsumer = pinEventConsumer; + this.recordingStream = new RecordingStream(); + } + public VirtualThreadPinEventMonitor( + final ExecutorService executorService, + final Supplier> allowList, + final Duration pinEventThreshold) { + this(executorService, allowList, pinEventThreshold, VirtualThreadPinEventMonitor::processPinEvent); + } + + @Override + public void start() { + recordingStream.setMaxSize(MAX_JFR_REPOSITORY_SIZE); + recordingStream.enable(JFR_THREAD_PINNED_EVENT_NAME).withThreshold(pinEventThreshold).withStackTrace(); + recordingStream.onEvent(event -> pinEventConsumer.accept(event, allowed(event))); + executorService.submit(() -> recordingStream.start()); + } + + @Override + public void stop() throws InterruptedException { + // flushes events and waits for callbacks to finish + recordingStream.stop(); + // immediately frees all resources + recordingStream.close(); + } + + private static void processPinEvent(final RecordedEvent event, final boolean allowedPinEvent) { + if (allowedPinEvent) { + logger.info("Long allowed virtual thread pin event detected", event); + } else { + logger.error("Long forbidden virtual thread pin event detected", event); + } + Metrics.counter(PIN_COUNTER_NAME, "allowed", String.valueOf(allowedPinEvent)).increment(); + } + + private boolean allowed(final RecordedEvent event) { + final Set allowedMethodFrames = allowList.get(); + for (RecordedFrame st : event.getStackTrace().getFrames()) { + if (!st.isJavaFrame()) { + continue; + } + final String qualifiedName = "%s.%s".formatted(st.getMethod().getType().getName(), st.getMethod().getName()); + if (allowedMethodFrames.stream().anyMatch(qualifiedName::contains)) { + return true; + } + } + return false; + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java new file mode 100644 index 000000000..4c93704eb --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import jdk.jfr.consumer.RecordedEvent; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + + +public class VirtualThreadPinEventMonitorTest { + private void synchronizedSleep1() { + synchronized (this) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private void synchronizedSleep2() { + synchronized (this) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Test + public void testPinEventProduced() throws InterruptedException, ExecutionException { + final BlockingQueue> bq = new LinkedBlockingQueue<>(); + final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor(); + VirtualThreadPinEventMonitor eventMonitor = queueingLogger(exec, Set.of(), bq); + eventMonitor.start(); + // give start a moment to begin the event stream thread + Thread.sleep(100); + exec.submit(() -> synchronizedSleep1()).get(); + eventMonitor.stop(); + + final Pair nxt = bq.take(); + assertThat(nxt.getRight()).isFalse(); + assertThat(bq.isEmpty()); + exec.shutdown(); + exec.awaitTermination(1, TimeUnit.MILLISECONDS); + } + + @ParameterizedTest + @ValueSource(strings = {"VirtualThreadPinEventMonitorTest.synchronizedSleep1", "synchronizedSleep1"}) + public void testPinEventFiltered(final String allowString) throws InterruptedException, ExecutionException { + final BlockingQueue> bq = new LinkedBlockingQueue<>(); + final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor(); + final VirtualThreadPinEventMonitor eventMonitor = queueingLogger(exec, Set.of(allowString), bq); + eventMonitor.start(); + // give start a moment to begin the event stream thread + Thread.sleep(100); + exec.submit(() -> synchronizedSleep1()).get(); + exec.submit(() -> synchronizedSleep2()).get(); + eventMonitor.stop(); + + final boolean sleep1Allowed = bq.take().getRight(); + final boolean sleep2Allowed = bq.take().getRight(); + assertThat(sleep1Allowed).isTrue(); + assertThat(sleep2Allowed).isFalse(); + assertThat(bq.isEmpty()); + exec.shutdown(); + exec.awaitTermination(1, TimeUnit.MILLISECONDS); + } + + private static VirtualThreadPinEventMonitor queueingLogger( + final ExecutorService exec, + final Set allowedMethods, + final BlockingQueue> bq) { + return new VirtualThreadPinEventMonitor(exec, + () -> allowedMethods, + Duration.ofMillis(1), + (event, allowed) -> { + try { + bq.put(Pair.of(event, allowed)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + } +}