diff --git a/agent/go.mod b/agent/go.mod
new file mode 100644
index 0000000..8d3a29b
--- /dev/null
+++ b/agent/go.mod
@@ -0,0 +1,5 @@
+module github.com/radian-software/riju/agent
+
+go 1.18
+
+require github.com/gorilla/websocket v1.5.0
diff --git a/agent/go.sum b/agent/go.sum
new file mode 100644
index 0000000..e5a03d4
--- /dev/null
+++ b/agent/go.sum
@@ -0,0 +1,2 @@
+github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
+github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
diff --git a/agent/main.go b/agent/main.go
new file mode 100644
index 0000000..7b0dc44
--- /dev/null
+++ b/agent/main.go
@@ -0,0 +1,364 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"syscall"
+	"time"
+
+	"github.com/gorilla/websocket"
+)
+
+type clientMessage struct {
+	// "stdin"
+	Event string `json:"event"`
+	// contents of stdin
+	Data []byte `json:"data,omitempty"`
+}
+
+type serverMessage struct {
+	// "start", "stdout", "stderr", "exit", "warn", "fatal"
+	Event string `json:"event"`
+	// contents of stdout/stderr
+	Data []byte `json:"data,omitempty"`
+	// error message
+	Text string `json:"text,omitempty"`
+	// exit status
+	ExitStatus *int `json:"exitStatus,omitempty"`
+}
+
+const (
+	pingPeriod    = 15 * time.Second
+	pongWait      = 20 * time.Second // must exceed pingPeriod so a pong can arrive before the read deadline expires
+	writeDeadline = 1 * time.Second
+)
+
+var upgrader = websocket.Upgrader{}
+
+func logWarn(err error) {
+	log.Println(err.Error())
+}
+
+func logWarnf(format string, arg ...interface{}) {
+	logWarn(fmt.Errorf(format, arg...))
+}
+
+func logError(err error) {
+	log.Println(err.Error())
+}
+
+func logErrorf(format string, arg ...interface{}) {
+	logError(fmt.Errorf(format, arg...))
+}
+
+func tryClose(obj io.Closer, objName string) {
+	err := obj.Close()
+	if err != nil {
+		logErrorf("error closing %s: %w", objName, err)
+	}
+}
+
+func closeWs(ws *websocket.Conn) {
+	err := ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
+	if err != nil {
+		logErrorf("sending close message: %w", err)
+	}
+	tryClose(ws, "websocket")
+}
+
+func send(ws *websocket.Conn, msg *serverMessage) {
+	data, err := json.Marshal(msg)
+	if err != nil {
+		logErrorf("marshaling message: %w", err)
+		closeWs(ws)
+		return
+	}
+	err = ws.WriteMessage(websocket.TextMessage, data)
+	if err != nil {
+		logErrorf("sending message: %w", err)
+		closeWs(ws)
+		return
+	}
+}
+
+func fatal(ws *websocket.Conn, err error) {
+	send(ws, &serverMessage{
+		Event: "fatal",
+		Text:  err.Error(),
+	})
+}
+
+func fatalf(ws *websocket.Conn, format string, arg ...interface{}) {
+	fatal(ws, fmt.Errorf(format, arg...))
+}
+
+func warn(ws *websocket.Conn, err error) {
+	send(ws, &serverMessage{
+		Event: "warn",
+		Text:  err.Error(),
+	})
+}
+
+func warnf(ws *websocket.Conn, format string, arg ...interface{}) {
+	warn(ws, fmt.Errorf(format, arg...))
+}
+
+func handleClientMessages(ws *websocket.Conn, stdinChan chan<- []byte) {
+	// Close channel after we exit
+	defer close(stdinChan)
+	// Stop processing reads some time after we stop receiving
+	// timely responses to our pings.
+	ws.SetReadDeadline(time.Now().Add(pongWait))
+	ws.SetPongHandler(func(string) error {
+		ws.SetReadDeadline(time.Now().Add(pongWait))
+		return nil
+	})
+	// Read data and dispatch appropriately. Return on timeout or
+	// error. Caller is responsible for cleanup.
+	for {
+		msgtype, data, err := ws.ReadMessage()
+		if err != nil {
+			fatalf(ws, "reading message: %w", err)
+			return
+		}
+		if msgtype != websocket.TextMessage {
+			fatalf(ws, "received non-text message type %d", msgtype)
+			return
+		}
+		msg := clientMessage{}
+		err = json.Unmarshal(data, &msg)
+		if err != nil {
+			fatalf(ws, "parsing json: %w", err)
+			return
+		}
+		switch msg.Event {
+		case "stdin":
+			stdinChan <- msg.Data
+		default:
+			logWarnf("received unknown event type %s", msg.Event)
+		}
+	}
+}
+
+// https://github.com/gorilla/websocket/blob/76ecc29eff79f0cedf70c530605e486fc32131d1/examples/command/main.go
+func handler(w http.ResponseWriter, r *http.Request) {
+	// Upgrade HTTP connection to websocket
+	ws, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		logErrorf("upgrading connection: %w", err)
+		return
+	}
+	// Close websocket on error or when we exit
+	defer closeWs(ws)
+	// Parse request query parameters; do this after upgrading to
+	// websocket so that we can send errors back on the websocket,
+	// which is easier for clients to parse
+	cmdline := r.URL.Query()["cmdline"]
+	if len(cmdline) == 0 {
+		fatalf(ws, "cmdline query parameter missing")
+		return
+	}
+	// Create pipes for communicating with subprocess
+	stdinRead, stdinWrite, err := os.Pipe()
+	if err != nil {
+		fatalf(ws, "creating stdin pipe: %w", err)
+		return
+	}
+	defer tryClose(stdinRead, "read end of stdin pipe")
+	defer tryClose(stdinWrite, "write end of stdin pipe")
+	stdoutRead, stdoutWrite, err := os.Pipe()
+	if err != nil {
+		fatalf(ws, "creating stdout pipe: %w", err)
+		return
+	}
+	defer tryClose(stdoutRead, "read end of stdout pipe")
+	defer tryClose(stdoutWrite, "write end of stdout pipe")
+	stderrRead, stderrWrite, err := os.Pipe()
+	if err != nil {
+		fatalf(ws, "creating stderr pipe: %w", err)
+		return
+	}
+	defer tryClose(stderrRead, "read end of stderr pipe")
+	defer tryClose(stderrWrite, "write end of stderr pipe")
+	// Spawn subprocess
+	proc, err := os.StartProcess(cmdline[0], cmdline, &os.ProcAttr{
+		Files: []*os.File{stdinRead, stdoutWrite, stderrWrite},
+	})
+	if err != nil {
+		fatalf(ws, "spawning process: %w", err)
+		return
+	}
+	// Set up a way for other goroutines to report a fatal error;
+	// use increased capacity to avoid blocking with a large number
+	// of write callsites
+	doneChan := make(chan struct{}, 10)
+	// Set up channels and variables to monitor process state
+	waitChan := make(chan struct{}, 1)
+	state := (*os.ProcessState)(nil)
+	// Monitor the process to see when it exits
+	go func() {
+		s, err := proc.Wait()
+		if err != nil {
+			fatalf(ws, "waiting for process to exit: %w", err)
+		} else {
+			state = s
+		}
+		waitChan <- struct{}{}
+		doneChan <- struct{}{}
+	}()
+	// Arrange to send information about the process exit status
+	// if we have obtained it by the time we return
+	defer func() {
+		if state != nil {
+			status := state.ExitCode()
+			send(ws, &serverMessage{
+				Event:      "exit",
+				ExitStatus: &status,
+			})
+		}
+	}()
+	// Arrange for subprocess to be killed when we exit
+	defer func() {
+		// See if process has already exited or is about to
+		select {
+		case <-waitChan:
+			return
+		case <-time.NewTimer(500 * time.Millisecond).C:
+			//
+		}
+		// Try killing the process by closing stdin
+		tryClose(stdinWrite, "stdin to child")
+		select {
+		case <-waitChan:
+			return
+		case <-time.NewTimer(500 * time.Millisecond).C:
+			//
+		}
+		// Try killing the process with SIGTERM, SIGINT, then
+		// finally SIGKILL
+		for _, sig := range []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL} {
+			err = proc.Signal(sig)
+			if err != nil {
+				logWarnf("sending %s to child: %w", sig.String(), err)
+			}
+			select {
+			case <-waitChan:
+				return
+			case <-time.NewTimer(500 * time.Millisecond).C:
+				//
+			}
+		}
+		// We are unable to kill the process
+		fatalf(ws, "unable to kill child")
+	}()
+	// Close our copies of pipe ends passed to subprocess
+	err = stdinRead.Close()
+	if err != nil {
+		fatalf(ws, "closing read end of stdin pipe from parent: %w", err)
+		return
+	}
+	err = stdoutWrite.Close()
+	if err != nil {
+		fatalf(ws, "closing write end of stdout pipe from parent: %w", err)
+		return
+	}
+	err = stderrWrite.Close()
+	if err != nil {
+		fatalf(ws, "closing write end of stderr pipe from parent: %w", err)
+		return
+	}
+	// Handle received messages from client
+	stdinChan := make(chan []byte)
+	go func() {
+		handleClientMessages(ws, stdinChan)
+		doneChan <- struct{}{}
+	}()
+	go func() {
+		for data := range stdinChan {
+			_, err := stdinWrite.Write(data)
+			if err != nil {
+				warnf(ws, "writing to stdin: %w", err)
+				return
+			}
+		}
+	}()
+	// Send regular pings to ensure we get regular pongs to
+	// satisfy the read deadline in handleClientMessages
+	pingDoneChan := make(chan struct{}, 1)
+	defer func() {
+		pingDoneChan <- struct{}{}
+	}()
+	go func() {
+		ticker := time.NewTicker(pingPeriod)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeDeadline))
+				if err != nil {
+					logErrorf("sending ping: %w", err)
+					doneChan <- struct{}{}
+					return
+				}
+			case <-pingDoneChan:
+				return
+			}
+		}
+	}()
+	// Proxy stdout back to client
+	go func() {
+		for {
+			buf := make([]byte, 1024)
+			nr, err := stdoutRead.Read(buf)
+			if err != nil {
+				warnf(ws, "reading from stdout: %w", err)
+				return
+			}
+			if nr == 0 {
+				continue
+			}
+			data, err := json.Marshal(&serverMessage{
+				Event: "stdout",
+				Data:  buf[:nr],
+			})
+			if err != nil {
+				fatalf(ws, "wrapping stdout in json: %w", err)
+				doneChan <- struct{}{}
+				return
+			}
+			ws.SetWriteDeadline(time.Now().Add(writeDeadline))
+			err = ws.WriteMessage(websocket.TextMessage, data)
+			if err != nil {
+				fatalf(ws, "sending message: %w", err)
+				doneChan <- struct{}{}
+				return
+			}
+		}
+	}()
+	// Wait until either the process has exited or a websocket
+	// operation has failed
+	<-doneChan
+	// Process and websocket will be cleaned up after return
+	return
+}
+
+func main() {
+	port := os.Getenv("RIJU_AGENT_PORT")
+	if port == "" {
+		port = "869"
+	}
+	host := os.Getenv("RIJU_AGENT_HOST")
+	if host == "" {
+		host = "0.0.0.0"
+	}
+	fmt.Printf("Listening on http://%s:%s\n", host, port)
+	err := http.ListenAndServe(fmt.Sprintf("%s:%s", host, port), http.HandlerFunc(handler))
+	if err != nil {
+		logError(err)
+		os.Exit(1)
+	}
+}
diff --git a/backend/k8s.js b/backend/k8s.js
new file mode 100644
index 0000000..aeb1aa0
--- /dev/null
+++ b/backend/k8s.js
@@ -0,0 +1,123 @@
+import * as k8sClient from "@kubernetes/client-node";
+
+const kubeconfig = new k8sClient.KubeConfig();
+kubeconfig.loadFromDefault();
+
+const k8s = kubeconfig.makeApiClient(k8sClient.CoreV1Api);
+
+export async function listUserSessions() {
+  return (await k8s.listNamespacedPod("riju-user")).body.items.map((pod) => ({
+    podName: pod.metadata.name,
+    sessionID: pod.metadata.labels["riju.codes/user-session-id"],
+  }));
+}
+
+export async function createUserSession({ sessionID, langConfig, revisions }) {
+  await k8s.createNamespacedPod("riju-user", {
+    metadata: {
+      name: `riju-user-session-${sessionID}`,
+      labels: {
+        "riju.codes/user-session-id": sessionID,
+      },
+    },
+    spec: {
+      volumes: [
+        {
+          name: "minio-config",
+          secret: {
+            secretName: "minio-user-login",
+          },
+        },
+        {
+          name: "riju-bin",
+          emptyDir: {},
+        },
+      ],
+      imagePullSecrets: [
+        {
+          name: "registry-user-login",
+        },
+      ],
+      initContainers: [
+        {
+          name: "download",
+          image: "minio/mc:RELEASE.2022-12-13T00-23-28Z",
+          resources: {},
+          args: [
+            "sh",
+            "-c",
+            `mc cp riju/agent/${revisions.agent} /riju-bin/agent && ` +
+              `mc cp riju/ptyify/${revisions.ptyify} /riju-bin/ptyify`,
+          ],
+          volumeMounts: [
+            {
+              name: "minio-config",
+              mountPath: "/root/.mc",
+              readOnly: true,
+            },
+            {
+              name: "riju-bin",
+              mountPath: "/riju-bin",
+            },
+          ],
+        },
+      ],
+      containers: [
+        {
+          name: "session",
+          image: `localhost:30999/riju-lang:${langConfig.id}-${revisions.langImage}`,
+          resources: {
+            limits: {
+              cpu: "1000m",
+              memory: "4Gi",
+            },
+          },
+          startupProbe: {
+            httpGet: {
+              path: "/health",
+              port: 869,
+              scheme: "HTTP",
+            },
+            failureThreshold: 30,
+            initialDelaySeconds: 0,
+            periodSeconds: 1,
+            successThreshold: 1,
+            timeoutSeconds: 2,
+          },
+          readinessProbe: {
+            httpGet: {
+              path: "/health",
+              port: 869,
+              scheme: "HTTP",
+            },
+            failureThreshold: 1,
+            initialDelaySeconds: 2,
+            periodSeconds: 10,
+            successThreshold: 1,
+            timeoutSeconds: 2,
+          },
+          livenessProbe: {
+            httpGet: {
+              path: "/health",
+              port: 869,
+              scheme: "HTTP",
+            },
+            failureThreshold: 3,
+            initialDelaySeconds: 2,
+            periodSeconds: 10,
+            successThreshold: 1,
+            timeoutSeconds: 2,
+          },
+          volumeMounts: [
+            {
+              name: "riju-bin",
+              mountPath: "/riju-bin",
+              readOnly: true,
+            },
+          ],
+        },
+      ],
+      restartPolicy: "Never",
+    },
+  });
+}
diff --git a/k8s/riju-minio-rbac.yaml b/k8s/riju-minio-rbac.yaml
deleted file mode 100644
index a19d215..0000000
--- a/k8s/riju-minio-rbac.yaml
+++ /dev/null
@@ -1,36 +0,0 @@
----
-kind: ServiceAccount
-apiVersion: v1
-metadata:
-  namespace: riju
-  name: minio
-
----
-kind: Role
-apiVersion: rbac.authorization.k8s.io/v1
-metadata:
-  namespace: riju
-  name: minio
-rules:
-  - apiGroups:
-      - ""
-    resources:
-      - secrets
-    resourceNames:
-      - minio-keys
-    verbs:
-      - get
-
----
-kind: RoleBinding
-apiVersion: rbac.authorization.k8s.io/v1
-metadata:
-  namespace: riju
-  name: minio
-roleRef:
-  kind: Role
-  apiGroup: rbac.authorization.k8s.io
-  name: minio
-subjects:
-  - kind: ServiceAccount
-    name: minio
diff --git a/k8s/riju-minio.yaml b/k8s/riju-minio.yaml
index 739a184..486a937 100644
--- a/k8s/riju-minio.yaml
+++ b/k8s/riju-minio.yaml
@@ -24,7 +24,6 @@ spec:
       labels:
         app: minio
     spec:
-      serviceAccountName: minio
       containers:
         - name: minio
           image: "minio/minio:RELEASE.2022-12-12T19-27-27Z"
diff --git a/k8s/secrets.in.yaml b/k8s/secrets.in.yaml
index 44fe7db..dca6132 100644
--- a/k8s/secrets.in.yaml
+++ b/k8s/secrets.in.yaml
@@ -63,3 +63,24 @@ metadata:
 stringData:
   access-key: "{{ .minio.accessKey }}"
   secret-key: "{{ .minio.secretKey }}"
+
+---
+kind: Secret
+apiVersion: v1
+metadata:
+  namespace: riju-user
+  name: minio-user-login
+stringData:
+  config.json: |
+    {
+      "version": 10,
+      "aliases": {
+        "riju": {
+          "url": "http://minio.riju.svc",
+          "accessKey": "{{ .minio.accessKey }}",
+          "secretKey": "{{ .minio.secretKey }}",
+          "api": "s3v4",
+          "path": "auto"
+        }
+      }
+    }
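
For reference, the websocket protocol implemented by agent/main.go can be exercised with a short standalone Go client along the lines of the sketch below. This is illustrative only and not part of the change: the localhost:869 address, the /exec path (the agent registers its handler for every path), and the /usr/bin/head command line are assumptions.

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/gorilla/websocket"
)

// Mirrors the message types defined in agent/main.go.
type clientMessage struct {
	Event string `json:"event"`
	Data  []byte `json:"data,omitempty"`
}

type serverMessage struct {
	Event      string `json:"event"`
	Data       []byte `json:"data,omitempty"`
	Text       string `json:"text,omitempty"`
	ExitStatus *int   `json:"exitStatus,omitempty"`
}

func main() {
	// Each argv element is passed as a separate cmdline query parameter;
	// cmdline[0] must be an absolute path because the agent spawns it with
	// os.StartProcess, which does not search PATH. Address, path, and
	// command here are assumptions for the sake of the example.
	url := "ws://localhost:869/exec?cmdline=/usr/bin/head&cmdline=-n1"
	ws, _, err := websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		log.Fatalf("dialing agent: %v", err)
	}
	defer ws.Close()
	// Send one line of stdin to the child process.
	msg, _ := json.Marshal(clientMessage{Event: "stdin", Data: []byte("hello\n")})
	if err := ws.WriteMessage(websocket.TextMessage, msg); err != nil {
		log.Fatalf("sending stdin: %v", err)
	}
	// Print events until the process exits or the connection drops.
	for {
		_, data, err := ws.ReadMessage()
		if err != nil {
			return
		}
		var sm serverMessage
		if err := json.Unmarshal(data, &sm); err != nil {
			log.Printf("parsing message: %v", err)
			continue
		}
		switch sm.Event {
		case "stdout":
			fmt.Print(string(sm.Data))
		case "warn", "fatal":
			log.Printf("%s: %s", sm.Event, sm.Text)
		case "exit":
			if sm.ExitStatus != nil {
				log.Printf("process exited with status %d", *sm.ExitStatus)
			}
			return
		}
	}
}

Because Data is declared as []byte, encoding/json transmits it as base64 in both directions, which matches how the agent marshals stdout chunks and is something a browser or Node client would need to account for as well.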