397 lines
11 KiB
JavaScript
397 lines
11 KiB
JavaScript
import * as k8sClient from "@kubernetes/client-node";
|
|
import fetch from "node-fetch";
|
|
import WebSocket from "ws";
|
|
|
|
const kubeconfig = new k8sClient.KubeConfig();
|
|
kubeconfig.loadFromDefault();
|
|
|
|
const k8s = kubeconfig.makeApiClient(k8sClient.CoreV1Api);
|
|
|
|
export function watchPods() {
|
|
const callbacks = {};
|
|
const pods = {};
|
|
|
|
// https://github.com/kubernetes-client/javascript/blob/1f76ee10c54e33a998abb4686488ccff4285366a/examples/typescript/informer/informer.ts
|
|
//
|
|
// The watch functionality seems to be wholly undocumented. Copy,
|
|
// paste, and pray.
|
|
const informer = k8sClient.makeInformer(
|
|
kubeconfig,
|
|
"/api/v1/namespaces/user/pods",
|
|
() => k8s.listNamespacedPod("user")
|
|
);
|
|
|
|
for (const event of ["add", "update", "delete"]) {
|
|
informer.on(event, (pod) => {
|
|
if (pod.metadata.name in callbacks) {
|
|
callbacks[pod.metadata.name](event, pod);
|
|
}
|
|
pods[pod.metadata.name] = pod;
|
|
if (event === "delete") {
|
|
delete callbacks[pod.metadata.name];
|
|
delete pods[pod.metadata.name];
|
|
}
|
|
});
|
|
}
|
|
|
|
informer.on("error", (err) => {
|
|
console.error(err);
|
|
setTimeout(() => informer.start(), 5000);
|
|
});
|
|
informer.start();
|
|
|
|
return {
|
|
setCallback: (podName, callback) => {
|
|
callbacks[podName] = callback;
|
|
if (podName in pods) {
|
|
callback("add", pods[podName]);
|
|
}
|
|
},
|
|
close: () => {
|
|
informer.stop();
|
|
},
|
|
};
|
|
}
|
|
|
|
export function watchConfigMaps() {
|
|
const callbacks = {};
|
|
const configMaps = {};
|
|
|
|
const informer = k8sClient.makeInformer(
|
|
kubeconfig,
|
|
"/api/v1/namespaces/database",
|
|
() => k8s.listNamespacedConfigMap("database")
|
|
);
|
|
|
|
for (const event of ["add", "update", "delete"]) {
|
|
informer.event(event, (configMap) => {
|
|
if (event === "delete") {
|
|
// Ignore this explicitly
|
|
return;
|
|
}
|
|
if (configMap.metadata.name in callbacks) {
|
|
callbacks[configMap.metadata.name](event, configMap);
|
|
}
|
|
configMaps[configMaps.metadata.name] = configMap;
|
|
});
|
|
}
|
|
|
|
informer.on("error", (err) => {
|
|
console.error(err);
|
|
setTimeout(() => informer.start(), 5000);
|
|
});
|
|
informer.start();
|
|
|
|
return {
|
|
setCallback: (configMapName, callback) => {
|
|
callbacks[configMapName] = callback;
|
|
if (configMapName in configMaps) {
|
|
callback("add", configMaps[configMapName]);
|
|
}
|
|
},
|
|
close: () => {
|
|
informer.stop();
|
|
},
|
|
};
|
|
}
|
|
|
|
export async function listUserSessions() {
|
|
return (await k8s.listNamespacedPod("user")).body.items.map((pod) => ({
|
|
podName: pod.metadata.name,
|
|
sessionID: pod.metadata.labels["riju.codes/user-session-id"],
|
|
}));
|
|
}
|
|
|
|
export async function createUserSession({
|
|
sessionID,
|
|
containerHostname,
|
|
langImageTag,
|
|
}) {
|
|
const pod = (
|
|
await k8s.createNamespacedPod("user", {
|
|
metadata: {
|
|
name: `riju-user-session-${sessionID}`,
|
|
labels: {
|
|
"riju.codes/user-session-id": sessionID,
|
|
},
|
|
},
|
|
spec: {
|
|
hostname: containerHostname,
|
|
imagePullSecrets: [
|
|
{
|
|
name: "registry-user-login",
|
|
},
|
|
],
|
|
containers: [
|
|
{
|
|
name: "session",
|
|
image: `localhost:30999/riju-lang:${langImageTag}`,
|
|
resources: {
|
|
requests: {},
|
|
limits: {
|
|
cpu: "1000m",
|
|
memory: "4Gi",
|
|
},
|
|
},
|
|
env: [
|
|
// For agent
|
|
{
|
|
name: "RIJU_AGENT_COMMAND_PREFIX",
|
|
value: "runuser -u riju --",
|
|
},
|
|
// For user code
|
|
{
|
|
name: "HOME",
|
|
value: "/home/riju",
|
|
},
|
|
{
|
|
name: "LANG",
|
|
value: "C.UTF-8",
|
|
},
|
|
{
|
|
name: "LC_ALL",
|
|
value: "C.UTF-8",
|
|
},
|
|
{
|
|
name: "LOGNAME",
|
|
value: "riju",
|
|
},
|
|
{
|
|
name: "PATH",
|
|
value:
|
|
"/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
|
|
},
|
|
{
|
|
name: "PWD",
|
|
value: "/home/riju/src",
|
|
},
|
|
{
|
|
name: "SHELL",
|
|
value: "/usr/bin/bash",
|
|
},
|
|
{
|
|
name: "TERM",
|
|
value: "xterm-256color",
|
|
},
|
|
{
|
|
name: "TMPDIR",
|
|
value: "/tmp",
|
|
},
|
|
{
|
|
name: "USER",
|
|
value: "riju",
|
|
},
|
|
{
|
|
name: "USERNAME",
|
|
value: "riju",
|
|
},
|
|
],
|
|
workingDir: "/home/riju/src",
|
|
securityContext: {
|
|
runAsUser: 0,
|
|
},
|
|
startupProbe: {
|
|
httpGet: {
|
|
path: "/health",
|
|
port: 869,
|
|
scheme: "HTTP",
|
|
},
|
|
failureThreshold: 30,
|
|
initialDelaySeconds: 0,
|
|
periodSeconds: 1,
|
|
successThreshold: 1,
|
|
timeoutSeconds: 2,
|
|
},
|
|
readinessProbe: {
|
|
httpGet: {
|
|
path: "/health",
|
|
port: 869,
|
|
scheme: "HTTP",
|
|
},
|
|
failureThreshold: 1,
|
|
initialDelaySeconds: 2,
|
|
periodSeconds: 10,
|
|
successThreshold: 1,
|
|
timeoutSeconds: 2,
|
|
},
|
|
livenessProbe: {
|
|
httpGet: {
|
|
path: "/health",
|
|
port: 869,
|
|
scheme: "HTTP",
|
|
},
|
|
failureThreshold: 3,
|
|
initialDelaySeconds: 2,
|
|
periodSeconds: 10,
|
|
successThreshold: 1,
|
|
timeoutSeconds: 2,
|
|
},
|
|
volumeMounts: [
|
|
{
|
|
name: "riju-bin",
|
|
mountPath: "/riju-bin",
|
|
readOnly: true,
|
|
},
|
|
],
|
|
},
|
|
],
|
|
restartPolicy: "Never",
|
|
},
|
|
})
|
|
).body;
|
|
return pod.metadata.name;
|
|
}
|
|
|
|
export async function initUserSession({ watcher, podName, proxyInfo }) {
|
|
let done = false;
|
|
try {
|
|
return await new Promise(async (resolve, reject) => {
|
|
let timeout = null;
|
|
try {
|
|
timeout = setTimeout(
|
|
() => reject("timed out waiting for pod to become ready"),
|
|
5 * 60 * 1000
|
|
);
|
|
const podIP = await new Promise((resolve, reject) => {
|
|
watcher.setCallback(podName, (event, pod) => {
|
|
if (event == "delete") {
|
|
reject(new Error("pod was deleted"));
|
|
} else if (pod.status.phase === "Failed") {
|
|
reject(new Error("pod status became Failed"));
|
|
} else if (pod.status.podIP) {
|
|
resolve(pod.status.podIP);
|
|
}
|
|
});
|
|
});
|
|
while (!done) {
|
|
let resp;
|
|
try {
|
|
if (!proxyInfo) {
|
|
resp = await fetch(`http://${podIP}:869`);
|
|
} else {
|
|
resp = await fetch(
|
|
`${proxyInfo.httpProtocol}://${proxyInfo.host}:${proxyInfo.port}/${podIP}/health`,
|
|
{
|
|
headers: {
|
|
Authorization: `Basic ${Buffer.from(
|
|
`${proxyInfo.username}:${proxyInfo.password}`
|
|
).toString("base64")}`,
|
|
},
|
|
}
|
|
);
|
|
}
|
|
if (!resp.ok) {
|
|
throw new Error(`Got HTTP error ${resp.status}`);
|
|
} else {
|
|
done = true;
|
|
}
|
|
} catch (err) {
|
|
await new Promise((resolve) => setTimeout(resolve, 250));
|
|
}
|
|
}
|
|
resolve({
|
|
exec: async (cmdline, { on, pty }) => {
|
|
// on :: { stdout, stderr, exit, error, close }
|
|
if (pty) {
|
|
cmdline = ["/riju-bin/ptyify", ...cmdline];
|
|
}
|
|
const params = new URLSearchParams();
|
|
for (const arg of cmdline) {
|
|
params.append("cmdline", arg);
|
|
}
|
|
let conn;
|
|
if (!proxyInfo) {
|
|
conn = new WebSocket(
|
|
`ws://${podIP}:869/exec?${params.toString()}`
|
|
);
|
|
} else {
|
|
conn = new WebSocket(
|
|
`${proxyInfo.wsProtocol}://${proxyInfo.host}:${
|
|
proxyInfo.port
|
|
}/${podIP}/exec?${params.toString()}`,
|
|
{
|
|
headers: {
|
|
Authorization: `Basic ${Buffer.from(
|
|
`${proxyInfo.username}:${proxyInfo.password}`
|
|
).toString("base64")}`,
|
|
},
|
|
}
|
|
);
|
|
}
|
|
await new Promise((resolve, reject) => {
|
|
conn.on("open", resolve);
|
|
conn.on("error", reject);
|
|
});
|
|
conn.on("message", (msg) => {
|
|
let event, data, text, exitStatus;
|
|
try {
|
|
({ event, data, text, exitStatus } = JSON.parse(msg));
|
|
} catch (err) {
|
|
on.error(
|
|
`Unable to parse JSON message from agent: ${JSON.stringify(
|
|
msg
|
|
)}`
|
|
);
|
|
}
|
|
switch (event) {
|
|
case "stdout":
|
|
on.stdout(Buffer.from(data, "base64"));
|
|
break;
|
|
case "stderr":
|
|
on.stderr(Buffer.from(data, "base64"));
|
|
break;
|
|
case "exit":
|
|
on.exit(exitStatus);
|
|
break;
|
|
case "warn":
|
|
case "error":
|
|
on.error(text);
|
|
break;
|
|
default:
|
|
on.error(`Unexpected event type: ${JSON.stringify(event)}`);
|
|
break;
|
|
}
|
|
});
|
|
conn.on("close", () => {
|
|
on.close();
|
|
});
|
|
conn.on("error", (err) => {
|
|
on.error(`Websocket closed with error: ${err}`);
|
|
on.close();
|
|
});
|
|
return {
|
|
stdin: {
|
|
// data should be of type Buffer
|
|
write: (data) =>
|
|
conn.send(
|
|
JSON.stringify({
|
|
event: "stdin",
|
|
data: data.toString("base64"),
|
|
})
|
|
),
|
|
},
|
|
close: () => {
|
|
conn.close();
|
|
},
|
|
};
|
|
},
|
|
});
|
|
} catch (err) {
|
|
reject(err);
|
|
} finally {
|
|
if (timeout) {
|
|
clearTimeout(timeout);
|
|
}
|
|
}
|
|
});
|
|
} finally {
|
|
done = true;
|
|
}
|
|
}
|
|
|
|
export async function deleteUserSessions(sessionsToDelete) {
|
|
for (const { podName } of sessionsToDelete) {
|
|
await k8s.deleteNamespacedPod(podName, "user");
|
|
}
|
|
}
|