reintroduce VirtualThreadPinEventMonitor
This commit is contained in:
parent
480abebf7e
commit
b483159b3a
|
@ -56,6 +56,7 @@ import org.whispersystems.textsecuregcm.configuration.SubscriptionConfiguration;
|
|||
import org.whispersystems.textsecuregcm.configuration.TlsKeyStoreConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.TurnSecretConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.UnidentifiedDeliveryConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.VirtualThreadConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.ZkConfig;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiterConfig;
|
||||
import org.whispersystems.websocket.configuration.WebSocketConfiguration;
|
||||
|
@ -316,6 +317,11 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
@JsonProperty
|
||||
private LinkDeviceSecretConfiguration linkDevice;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private VirtualThreadConfiguration virtualThreadConfiguration = new VirtualThreadConfiguration(Duration.ofMillis(1));
|
||||
|
||||
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
|
||||
return tlsKeyStore;
|
||||
}
|
||||
|
@ -527,4 +533,8 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
public LinkDeviceSecretConfiguration getLinkDeviceSecretConfiguration() {
|
||||
return linkDevice;
|
||||
}
|
||||
|
||||
public VirtualThreadConfiguration getVirtualThreadConfiguration() {
|
||||
return virtualThreadConfiguration;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -212,6 +212,7 @@ import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
|
|||
import org.whispersystems.textsecuregcm.util.ManagedAwsCrt;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import org.whispersystems.textsecuregcm.util.UsernameHashZkProofVerifier;
|
||||
import org.whispersystems.textsecuregcm.util.VirtualThreadPinEventMonitor;
|
||||
import org.whispersystems.textsecuregcm.util.logging.LoggingUnhandledExceptionMapper;
|
||||
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
|
||||
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
|
||||
|
@ -434,6 +435,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService storageServiceExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService virtualThreadEventLoggerExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "virtualThreadEventLogger-%d")).minThreads(1).maxThreads(1).build();
|
||||
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
|
||||
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
|
||||
|
@ -629,6 +632,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
CoinMarketCapClient coinMarketCapClient = new CoinMarketCapClient(currencyClient, config.getPaymentsServiceConfiguration().coinMarketCapApiKey().value(), config.getPaymentsServiceConfiguration().coinMarketCapCurrencyIds());
|
||||
CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, coinMarketCapClient,
|
||||
cacheCluster, config.getPaymentsServiceConfiguration().paymentCurrencies(), recurringJobExecutor, Clock.systemUTC());
|
||||
VirtualThreadPinEventMonitor virtualThreadPinEventMonitor = new VirtualThreadPinEventMonitor(
|
||||
virtualThreadEventLoggerExecutor,
|
||||
() -> dynamicConfigurationManager.getConfiguration().getVirtualThreads().allowedPinEvents(),
|
||||
config.getVirtualThreadConfiguration().pinEventThreshold());
|
||||
|
||||
environment.lifecycle().manage(apnSender);
|
||||
environment.lifecycle().manage(apnPushNotificationScheduler);
|
||||
|
@ -638,6 +645,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
environment.lifecycle().manage(currencyManager);
|
||||
environment.lifecycle().manage(registrationServiceClient);
|
||||
environment.lifecycle().manage(clientReleaseManager);
|
||||
environment.lifecycle().manage(virtualThreadPinEventMonitor);
|
||||
|
||||
final RegistrationCaptchaManager registrationCaptchaManager = new RegistrationCaptchaManager(captchaChecker,
|
||||
rateLimiters, config.getTestDevices(), dynamicConfigurationManager);
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.configuration;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
public record VirtualThreadConfiguration(Duration pinEventThreshold) {}
|
|
@ -55,11 +55,14 @@ public class DynamicConfiguration {
|
|||
@Valid
|
||||
DynamicInboundMessageByteLimitConfiguration inboundMessageByteLimit = new DynamicInboundMessageByteLimitConfiguration(true);
|
||||
|
||||
|
||||
@JsonProperty
|
||||
@Valid
|
||||
DynamicRegistrationConfiguration registrationConfiguration = new DynamicRegistrationConfiguration(false);
|
||||
|
||||
@JsonProperty
|
||||
@Valid
|
||||
DynamicVirtualThreadConfiguration virtualThreads = new DynamicVirtualThreadConfiguration(Collections.emptySet());
|
||||
|
||||
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
|
||||
final String experimentName) {
|
||||
return Optional.ofNullable(experiments.get(experimentName));
|
||||
|
@ -105,4 +108,9 @@ public class DynamicConfiguration {
|
|||
public DynamicRegistrationConfiguration getRegistrationConfiguration() {
|
||||
return registrationConfiguration;
|
||||
}
|
||||
|
||||
public DynamicVirtualThreadConfiguration getVirtualThreads() {
|
||||
return virtualThreads;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.configuration.dynamic;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public record DynamicVirtualThreadConfiguration(Set<String> allowedPinEvents) {}
|
|
@ -0,0 +1,142 @@
|
|||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.util;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.time.Duration;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import jdk.jfr.consumer.RecordedEvent;
|
||||
import jdk.jfr.consumer.RecordedFrame;
|
||||
import jdk.jfr.consumer.RecordedStackTrace;
|
||||
import jdk.jfr.consumer.RecordedThread;
|
||||
import jdk.jfr.consumer.RecordingStream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
|
||||
/**
|
||||
* Watches for JFR events indicating that a virtual thread was pinned
|
||||
*/
|
||||
public class VirtualThreadPinEventMonitor implements Managed {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(VirtualThreadPinEventMonitor.class);
|
||||
private static final String PIN_COUNTER_NAME = MetricsUtil.name(VirtualThreadPinEventMonitor.class,
|
||||
"virtualThreadPinned");
|
||||
private static final String JFR_THREAD_PINNED_EVENT_NAME = "jdk.VirtualThreadPinned";
|
||||
private static final long MAX_JFR_REPOSITORY_SIZE = 1024 * 1024 * 4L; // 4MiB
|
||||
|
||||
private final ExecutorService executorService;
|
||||
private final Supplier<Set<String>> allowList;
|
||||
private final Duration pinEventThreshold;
|
||||
private final RecordingStream recordingStream;
|
||||
|
||||
private final BiConsumer<RecordedEvent, Boolean> pinEventConsumer;
|
||||
|
||||
@VisibleForTesting
|
||||
VirtualThreadPinEventMonitor(
|
||||
final ExecutorService executorService,
|
||||
final Supplier<Set<String>> allowList,
|
||||
final Duration pinEventThreshold,
|
||||
final BiConsumer<RecordedEvent, Boolean> pinEventConsumer) {
|
||||
this.executorService = executorService;
|
||||
this.allowList = allowList;
|
||||
this.pinEventThreshold = pinEventThreshold;
|
||||
this.pinEventConsumer = pinEventConsumer;
|
||||
this.recordingStream = new RecordingStream();
|
||||
}
|
||||
public VirtualThreadPinEventMonitor(
|
||||
final ExecutorService executorService,
|
||||
final Supplier<Set<String>> allowList,
|
||||
final Duration pinEventThreshold) {
|
||||
this(executorService, allowList, pinEventThreshold, VirtualThreadPinEventMonitor::processPinEvent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
recordingStream.setMaxSize(MAX_JFR_REPOSITORY_SIZE);
|
||||
recordingStream.enable(JFR_THREAD_PINNED_EVENT_NAME).withThreshold(pinEventThreshold).withStackTrace();
|
||||
recordingStream.onEvent(JFR_THREAD_PINNED_EVENT_NAME, event -> pinEventConsumer.accept(event, allowed(event)));
|
||||
executorService.submit(recordingStream::start);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws InterruptedException {
|
||||
// flushes events and waits for callbacks to finish
|
||||
recordingStream.stop();
|
||||
// immediately frees all resources
|
||||
recordingStream.close();
|
||||
}
|
||||
|
||||
private static void processPinEvent(final RecordedEvent event, final boolean allowedPinEvent) {
|
||||
if (allowedPinEvent) {
|
||||
logger.info("Long allowed virtual thread pin event detected {}", prettyEventString(event));
|
||||
} else {
|
||||
logger.error("Long forbidden virtual thread pin event detected {}", prettyEventString(event));
|
||||
}
|
||||
Metrics.counter(PIN_COUNTER_NAME, "allowed", String.valueOf(allowedPinEvent)).increment();
|
||||
}
|
||||
|
||||
private boolean allowed(final RecordedEvent event) {
|
||||
final Set<String> allowedMethodFrames = allowList.get();
|
||||
if (event.getStackTrace() == null) {
|
||||
return false;
|
||||
}
|
||||
for (RecordedFrame st : event.getStackTrace().getFrames()) {
|
||||
if (!st.isJavaFrame()) {
|
||||
continue;
|
||||
}
|
||||
final String qualifiedName = "%s.%s".formatted(st.getMethod().getType().getName(), st.getMethod().getName());
|
||||
if (allowedMethodFrames.stream().anyMatch(qualifiedName::contains)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static String prettyEventString(final RecordedEvent event) {
|
||||
// event.toString() hard codes a stack depth of 5, which is not enough to
|
||||
// determine the source of the event in most cases
|
||||
|
||||
return """
|
||||
%s {
|
||||
startTime = %s
|
||||
duration = %s
|
||||
eventThread = %s
|
||||
stackTrace = %s
|
||||
}""".formatted(event.getEventType().getName(),
|
||||
event.getStartTime(),
|
||||
event.getDuration(),
|
||||
prettyThreadString(event.getThread()),
|
||||
prettyStackTraceString(event.getStackTrace(), " "));
|
||||
}
|
||||
|
||||
private static String prettyStackTraceString(final RecordedStackTrace st, final String indent) {
|
||||
if (st == null) {
|
||||
return "n/a";
|
||||
}
|
||||
// No need to put a limit, by default JFR stack traces are limited to 64 frames. They can be increased at jvm start
|
||||
// with the FlightRecorderOptions stackdepth option
|
||||
return "[\n" + indent + indent + st.getFrames().stream()
|
||||
.filter(RecordedFrame::isJavaFrame)
|
||||
.map(frame -> "%s.%s:%s".formatted(frame.getMethod().getType().getName(), frame.getMethod().getName(), frame.getLineNumber()))
|
||||
.collect(Collectors.joining(",\n" + indent + indent))
|
||||
+ "\n" + indent + "]";
|
||||
}
|
||||
|
||||
private static String prettyThreadString(final RecordedThread thread) {
|
||||
if (thread == null) {
|
||||
return "n/a";
|
||||
}
|
||||
return "%s (javaThreadId = %s)".formatted(thread.getJavaName(), thread.getJavaThreadId()) ;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.util;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import jdk.jfr.consumer.RecordedEvent;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
|
||||
public class VirtualThreadPinEventMonitorTest {
|
||||
private void synchronizedSleep1() {
|
||||
synchronized (this) {
|
||||
try {
|
||||
Thread.sleep(2);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void synchronizedSleep2() {
|
||||
synchronized (this) {
|
||||
try {
|
||||
Thread.sleep(2);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled("flaky: no way to ensure the sequencing between the start of the recording stream and emitting the event")
|
||||
public void testPinEventProduced() throws InterruptedException, ExecutionException {
|
||||
final BlockingQueue<Pair<RecordedEvent, Boolean>> bq = new LinkedBlockingQueue<>();
|
||||
final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor();
|
||||
VirtualThreadPinEventMonitor eventMonitor = queueingLogger(exec, Set.of(), bq);
|
||||
eventMonitor.start();
|
||||
// give start a moment to begin the event stream thread
|
||||
Thread.sleep(100);
|
||||
exec.submit(() -> synchronizedSleep1()).get();
|
||||
eventMonitor.stop();
|
||||
|
||||
final Pair<RecordedEvent, Boolean> event = bq.poll(1, TimeUnit.SECONDS);
|
||||
assertThat(event).isNotNull();
|
||||
assertThat(event.getRight()).isFalse();
|
||||
assertThat(bq.isEmpty());
|
||||
exec.shutdown();
|
||||
exec.awaitTermination(1, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {"VirtualThreadPinEventMonitorTest.synchronizedSleep1", "synchronizedSleep1"})
|
||||
@Disabled("flaky: no way to ensure the sequencing between the start of the recording stream and emitting the event")
|
||||
public void testPinEventFiltered(final String allowString) throws InterruptedException, ExecutionException {
|
||||
final BlockingQueue<Pair<RecordedEvent, Boolean>> bq = new LinkedBlockingQueue<>();
|
||||
final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor();
|
||||
final VirtualThreadPinEventMonitor eventMonitor = queueingLogger(exec, Set.of(allowString), bq);
|
||||
eventMonitor.start();
|
||||
// give start a moment to begin the event stream thread
|
||||
Thread.sleep(100);
|
||||
exec.submit(() -> synchronizedSleep1()).get();
|
||||
exec.submit(() -> synchronizedSleep2()).get();
|
||||
eventMonitor.stop();
|
||||
|
||||
final Pair<RecordedEvent, Boolean> sleep1Event = bq.poll(1, TimeUnit.SECONDS);
|
||||
final Pair<RecordedEvent, Boolean> sleep2Event = bq.poll(1, TimeUnit.SECONDS);
|
||||
assertThat(sleep1Event).isNotNull();
|
||||
assertThat(sleep2Event).isNotNull();
|
||||
assertThat(sleep1Event.getRight()).isTrue();
|
||||
assertThat(sleep2Event.getRight()).isFalse();
|
||||
assertThat(bq.isEmpty());
|
||||
exec.shutdown();
|
||||
exec.awaitTermination(1, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private static VirtualThreadPinEventMonitor queueingLogger(
|
||||
final ExecutorService exec,
|
||||
final Set<String> allowedMethods,
|
||||
final BlockingQueue<Pair<RecordedEvent, Boolean>> bq) {
|
||||
return new VirtualThreadPinEventMonitor(exec,
|
||||
() -> allowedMethods,
|
||||
Duration.ofMillis(1),
|
||||
(event, allowed) -> {
|
||||
try {
|
||||
bq.put(Pair.of(event, allowed));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue