From ddb0084b0cb1faf4afa905f71b5f5d05ff90e1b1 Mon Sep 17 00:00:00 2001 From: Radon Rosborough Date: Wed, 4 Jan 2023 18:27:26 -0700 Subject: [PATCH] More k8s websocket proxying --- agent/main.go | 2 +- agent/process.go | 16 ++++ backend/k8s.js | 198 +++++++++++++++++++++++++++++++++++++---- backend/sandbox-k8s.js | 77 ++++++++-------- backend/util.js | 73 +++++++++++++++ package.json | 5 +- yarn.lock | 51 ++++++++++- 7 files changed, 361 insertions(+), 61 deletions(-) diff --git a/agent/main.go b/agent/main.go index 1eda23e..0861459 100644 --- a/agent/main.go +++ b/agent/main.go @@ -20,7 +20,7 @@ type clientMessage struct { } type serverMessage struct { - // "start", "stdout", "stderr", "exit", "warn", "error" + // "stdout", "stderr", "exit", "warn", "error" Event string `json:"event"` // contents of stdout/stderr Data []byte `json:"data,omitempty"` diff --git a/agent/process.go b/agent/process.go index 5739e9e..727a020 100644 --- a/agent/process.go +++ b/agent/process.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "strings" "syscall" "time" ) @@ -26,6 +27,17 @@ type managedProcess struct { CloseChan chan struct{} } +func cleanEnv(env []string) []string { + newEnv := []string{} + for _, entry := range env { + if strings.HasPrefix(entry, "RIJU_") || strings.HasPrefix(entry, "KUBERNETES_") { + continue + } + newEnv = append(newEnv, entry) + } + return newEnv +} + func NewManagedProcess(name string, argv []string, attr *os.ProcAttr) (*managedProcess, error) { mp := &managedProcess{ internalExitChan: make(chan struct{}, 16), @@ -66,6 +78,10 @@ func NewManagedProcess(name string, argv []string, attr *os.ProcAttr) (*managedP newAttr.Files[1] = mp.stdoutWrite newAttr.Files[2] = mp.stderrWrite } + if newAttr.Env == nil { + newAttr.Env = os.Environ() + } + newAttr.Env = cleanEnv(newAttr.Env) mp.proc, err = os.StartProcess(name, argv, newAttr) if err != nil { return mp, fmt.Errorf("spawning process: %w", err) diff --git a/backend/k8s.js b/backend/k8s.js index 116e283..57c5318 100644 --- a/backend/k8s.js +++ b/backend/k8s.js @@ -1,5 +1,6 @@ import * as k8sClient from "@kubernetes/client-node"; -import lodash from "lodash"; +import fetch from "node-fetch"; +import WebSocket from "ws"; const kubeconfig = new k8sClient.KubeConfig(); kubeconfig.loadFromDefault(); @@ -46,6 +47,9 @@ export function watchPods() { callback("add", pods[podName]); } }, + podExists: (podName) => { + return podName in pods; + }, }; } @@ -125,11 +129,59 @@ export async function createUserSession({ }, command: ["/riju-bin/agent"], env: [ + // For agent { name: "RIJU_AGENT_COMMAND_PREFIX", value: "runuser -u riju --", }, + // For user code + { + name: "HOME", + value: "/home/riju", + }, + { + name: "LANG", + value: "C.UTF-8", + }, + { + name: "LC_ALL", + value: "C.UTF-8", + }, + { + name: "LOGNAME", + value: "riju", + }, + { + name: "PATH", + value: + "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + }, + { + name: "PWD", + value: "/home/riju/src", + }, + { + name: "SHELL", + value: "/usr/bin/bash", + }, + { + name: "TERM", + value: "xterm-256color", + }, + { + name: "TMPDIR", + value: "/tmp", + }, + { + name: "USER", + value: "riju", + }, + { + name: "USERNAME", + value: "riju", + }, ], + workingDir: "/home/riju/src", securityContext: { runAsUser: 0, }, @@ -182,24 +234,136 @@ export async function createUserSession({ }, }) ).body; - const podIP = await new Promise((resolve, reject) => { - setTimeout(() => reject("timed out"), 5 * 60 * 1000); - watcher.setCallback(pod.metadata.name, (event, pod) => { - if (event == "delete") { - reject(new Error("pod was deleted")); - } else if (pod.status.phase === "Failed") { - reject(new Error("pod status became Failed")); - } else if ( - pod.status.podIP && - lodash.every(pod.status.containerStatuses, (status) => status.ready) - ) { - resolve(pod.status.podIP); - } else { - console.log(event, JSON.stringify(pod.status, null, 2)); + return pod.metadata.name; +} + +export async function initUserSession({ watcher, podName, proxyInfo }) { + let done = false; + try { + return await new Promise(async (resolve, reject) => { + try { + setTimeout( + () => reject("timed out waiting for pod to become ready"), + 5 * 60 * 1000 + ); + const podIP = await new Promise((resolve, reject) => { + watcher.setCallback(podName, (event, pod) => { + if (event == "delete") { + reject(new Error("pod was deleted")); + } else if (pod.status.phase === "Failed") { + reject(new Error("pod status became Failed")); + } else if (pod.status.podIP) { + resolve(pod.status.podIP); + } + }); + }); + while (!done) { + let resp; + try { + if (!proxyInfo) { + resp = await fetch(`http://${podIP}:869`); + } else { + resp = await fetch( + `${proxyInfo.httpProtocol}://${proxyInfo.host}:${proxyInfo.port}/${podIP}/health`, + { + headers: { + Authorization: `Basic ${Buffer.from( + `${proxyInfo.username}:${proxyInfo.password}` + ).toString("base64")}`, + }, + } + ); + } + if (!resp.ok) { + throw new Error(`Got HTTP error ${resp.status}`); + } else { + done = true; + } + } catch (err) { + await new Promise((resolve) => setTimeout(resolve, 250)); + } + } + resolve({ + exec: (cmdline, { on, pty }) => { + // on :: { stdout, stderr, exit, error, close } + if (pty) { + cmdline = ["/riju-bin/ptyify", ...cmdline]; + } + let conn; + if (!proxyInfo) { + conn = new WebSocket( + `ws://${podIP}:869/exec?${new URLSearchParams({ + cmdline, + }).toString()}` + ); + } else { + conn = new WebSocket( + `${proxyInfo.wsProtocol}://${proxyInfo.host}:${ + proxyInfo.port + }/${podIP}/exec?${new URLSearchParams({ + cmdline, + }).toString()}`, + { + headers: { + Authorization: `Basic ${Buffer.from( + `${proxyInfo.username}:${proxyInfo.password}` + ).toString("base64")}`, + }, + } + ); + } + conn.on("message", (msg) => { + let event, data, text, exitStatus; + try { + ({ event, data, text, exitStatus } = JSON.parse(msg)); + } catch (err) { + on.error( + `Unable to parse JSON message from agent: ${JSON.stringify( + msg + )}` + ); + } + switch (event) { + case "stdout": + on.stdout(data); + break; + case "stderr": + on.stderr(data); + break; + case "exit": + on.exit(exitStatus); + break; + case "warn": + case "error": + on.error(text); + break; + default: + on.error(`Unexpected event type: ${JSON.stringify(event)}`); + break; + } + }); + conn.on("close", () => { + on.close(); + }); + conn.on("error", (err) => { + on.error(`Websocket closed with error: ${err}`); + on.close(); + }); + return { + stdin: { + write: (data) => + conn.write(JSON.stringify({ event: "stdin", data })), + }, + }; + }, + }); + } catch (err) { + reject(err); } }); - }); - return podIP; + } finally { + done = true; + } } export async function deleteUserSessions(sessionsToDelete) { diff --git a/backend/sandbox-k8s.js b/backend/sandbox-k8s.js index def9a81..5a529f9 100644 --- a/backend/sandbox-k8s.js +++ b/backend/sandbox-k8s.js @@ -4,7 +4,7 @@ import process from "process"; import { readLangConfig } from "../lib/yaml.js"; import * as k8s from "./k8s.js"; -import { getUUID, quote } from "./util.js"; +import { deptyify, getUUID } from "./util.js"; function die(msg) { console.error(msg); @@ -27,54 +27,49 @@ async function main() { const sessionID = getUUID(); console.log(`Starting session with UUID ${sessionID}`); const watcher = k8s.watchPods(); - await k8s.createUserSession({ + const podName = await k8s.createUserSession({ watcher, sessionID, langConfig, revisions: { - agent: "20221229-002450-semantic-moccasin-albatross", + agent: "20230104-131916-extensive-aquamarine-crocodile", ptyify: "20221228-023645-clean-white-gorilla", langImage: "20221227-195753-forward-harlequin-wolverine", }, }); - // let buffer = ""; - // await new Promise((resolve) => { - // session.stdout.on("data", (data) => { - // buffer += data.toString(); - // let idx; - // while ((idx = buffer.indexOf("\n")) !== -1) { - // const line = buffer.slice(0, idx); - // buffer = buffer.slice(idx + 1); - // if (line === "riju: container ready") { - // resolve(); - // } else { - // console.error(line); - // } - // } - // }); - // }); - // const args = [].concat.apply( - // ["riju-pty", "-f"], - // privilegedPty( - // { uuid }, - // bash( - // `env L='${lang}' LANG_CONFIG=${quote( - // JSON.stringify(langConfig) - // )} bash --rcfile <(cat <<< ${quote(sandboxScript)})` - // ) - // ) - // ); - // const proc = spawn(args[0], args.slice(1), { - // stdio: "inherit", - // }); - // try { - // await new Promise((resolve, reject) => { - // proc.on("error", reject); - // proc.on("close", resolve); - // }); - // } finally { - // session.kill(); - // } + const proxyInfo = { + httpProtocol: "https", + wsProtocol: "wss", + host: "k8s.riju.codes", + port: 1869, + username: "admin", + password: process.env.RIJU_PROXY_PASSWORD, + }; + console.log(`Waiting for session to become ready`); + const session = await k8s.initUserSession({ + watcher, + podName, + proxyInfo, + }); + console.log(`Initializing sandbox`); + let handlePtyInput; + const pty = await deptyify({ + handlePtyInput: (data) => handlePtyInput(data), + handlePtyExit: (_status) => {}, + }); + await new Promise((resolve) => { + const exec = session.exec(["bash"], { + pty: true, + on: { + stdout: (data) => pty.handlePtyOutput(data), + stderr: (data) => process.stderr.write(data), + exit: (status) => process.exit(status), + error: (err) => process.stderr.write(`riju: error: ${err}\n`), + close: () => resolve(), + }, + }); + handlePtyInput = (data) => exec.stdin.write(data); + }); } main().catch(die); diff --git a/backend/util.js b/backend/util.js index 4637783..6296030 100644 --- a/backend/util.js +++ b/backend/util.js @@ -1,7 +1,10 @@ import { spawn } from "child_process"; +import { promises as fs } from "fs"; import process from "process"; +import TailFile from "@logdna/tail-file"; import * as Sentry from "@sentry/node"; +import * as tmp from "tmp-promise"; import { v4 as getUUIDOrig } from "uuid"; let sentryEnabled = false; @@ -159,3 +162,73 @@ export function asBool(value, def) { } throw new Error(`asBool doesn't understand value: ${value}`); } + +export function deptyify({ handlePtyInput, handlePtyExit }) { + return new Promise((resolve, reject) => { + const done = false; + let triggerDone = () => { + // Calling the function stored in this variable should have the + // effect of terminating the tmp-promise callback and getting + // the temporary directory cleaned up. + done = true; + }; + tmp + .withDir( + async (dir) => { + const mkfifo = spawn("mkfifo", ["input", "output"], { + cwd: dir.path, + }); + await new Promise((resolve, reject) => { + mkfifo.on("error", reject); + mkfifo.on("exit", (code) => { + if (code === 0) { + resolve(); + } else { + reject(code); + } + }); + }); + const input = new TailFile(`${dir.path}/input`, { + encoding: "utf-8", + }); + input.on("data", (data) => handlePtyOutput(data)); + input.on("tail_error", logError); + input.on("error", logError); + await input.start(); + const output = await fs.open(`${dir.path}/output`, "w"); + const proc = spawn( + "system/out/riju-pty", + ["-f", "sh", "-c", "cat > input & cat output"], + { + cwd: dir.path, + } + ); + proc.on("exit", (status) => { + handlePtyExit(status); + triggerDone(); + }); + resolve({ + handlePtyOutput: async (data) => { + await input.write(data); + }, + }); + }, + { + unsafeCleanup: true, + } + ) + .then(async () => { + await new Promise((resolve) => { + if (done) { + resolve(); + } else { + triggerDone = resolve; + } + }); + }) + .catch((err) => { + logError(err); + reject(err); + }); + }); +} diff --git a/package.json b/package.json index dc902f9..ca90f08 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "@babel/preset-env": "^7.12.11", "@balena/dockerignore": "^1.0.2", "@kubernetes/client-node": "^0.18.0", + "@logdna/tail-file": "^3.0.0", "@sentry/node": "^6.11.0", "async-lock": "^1.2.6", "babel-loader": "^8.2.2", @@ -29,6 +30,7 @@ "monaco-editor": "0.20.0", "monaco-editor-webpack-plugin": "1.9.0", "monaco-languageclient": "0.13.0", + "node-fetch": "^3.3.0", "p-queue": "^6.6.2", "parse-passwd": "^1.0.0", "prettier": "^2.3.1", @@ -37,17 +39,18 @@ "semaphore": "^1.1.0", "strip-ansi": "^6.0.0", "style-loader": "^2.0.0", + "tmp-promise": "^3.0.3", "unique-names-generator": "^4.7.1", "uuid": "^8.3.2", "vscode-languageserver-protocol": "3.15.3", "webpack": "^4.44.2", "webpack-cli": "^4.3.0", + "ws": "^8.11.0", "xterm": "^4.9.0", "xterm-addon-fit": "^0.4.0", "yaml": "^1.10.0" }, "$comments": [ - "limiter version pinned due to https://github.com/jhurliman/node-rate-limiter/issues/80", "monaco-languageclient, monaco-editor, vscode-languageserver-protocol pinned because their APIs changed a bunch and Riju hasn't been updated yet" ] } diff --git a/yarn.lock b/yarn.lock index f20a286..68136cc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -904,6 +904,11 @@ optionalDependencies: openid-client "^5.3.0" +"@logdna/tail-file@^3.0.0": + version "3.0.0" + resolved "https://registry.yarnpkg.com/@logdna/tail-file/-/tail-file-3.0.0.tgz#0120353e59bf04b318d4861684e95682b0e85071" + integrity sha512-aZy8XzJI9zwl7gbmQa1L5Hr6yJbS062OMQJ7I9sqHD/QMEBfwlhxyytJ4Qe2jAoimefuabopEi4Gk+t/7to6/A== + "@sentry/core@6.11.0": version "6.11.0" resolved "https://registry.yarnpkg.com/@sentry/core/-/core-6.11.0.tgz#40e94043afcf6407a109be26655c77832c64e740" @@ -2047,6 +2052,11 @@ dashdash@^1.12.0: dependencies: assert-plus "^1.0.0" +data-uri-to-buffer@^4.0.0: + version "4.0.0" + resolved "https://registry.yarnpkg.com/data-uri-to-buffer/-/data-uri-to-buffer-4.0.0.tgz#b5db46aea50f6176428ac05b73be39a57701a64b" + integrity sha512-Vr3mLBA8qWmcuschSLAOogKgQ/Jwxulv3RNE4FXnYWRGujzrRWQI4m12fQqRkwX06C0KanhLr4hK+GydchZsaA== + debounce@^1.2.0: version "1.2.1" resolved "https://registry.yarnpkg.com/debounce/-/debounce-1.2.1.tgz#38881d8f4166a5c5848020c11827b834bcb3e0a5" @@ -2441,6 +2451,14 @@ fastest-levenshtein@^1.0.12: resolved "https://registry.yarnpkg.com/fastest-levenshtein/-/fastest-levenshtein-1.0.12.tgz#9990f7d3a88cc5a9ffd1f1745745251700d497e2" integrity sha512-On2N+BpYJ15xIC974QNVuYGMOlEVt4s0EOI3wwMqOmK1fdDY+FN/zltPV8vosq4ad4c/gJ1KHScUn/6AWIgiow== +fetch-blob@^3.1.2, fetch-blob@^3.1.4: + version "3.2.0" + resolved "https://registry.yarnpkg.com/fetch-blob/-/fetch-blob-3.2.0.tgz#f09b8d4bbd45adc6f0c20b7e787e793e309dcce9" + integrity sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ== + dependencies: + node-domexception "^1.0.0" + web-streams-polyfill "^3.0.3" + figgy-pudding@^3.5.1: version "3.5.2" resolved "https://registry.yarnpkg.com/figgy-pudding/-/figgy-pudding-3.5.2.tgz#b4eee8148abb01dcf1d1ac34367d59e12fa61d6e" @@ -2565,6 +2583,13 @@ form-data@~2.3.2: combined-stream "^1.0.6" mime-types "^2.1.12" +formdata-polyfill@^4.0.10: + version "4.0.10" + resolved "https://registry.yarnpkg.com/formdata-polyfill/-/formdata-polyfill-4.0.10.tgz#24807c31c9d402e002ab3d8c720144ceb8848423" + integrity sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g== + dependencies: + fetch-blob "^3.1.2" + forwarded@0.2.0: version "0.2.0" resolved "https://registry.yarnpkg.com/forwarded/-/forwarded-0.2.0.tgz#2269936428aad4c15c7ebe9779a84bf0b2a81811" @@ -3621,6 +3646,20 @@ neo-async@^2.5.0, neo-async@^2.6.1: resolved "https://registry.yarnpkg.com/neo-async/-/neo-async-2.6.2.tgz#b4aafb93e3aeb2d8174ca53cf163ab7d7308305f" integrity sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw== +node-domexception@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/node-domexception/-/node-domexception-1.0.0.tgz#6888db46a1f71c0b76b3f7555016b63fe64766e5" + integrity sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ== + +node-fetch@^3.3.0: + version "3.3.0" + resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-3.3.0.tgz#37e71db4ecc257057af828d523a7243d651d91e4" + integrity sha512-BKwRP/O0UvoMKp7GNdwPlObhYGB5DQqwhEDQlNKuoqwVYSxkSZCSbHjnFFmUEtwSKRPU4kNK8PbDYYitwaE3QA== + dependencies: + data-uri-to-buffer "^4.0.0" + fetch-blob "^3.1.4" + formdata-polyfill "^4.0.10" + node-libs-browser@^2.2.1: version "2.2.1" resolved "https://registry.yarnpkg.com/node-libs-browser/-/node-libs-browser-2.2.1.tgz#b64f513d18338625f90346d27b0d235e631f6425" @@ -4800,7 +4839,7 @@ timers-browserify@^2.0.4: dependencies: setimmediate "^1.0.4" -tmp-promise@^3.0.2: +tmp-promise@^3.0.2, tmp-promise@^3.0.3: version "3.0.3" resolved "https://registry.yarnpkg.com/tmp-promise/-/tmp-promise-3.0.3.tgz#60a1a1cc98c988674fcbfd23b6e3367bdeac4ce7" integrity sha512-RwM7MoPojPxsOBYnyd2hy0bxtIlVrihNs9pj5SUvY8Zz1sQcQG2tG1hSr8PDxfgEB8RNKDhqbIlroIarSNDNsQ== @@ -5116,6 +5155,11 @@ watchpack@^1.7.4: chokidar "^3.4.1" watchpack-chokidar2 "^2.0.1" +web-streams-polyfill@^3.0.3: + version "3.2.1" + resolved "https://registry.yarnpkg.com/web-streams-polyfill/-/web-streams-polyfill-3.2.1.tgz#71c2718c52b45fd49dbeee88634b3a60ceab42a6" + integrity sha512-e0MO3wdXWKrLbL0DgGnUV7WHVuw9OUvL4hjgnPkIeEvESk74gAITi5G606JtZPp39cd8HA9VQzCIvA49LpPN5Q== + webpack-cli@^4.3.0: version "4.7.2" resolved "https://registry.yarnpkg.com/webpack-cli/-/webpack-cli-4.7.2.tgz#a718db600de6d3906a4357e059ae584a89f4c1a5" @@ -5216,6 +5260,11 @@ ws@^7.3.1: resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591" integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q== +ws@^8.11.0: + version "8.11.0" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.11.0.tgz#6a0d36b8edfd9f96d8b25683db2f8d7de6e8e143" + integrity sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg== + xtend@^4.0.0, xtend@~4.0.1: version "4.0.2" resolved "https://registry.yarnpkg.com/xtend/-/xtend-4.0.2.tgz#bb72779f5fa465186b1f438f674fa347fdb5db54"