From fb166f4115ec797ec816982a425542d28fef03fb Mon Sep 17 00:00:00 2001 From: Radon Rosborough Date: Sun, 12 Feb 2023 18:46:56 -0800 Subject: [PATCH] Fix some thread leaks --- backend/k8s.js | 7 ++++++- backend/sandbox-k8s.js | 8 ++++++-- backend/util.js | 45 ++++++++++++++++++++++++++++++++++++++---- system/src/riju-pty.c | 10 ++++++++++ 4 files changed, 63 insertions(+), 7 deletions(-) diff --git a/backend/k8s.js b/backend/k8s.js index 83c98c6..b462b6c 100644 --- a/backend/k8s.js +++ b/backend/k8s.js @@ -237,8 +237,9 @@ export async function initUserSession({ watcher, podName, proxyInfo }) { let done = false; try { return await new Promise(async (resolve, reject) => { + let timeout = null; try { - setTimeout( + timeout = setTimeout( () => reject("timed out waiting for pod to become ready"), 5 * 60 * 1000 ); @@ -365,6 +366,10 @@ export async function initUserSession({ watcher, podName, proxyInfo }) { }); } catch (err) { reject(err); + } finally { + if (timeout) { + clearTimeout(timeout); + } } }); } finally { diff --git a/backend/sandbox-k8s.js b/backend/sandbox-k8s.js index 8174e13..effb26a 100644 --- a/backend/sandbox-k8s.js +++ b/backend/sandbox-k8s.js @@ -59,13 +59,16 @@ async function main() { // Use a queue to resolve the circular dependency between exec and // pty. const outputQueue = new PQueue({ concurrency: 1, autoStart: false }); - let handlePtyOutput; + let handlePtyOutput, handlePtyExit; const exec = await session.exec(["bash"], { pty: true, on: { stdout: (data) => outputQueue.add(() => handlePtyOutput(data)), stderr: (data) => process.stderr.write(data), - exit: (status) => process.exit(status), + exit: (status) => { + handlePtyExit(); + process.exit(status); + }, error: (err) => process.stderr.write(`riju: error: ${err}\n`), close: () => resolve(), }, @@ -75,6 +78,7 @@ async function main() { handlePtyExit: (_status) => {}, }); handlePtyOutput = pty.handlePtyOutput; + handlePtyExit = pty.handlePtyExit; outputQueue.start(); }); } diff --git a/backend/util.js b/backend/util.js index 4fb8162..9ea428f 100644 --- a/backend/util.js +++ b/backend/util.js @@ -165,7 +165,27 @@ export function asBool(value, def) { throw new Error(`asBool doesn't understand value: ${value}`); } -export function deptyify({ handlePtyInput, handlePtyExit }) { +// This function is a little wrapper for riju-pty to help avoid +// reimplementing the C logic in JavaScript, because it was a pain to +// work out the first time. This is done by using named pipes (FIFOs) +// to ferry IPC data back and forth between C and JavaScript. +// +// When calling this function it will create a riju-pty subprocess, +// and invoke your provided handlePtyInput function when the user +// types into the current terminal. It will also return a +// handlePtyOutput function that you can call to cause output to be +// displayed on the current terminal. +// +// Also returned is a handlePtyExit function that you should call when +// the process is supposed to terminate, this will shut down the +// riju-pty subprocess and teardown associated resources. +// +// The point of using this instead of just reading and writing output +// using the normal functions, is that the read/write operations act +// on raw pty control sequences, meaning they can be hooked up to the +// Riju agent control stream from a user session container in the +// cluster. +export function deptyify({ handlePtyInput }) { return new Promise((resolve, reject) => { const done = false; let triggerDone = () => { @@ -204,13 +224,13 @@ export function deptyify({ handlePtyInput, handlePtyExit }) { proc.on("spawn", resolve); proc.on("error", reject); }); - proc.on("exit", (status) => { - handlePtyExit(status); + proc.on("exit", (_status) => { triggerDone(); }); const output = await new Promise((resolve, reject) => { - setTimeout(() => reject("timed out"), 5000); + const timeout = setTimeout(() => reject("timed out"), 5000); resolve(fs.open(`${dir.path}/output`, "w")); + clearTimeout(timeout); }); const input = fsBase.createReadStream(`${dir.path}/input`); setTimeout(async () => { @@ -226,6 +246,23 @@ export function deptyify({ handlePtyInput, handlePtyExit }) { handlePtyOutput: (data) => { output.write(data); }, + handlePtyExit: async () => { + try { + // SIGTERM, wait for proc to exit, if it doesn't, + // then SIGKILL. + proc.kill("SIGTERM"); + try { + await new Promise((resolve, reject) => { + proc.on("exit", resolve); + setTimeout(reject, 1000); + }); + } catch (err) { + proc.kill("SIGKILL"); + } + } catch (err) { + logError(err); + } + }, }); // Wait before deleting tmpdir... await new Promise((resolve) => { diff --git a/system/src/riju-pty.c b/system/src/riju-pty.c index 0db7231..04459a4 100644 --- a/system/src/riju-pty.c +++ b/system/src/riju-pty.c @@ -42,6 +42,12 @@ void restore_tty() die("tcsetattr failed"); } +void handle_signal(int signum) +{ + restore_tty(); + signal(signum, SIG_DFL); +} + int main(int argc, char **argv) { argc -= 1; @@ -76,6 +82,10 @@ int main(int argc, char **argv) die("tcsetattr failed"); if (atexit(restore_tty) < 0) die("atexit failed"); + if (signal(SIGTERM, handle_signal) == SIG_ERR) + die("signal failed"); + if (signal(SIGINT, handle_signal) == SIG_ERR) + die("signal failed"); } else { if (errno != ENOTTY) die("isatty failed");