Capture a thread dump when Redis commands time out.
This commit is contained in:
parent
4d5fbec5a5
commit
6c78d7544f
|
@ -16,7 +16,9 @@ import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
import org.whispersystems.textsecuregcm.util.ThreadDumpUtil;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
@ -31,8 +33,9 @@ public class FaultTolerantPubSubConnection<K, V> {
|
||||||
private final CircuitBreaker circuitBreaker;
|
private final CircuitBreaker circuitBreaker;
|
||||||
private final Retry retry;
|
private final Retry retry;
|
||||||
|
|
||||||
private final Timer executeTimer;
|
private final Timer executeTimer;
|
||||||
private final Meter commandTimeoutMeter;
|
private final Meter commandTimeoutMeter;
|
||||||
|
private final AtomicBoolean wroteThreadDump = new AtomicBoolean(false);
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class);
|
private static final Logger log = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class);
|
||||||
|
|
||||||
|
@ -56,8 +59,7 @@ public class FaultTolerantPubSubConnection<K, V> {
|
||||||
try (final Timer.Context ignored = executeTimer.time()) {
|
try (final Timer.Context ignored = executeTimer.time()) {
|
||||||
consumer.accept(pubSubConnection);
|
consumer.accept(pubSubConnection);
|
||||||
} catch (final RedisCommandTimeoutException e) {
|
} catch (final RedisCommandTimeoutException e) {
|
||||||
commandTimeoutMeter.mark();
|
recordCommandTimeout(e);
|
||||||
log.warn("Command timeout exception ({}-pubsub)", this.name, e);
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
@ -78,8 +80,7 @@ public class FaultTolerantPubSubConnection<K, V> {
|
||||||
try (final Timer.Context ignored = executeTimer.time()) {
|
try (final Timer.Context ignored = executeTimer.time()) {
|
||||||
return function.apply(pubSubConnection);
|
return function.apply(pubSubConnection);
|
||||||
} catch (final RedisCommandTimeoutException e) {
|
} catch (final RedisCommandTimeoutException e) {
|
||||||
commandTimeoutMeter.mark();
|
recordCommandTimeout(e);
|
||||||
log.warn("Command timeout exception ({}-pubsub)", this.name, e);
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
@ -93,4 +94,13 @@ public class FaultTolerantPubSubConnection<K, V> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void recordCommandTimeout(final RedisCommandTimeoutException e) {
|
||||||
|
commandTimeoutMeter.mark();
|
||||||
|
log.warn("Command timeout exception ({}-pubsub)", this.name, e);
|
||||||
|
|
||||||
|
if (wroteThreadDump.compareAndSet(false, true)) {
|
||||||
|
ThreadDumpUtil.writeThreadDump();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,10 +25,12 @@ import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
import org.whispersystems.textsecuregcm.util.ThreadDumpUtil;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -53,7 +55,8 @@ public class FaultTolerantRedisCluster {
|
||||||
private final CircuitBreaker circuitBreaker;
|
private final CircuitBreaker circuitBreaker;
|
||||||
private final Retry retry;
|
private final Retry retry;
|
||||||
|
|
||||||
private final Meter commandTimeoutMeter;
|
private final Meter commandTimeoutMeter;
|
||||||
|
private final AtomicBoolean wroteThreadDump = new AtomicBoolean(false);
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class);
|
private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class);
|
||||||
|
|
||||||
|
@ -122,8 +125,7 @@ public class FaultTolerantRedisCluster {
|
||||||
try {
|
try {
|
||||||
consumer.accept(connection);
|
consumer.accept(connection);
|
||||||
} catch (final RedisCommandTimeoutException e) {
|
} catch (final RedisCommandTimeoutException e) {
|
||||||
commandTimeoutMeter.mark();
|
recordCommandTimeout(e);
|
||||||
log.warn("Command timeout exception ({})", this.name, e);
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
@ -144,8 +146,7 @@ public class FaultTolerantRedisCluster {
|
||||||
try {
|
try {
|
||||||
return function.apply(connection);
|
return function.apply(connection);
|
||||||
} catch (final RedisCommandTimeoutException e) {
|
} catch (final RedisCommandTimeoutException e) {
|
||||||
commandTimeoutMeter.mark();
|
recordCommandTimeout(e);
|
||||||
log.warn("Command timeout exception ({})", this.name, e);
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
@ -160,6 +161,15 @@ public class FaultTolerantRedisCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void recordCommandTimeout(final RedisCommandTimeoutException e) {
|
||||||
|
commandTimeoutMeter.mark();
|
||||||
|
log.warn("Command timeout exception ({})", this.name, e);
|
||||||
|
|
||||||
|
if (wroteThreadDump.compareAndSet(false, true)) {
|
||||||
|
ThreadDumpUtil.writeThreadDump();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
|
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
|
||||||
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
|
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
|
||||||
pubSubConnections.add(pubSubConnection);
|
pubSubConnections.add(pubSubConnection);
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
package org.whispersystems.textsecuregcm.util;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.ThreadInfo;
|
||||||
|
|
||||||
|
public class ThreadDumpUtil {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ThreadDumpUtil.class);
|
||||||
|
|
||||||
|
public static void writeThreadDump() {
|
||||||
|
try {
|
||||||
|
try (final PrintWriter out = new PrintWriter(File.createTempFile("thread_dump_", ".txt"))) {
|
||||||
|
for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(true, true)) {
|
||||||
|
out.print(info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (final IOException e) {
|
||||||
|
log.warn("Failed to write thread dump", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue