More k8s websocket proxying

This commit is contained in:
Radon Rosborough 2023-01-04 18:27:26 -07:00
parent 1aed40f7eb
commit ddb0084b0c
7 changed files with 361 additions and 61 deletions

View File

@ -20,7 +20,7 @@ type clientMessage struct {
}
type serverMessage struct {
// "start", "stdout", "stderr", "exit", "warn", "error"
// "stdout", "stderr", "exit", "warn", "error"
Event string `json:"event"`
// contents of stdout/stderr
Data []byte `json:"data,omitempty"`

View File

@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
"strings"
"syscall"
"time"
)
@ -26,6 +27,17 @@ type managedProcess struct {
CloseChan chan struct{}
}
func cleanEnv(env []string) []string {
newEnv := []string{}
for _, entry := range env {
if strings.HasPrefix(entry, "RIJU_") || strings.HasPrefix(entry, "KUBERNETES_") {
continue
}
newEnv = append(newEnv, entry)
}
return newEnv
}
func NewManagedProcess(name string, argv []string, attr *os.ProcAttr) (*managedProcess, error) {
mp := &managedProcess{
internalExitChan: make(chan struct{}, 16),
@ -66,6 +78,10 @@ func NewManagedProcess(name string, argv []string, attr *os.ProcAttr) (*managedP
newAttr.Files[1] = mp.stdoutWrite
newAttr.Files[2] = mp.stderrWrite
}
if newAttr.Env == nil {
newAttr.Env = os.Environ()
}
newAttr.Env = cleanEnv(newAttr.Env)
mp.proc, err = os.StartProcess(name, argv, newAttr)
if err != nil {
return mp, fmt.Errorf("spawning process: %w", err)

View File

@ -1,5 +1,6 @@
import * as k8sClient from "@kubernetes/client-node";
import lodash from "lodash";
import fetch from "node-fetch";
import WebSocket from "ws";
const kubeconfig = new k8sClient.KubeConfig();
kubeconfig.loadFromDefault();
@ -46,6 +47,9 @@ export function watchPods() {
callback("add", pods[podName]);
}
},
podExists: (podName) => {
return podName in pods;
},
};
}
@ -125,11 +129,59 @@ export async function createUserSession({
},
command: ["/riju-bin/agent"],
env: [
// For agent
{
name: "RIJU_AGENT_COMMAND_PREFIX",
value: "runuser -u riju --",
},
// For user code
{
name: "HOME",
value: "/home/riju",
},
{
name: "LANG",
value: "C.UTF-8",
},
{
name: "LC_ALL",
value: "C.UTF-8",
},
{
name: "LOGNAME",
value: "riju",
},
{
name: "PATH",
value:
"/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
},
{
name: "PWD",
value: "/home/riju/src",
},
{
name: "SHELL",
value: "/usr/bin/bash",
},
{
name: "TERM",
value: "xterm-256color",
},
{
name: "TMPDIR",
value: "/tmp",
},
{
name: "USER",
value: "riju",
},
{
name: "USERNAME",
value: "riju",
},
],
workingDir: "/home/riju/src",
securityContext: {
runAsUser: 0,
},
@ -182,24 +234,136 @@ export async function createUserSession({
},
})
).body;
return pod.metadata.name;
}
export async function initUserSession({ watcher, podName, proxyInfo }) {
let done = false;
try {
return await new Promise(async (resolve, reject) => {
try {
setTimeout(
() => reject("timed out waiting for pod to become ready"),
5 * 60 * 1000
);
const podIP = await new Promise((resolve, reject) => {
setTimeout(() => reject("timed out"), 5 * 60 * 1000);
watcher.setCallback(pod.metadata.name, (event, pod) => {
watcher.setCallback(podName, (event, pod) => {
if (event == "delete") {
reject(new Error("pod was deleted"));
} else if (pod.status.phase === "Failed") {
reject(new Error("pod status became Failed"));
} else if (
pod.status.podIP &&
lodash.every(pod.status.containerStatuses, (status) => status.ready)
) {
} else if (pod.status.podIP) {
resolve(pod.status.podIP);
} else {
console.log(event, JSON.stringify(pod.status, null, 2));
}
});
});
return podIP;
while (!done) {
let resp;
try {
if (!proxyInfo) {
resp = await fetch(`http://${podIP}:869`);
} else {
resp = await fetch(
`${proxyInfo.httpProtocol}://${proxyInfo.host}:${proxyInfo.port}/${podIP}/health`,
{
headers: {
Authorization: `Basic ${Buffer.from(
`${proxyInfo.username}:${proxyInfo.password}`
).toString("base64")}`,
},
}
);
}
if (!resp.ok) {
throw new Error(`Got HTTP error ${resp.status}`);
} else {
done = true;
}
} catch (err) {
await new Promise((resolve) => setTimeout(resolve, 250));
}
}
resolve({
exec: (cmdline, { on, pty }) => {
// on :: { stdout, stderr, exit, error, close }
if (pty) {
cmdline = ["/riju-bin/ptyify", ...cmdline];
}
let conn;
if (!proxyInfo) {
conn = new WebSocket(
`ws://${podIP}:869/exec?${new URLSearchParams({
cmdline,
}).toString()}`
);
} else {
conn = new WebSocket(
`${proxyInfo.wsProtocol}://${proxyInfo.host}:${
proxyInfo.port
}/${podIP}/exec?${new URLSearchParams({
cmdline,
}).toString()}`,
{
headers: {
Authorization: `Basic ${Buffer.from(
`${proxyInfo.username}:${proxyInfo.password}`
).toString("base64")}`,
},
}
);
}
conn.on("message", (msg) => {
let event, data, text, exitStatus;
try {
({ event, data, text, exitStatus } = JSON.parse(msg));
} catch (err) {
on.error(
`Unable to parse JSON message from agent: ${JSON.stringify(
msg
)}`
);
}
switch (event) {
case "stdout":
on.stdout(data);
break;
case "stderr":
on.stderr(data);
break;
case "exit":
on.exit(exitStatus);
break;
case "warn":
case "error":
on.error(text);
break;
default:
on.error(`Unexpected event type: ${JSON.stringify(event)}`);
break;
}
});
conn.on("close", () => {
on.close();
});
conn.on("error", (err) => {
on.error(`Websocket closed with error: ${err}`);
on.close();
});
return {
stdin: {
write: (data) =>
conn.write(JSON.stringify({ event: "stdin", data })),
},
};
},
});
} catch (err) {
reject(err);
}
});
} finally {
done = true;
}
}
export async function deleteUserSessions(sessionsToDelete) {

View File

@ -4,7 +4,7 @@ import process from "process";
import { readLangConfig } from "../lib/yaml.js";
import * as k8s from "./k8s.js";
import { getUUID, quote } from "./util.js";
import { deptyify, getUUID } from "./util.js";
function die(msg) {
console.error(msg);
@ -27,54 +27,49 @@ async function main() {
const sessionID = getUUID();
console.log(`Starting session with UUID ${sessionID}`);
const watcher = k8s.watchPods();
await k8s.createUserSession({
const podName = await k8s.createUserSession({
watcher,
sessionID,
langConfig,
revisions: {
agent: "20221229-002450-semantic-moccasin-albatross",
agent: "20230104-131916-extensive-aquamarine-crocodile",
ptyify: "20221228-023645-clean-white-gorilla",
langImage: "20221227-195753-forward-harlequin-wolverine",
},
});
// let buffer = "";
// await new Promise((resolve) => {
// session.stdout.on("data", (data) => {
// buffer += data.toString();
// let idx;
// while ((idx = buffer.indexOf("\n")) !== -1) {
// const line = buffer.slice(0, idx);
// buffer = buffer.slice(idx + 1);
// if (line === "riju: container ready") {
// resolve();
// } else {
// console.error(line);
// }
// }
// });
// });
// const args = [].concat.apply(
// ["riju-pty", "-f"],
// privilegedPty(
// { uuid },
// bash(
// `env L='${lang}' LANG_CONFIG=${quote(
// JSON.stringify(langConfig)
// )} bash --rcfile <(cat <<< ${quote(sandboxScript)})`
// )
// )
// );
// const proc = spawn(args[0], args.slice(1), {
// stdio: "inherit",
// });
// try {
// await new Promise((resolve, reject) => {
// proc.on("error", reject);
// proc.on("close", resolve);
// });
// } finally {
// session.kill();
// }
const proxyInfo = {
httpProtocol: "https",
wsProtocol: "wss",
host: "k8s.riju.codes",
port: 1869,
username: "admin",
password: process.env.RIJU_PROXY_PASSWORD,
};
console.log(`Waiting for session to become ready`);
const session = await k8s.initUserSession({
watcher,
podName,
proxyInfo,
});
console.log(`Initializing sandbox`);
let handlePtyInput;
const pty = await deptyify({
handlePtyInput: (data) => handlePtyInput(data),
handlePtyExit: (_status) => {},
});
await new Promise((resolve) => {
const exec = session.exec(["bash"], {
pty: true,
on: {
stdout: (data) => pty.handlePtyOutput(data),
stderr: (data) => process.stderr.write(data),
exit: (status) => process.exit(status),
error: (err) => process.stderr.write(`riju: error: ${err}\n`),
close: () => resolve(),
},
});
handlePtyInput = (data) => exec.stdin.write(data);
});
}
main().catch(die);

View File

@ -1,7 +1,10 @@
import { spawn } from "child_process";
import { promises as fs } from "fs";
import process from "process";
import TailFile from "@logdna/tail-file";
import * as Sentry from "@sentry/node";
import * as tmp from "tmp-promise";
import { v4 as getUUIDOrig } from "uuid";
let sentryEnabled = false;
@ -159,3 +162,73 @@ export function asBool(value, def) {
}
throw new Error(`asBool doesn't understand value: ${value}`);
}
export function deptyify({ handlePtyInput, handlePtyExit }) {
return new Promise((resolve, reject) => {
const done = false;
let triggerDone = () => {
// Calling the function stored in this variable should have the
// effect of terminating the tmp-promise callback and getting
// the temporary directory cleaned up.
done = true;
};
tmp
.withDir(
async (dir) => {
const mkfifo = spawn("mkfifo", ["input", "output"], {
cwd: dir.path,
});
await new Promise((resolve, reject) => {
mkfifo.on("error", reject);
mkfifo.on("exit", (code) => {
if (code === 0) {
resolve();
} else {
reject(code);
}
});
});
const input = new TailFile(`${dir.path}/input`, {
encoding: "utf-8",
});
input.on("data", (data) => handlePtyOutput(data));
input.on("tail_error", logError);
input.on("error", logError);
await input.start();
const output = await fs.open(`${dir.path}/output`, "w");
const proc = spawn(
"system/out/riju-pty",
["-f", "sh", "-c", "cat > input & cat output"],
{
cwd: dir.path,
}
);
proc.on("exit", (status) => {
handlePtyExit(status);
triggerDone();
});
resolve({
handlePtyOutput: async (data) => {
await input.write(data);
},
});
},
{
unsafeCleanup: true,
}
)
.then(async () => {
await new Promise((resolve) => {
if (done) {
resolve();
} else {
triggerDone = resolve;
}
});
})
.catch((err) => {
logError(err);
reject(err);
});
});
}

View File

@ -10,6 +10,7 @@
"@babel/preset-env": "^7.12.11",
"@balena/dockerignore": "^1.0.2",
"@kubernetes/client-node": "^0.18.0",
"@logdna/tail-file": "^3.0.0",
"@sentry/node": "^6.11.0",
"async-lock": "^1.2.6",
"babel-loader": "^8.2.2",
@ -29,6 +30,7 @@
"monaco-editor": "0.20.0",
"monaco-editor-webpack-plugin": "1.9.0",
"monaco-languageclient": "0.13.0",
"node-fetch": "^3.3.0",
"p-queue": "^6.6.2",
"parse-passwd": "^1.0.0",
"prettier": "^2.3.1",
@ -37,17 +39,18 @@
"semaphore": "^1.1.0",
"strip-ansi": "^6.0.0",
"style-loader": "^2.0.0",
"tmp-promise": "^3.0.3",
"unique-names-generator": "^4.7.1",
"uuid": "^8.3.2",
"vscode-languageserver-protocol": "3.15.3",
"webpack": "^4.44.2",
"webpack-cli": "^4.3.0",
"ws": "^8.11.0",
"xterm": "^4.9.0",
"xterm-addon-fit": "^0.4.0",
"yaml": "^1.10.0"
},
"$comments": [
"limiter version pinned due to https://github.com/jhurliman/node-rate-limiter/issues/80",
"monaco-languageclient, monaco-editor, vscode-languageserver-protocol pinned because their APIs changed a bunch and Riju hasn't been updated yet"
]
}

View File

@ -904,6 +904,11 @@
optionalDependencies:
openid-client "^5.3.0"
"@logdna/tail-file@^3.0.0":
version "3.0.0"
resolved "https://registry.yarnpkg.com/@logdna/tail-file/-/tail-file-3.0.0.tgz#0120353e59bf04b318d4861684e95682b0e85071"
integrity sha512-aZy8XzJI9zwl7gbmQa1L5Hr6yJbS062OMQJ7I9sqHD/QMEBfwlhxyytJ4Qe2jAoimefuabopEi4Gk+t/7to6/A==
"@sentry/core@6.11.0":
version "6.11.0"
resolved "https://registry.yarnpkg.com/@sentry/core/-/core-6.11.0.tgz#40e94043afcf6407a109be26655c77832c64e740"
@ -2047,6 +2052,11 @@ dashdash@^1.12.0:
dependencies:
assert-plus "^1.0.0"
data-uri-to-buffer@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/data-uri-to-buffer/-/data-uri-to-buffer-4.0.0.tgz#b5db46aea50f6176428ac05b73be39a57701a64b"
integrity sha512-Vr3mLBA8qWmcuschSLAOogKgQ/Jwxulv3RNE4FXnYWRGujzrRWQI4m12fQqRkwX06C0KanhLr4hK+GydchZsaA==
debounce@^1.2.0:
version "1.2.1"
resolved "https://registry.yarnpkg.com/debounce/-/debounce-1.2.1.tgz#38881d8f4166a5c5848020c11827b834bcb3e0a5"
@ -2441,6 +2451,14 @@ fastest-levenshtein@^1.0.12:
resolved "https://registry.yarnpkg.com/fastest-levenshtein/-/fastest-levenshtein-1.0.12.tgz#9990f7d3a88cc5a9ffd1f1745745251700d497e2"
integrity sha512-On2N+BpYJ15xIC974QNVuYGMOlEVt4s0EOI3wwMqOmK1fdDY+FN/zltPV8vosq4ad4c/gJ1KHScUn/6AWIgiow==
fetch-blob@^3.1.2, fetch-blob@^3.1.4:
version "3.2.0"
resolved "https://registry.yarnpkg.com/fetch-blob/-/fetch-blob-3.2.0.tgz#f09b8d4bbd45adc6f0c20b7e787e793e309dcce9"
integrity sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==
dependencies:
node-domexception "^1.0.0"
web-streams-polyfill "^3.0.3"
figgy-pudding@^3.5.1:
version "3.5.2"
resolved "https://registry.yarnpkg.com/figgy-pudding/-/figgy-pudding-3.5.2.tgz#b4eee8148abb01dcf1d1ac34367d59e12fa61d6e"
@ -2565,6 +2583,13 @@ form-data@~2.3.2:
combined-stream "^1.0.6"
mime-types "^2.1.12"
formdata-polyfill@^4.0.10:
version "4.0.10"
resolved "https://registry.yarnpkg.com/formdata-polyfill/-/formdata-polyfill-4.0.10.tgz#24807c31c9d402e002ab3d8c720144ceb8848423"
integrity sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==
dependencies:
fetch-blob "^3.1.2"
forwarded@0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/forwarded/-/forwarded-0.2.0.tgz#2269936428aad4c15c7ebe9779a84bf0b2a81811"
@ -3621,6 +3646,20 @@ neo-async@^2.5.0, neo-async@^2.6.1:
resolved "https://registry.yarnpkg.com/neo-async/-/neo-async-2.6.2.tgz#b4aafb93e3aeb2d8174ca53cf163ab7d7308305f"
integrity sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==
node-domexception@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/node-domexception/-/node-domexception-1.0.0.tgz#6888db46a1f71c0b76b3f7555016b63fe64766e5"
integrity sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==
node-fetch@^3.3.0:
version "3.3.0"
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-3.3.0.tgz#37e71db4ecc257057af828d523a7243d651d91e4"
integrity sha512-BKwRP/O0UvoMKp7GNdwPlObhYGB5DQqwhEDQlNKuoqwVYSxkSZCSbHjnFFmUEtwSKRPU4kNK8PbDYYitwaE3QA==
dependencies:
data-uri-to-buffer "^4.0.0"
fetch-blob "^3.1.4"
formdata-polyfill "^4.0.10"
node-libs-browser@^2.2.1:
version "2.2.1"
resolved "https://registry.yarnpkg.com/node-libs-browser/-/node-libs-browser-2.2.1.tgz#b64f513d18338625f90346d27b0d235e631f6425"
@ -4800,7 +4839,7 @@ timers-browserify@^2.0.4:
dependencies:
setimmediate "^1.0.4"
tmp-promise@^3.0.2:
tmp-promise@^3.0.2, tmp-promise@^3.0.3:
version "3.0.3"
resolved "https://registry.yarnpkg.com/tmp-promise/-/tmp-promise-3.0.3.tgz#60a1a1cc98c988674fcbfd23b6e3367bdeac4ce7"
integrity sha512-RwM7MoPojPxsOBYnyd2hy0bxtIlVrihNs9pj5SUvY8Zz1sQcQG2tG1hSr8PDxfgEB8RNKDhqbIlroIarSNDNsQ==
@ -5116,6 +5155,11 @@ watchpack@^1.7.4:
chokidar "^3.4.1"
watchpack-chokidar2 "^2.0.1"
web-streams-polyfill@^3.0.3:
version "3.2.1"
resolved "https://registry.yarnpkg.com/web-streams-polyfill/-/web-streams-polyfill-3.2.1.tgz#71c2718c52b45fd49dbeee88634b3a60ceab42a6"
integrity sha512-e0MO3wdXWKrLbL0DgGnUV7WHVuw9OUvL4hjgnPkIeEvESk74gAITi5G606JtZPp39cd8HA9VQzCIvA49LpPN5Q==
webpack-cli@^4.3.0:
version "4.7.2"
resolved "https://registry.yarnpkg.com/webpack-cli/-/webpack-cli-4.7.2.tgz#a718db600de6d3906a4357e059ae584a89f4c1a5"
@ -5216,6 +5260,11 @@ ws@^7.3.1:
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
ws@^8.11.0:
version "8.11.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.11.0.tgz#6a0d36b8edfd9f96d8b25683db2f8d7de6e8e143"
integrity sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==
xtend@^4.0.0, xtend@~4.0.1:
version "4.0.2"
resolved "https://registry.yarnpkg.com/xtend/-/xtend-4.0.2.tgz#bb72779f5fa465186b1f438f674fa347fdb5db54"