Fix some thread leaks

This commit is contained in:
Radon Rosborough 2023-02-12 18:46:56 -08:00
parent e7366b15e0
commit fb166f4115
4 changed files with 63 additions and 7 deletions

View File

@ -237,8 +237,9 @@ export async function initUserSession({ watcher, podName, proxyInfo }) {
let done = false;
try {
return await new Promise(async (resolve, reject) => {
let timeout = null;
try {
setTimeout(
timeout = setTimeout(
() => reject("timed out waiting for pod to become ready"),
5 * 60 * 1000
);
@ -365,6 +366,10 @@ export async function initUserSession({ watcher, podName, proxyInfo }) {
});
} catch (err) {
reject(err);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
});
} finally {

View File

@ -59,13 +59,16 @@ async function main() {
// Use a queue to resolve the circular dependency between exec and
// pty.
const outputQueue = new PQueue({ concurrency: 1, autoStart: false });
let handlePtyOutput;
let handlePtyOutput, handlePtyExit;
const exec = await session.exec(["bash"], {
pty: true,
on: {
stdout: (data) => outputQueue.add(() => handlePtyOutput(data)),
stderr: (data) => process.stderr.write(data),
exit: (status) => process.exit(status),
exit: (status) => {
handlePtyExit();
process.exit(status);
},
error: (err) => process.stderr.write(`riju: error: ${err}\n`),
close: () => resolve(),
},
@ -75,6 +78,7 @@ async function main() {
handlePtyExit: (_status) => {},
});
handlePtyOutput = pty.handlePtyOutput;
handlePtyExit = pty.handlePtyExit;
outputQueue.start();
});
}

View File

@ -165,7 +165,27 @@ export function asBool(value, def) {
throw new Error(`asBool doesn't understand value: ${value}`);
}
export function deptyify({ handlePtyInput, handlePtyExit }) {
// This function is a little wrapper for riju-pty to help avoid
// reimplementing the C logic in JavaScript, because it was a pain to
// work out the first time. This is done by using named pipes (FIFOs)
// to ferry IPC data back and forth between C and JavaScript.
//
// When calling this function it will create a riju-pty subprocess,
// and invoke your provided handlePtyInput function when the user
// types into the current terminal. It will also return a
// handlePtyOutput function that you can call to cause output to be
// displayed on the current terminal.
//
// Also returned is a handlePtyExit function that you should call when
// the process is supposed to terminate, this will shut down the
// riju-pty subprocess and teardown associated resources.
//
// The point of using this instead of just reading and writing output
// using the normal functions, is that the read/write operations act
// on raw pty control sequences, meaning they can be hooked up to the
// Riju agent control stream from a user session container in the
// cluster.
export function deptyify({ handlePtyInput }) {
return new Promise((resolve, reject) => {
const done = false;
let triggerDone = () => {
@ -204,13 +224,13 @@ export function deptyify({ handlePtyInput, handlePtyExit }) {
proc.on("spawn", resolve);
proc.on("error", reject);
});
proc.on("exit", (status) => {
handlePtyExit(status);
proc.on("exit", (_status) => {
triggerDone();
});
const output = await new Promise((resolve, reject) => {
setTimeout(() => reject("timed out"), 5000);
const timeout = setTimeout(() => reject("timed out"), 5000);
resolve(fs.open(`${dir.path}/output`, "w"));
clearTimeout(timeout);
});
const input = fsBase.createReadStream(`${dir.path}/input`);
setTimeout(async () => {
@ -226,6 +246,23 @@ export function deptyify({ handlePtyInput, handlePtyExit }) {
handlePtyOutput: (data) => {
output.write(data);
},
handlePtyExit: async () => {
try {
// SIGTERM, wait for proc to exit, if it doesn't,
// then SIGKILL.
proc.kill("SIGTERM");
try {
await new Promise((resolve, reject) => {
proc.on("exit", resolve);
setTimeout(reject, 1000);
});
} catch (err) {
proc.kill("SIGKILL");
}
} catch (err) {
logError(err);
}
},
});
// Wait before deleting tmpdir...
await new Promise((resolve) => {

View File

@ -42,6 +42,12 @@ void restore_tty()
die("tcsetattr failed");
}
void handle_signal(int signum)
{
restore_tty();
signal(signum, SIG_DFL);
}
int main(int argc, char **argv)
{
argc -= 1;
@ -76,6 +82,10 @@ int main(int argc, char **argv)
die("tcsetattr failed");
if (atexit(restore_tty) < 0)
die("atexit failed");
if (signal(SIGTERM, handle_signal) == SIG_ERR)
die("signal failed");
if (signal(SIGINT, handle_signal) == SIG_ERR)
die("signal failed");
} else {
if (errno != ENOTTY)
die("isatty failed");