diff --git a/supervisor/go.mod b/supervisor/go.mod
index e766e88..116938f 100644
--- a/supervisor/go.mod
+++ b/supervisor/go.mod
@@ -1,3 +1,13 @@
 module github.com/raxod502/riju/supervisor
 
 go 1.16
+
+require (
+	github.com/aws/aws-sdk-go-v2 v1.7.0
+	github.com/aws/aws-sdk-go-v2/config v1.4.1
+	github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.1
+	github.com/aws/aws-sdk-go-v2/service/s3 v1.11.0
+	github.com/aws/aws-sdk-go-v2/service/sts v1.5.0
+	github.com/caarlos0/env/v6 v6.6.2
+	github.com/google/uuid v1.2.0
+)
diff --git a/supervisor/go.sum b/supervisor/go.sum
new file mode 100644
index 0000000..ed79512
--- /dev/null
+++ b/supervisor/go.sum
@@ -0,0 +1,41 @@
+github.com/aws/aws-sdk-go-v2 v1.7.0 h1:UYGnoIPIzed+ycmgw8Snb/0HK+KlMD+SndLTneG8ncE=
+github.com/aws/aws-sdk-go-v2 v1.7.0/go.mod h1:tb9wi5s61kTDA5qCkcDbt3KRVV74GGslQkl/DRdX/P4=
+github.com/aws/aws-sdk-go-v2/config v1.4.1 h1:PcGp9Kf+1dHJmP3EIDZJmAmWfGABFTU0obuvYQNzWH8=
+github.com/aws/aws-sdk-go-v2/config v1.4.1/go.mod h1:HCDWZ/oeY59TPtXslxlbkCqLQBsVu6b09kiG43tdP+I=
+github.com/aws/aws-sdk-go-v2/credentials v1.3.0 h1:vXxTINCsHn6LKhR043jwSLd6CsL7KOEU7b1woMr1K1A=
+github.com/aws/aws-sdk-go-v2/credentials v1.3.0/go.mod h1:tOcv+qDZ0O+6Jk2beMl5JnZX6N0H7O8fw9UsD3bP7GI=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.2.0 h1:ucExzYCoAiL9GpKOsKkQLsa43wTT23tcdP4cDTSbZqY=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.2.0/go.mod h1:XvzoGzuS0kKPzCQtJCC22Xh/mMgVAzfGo/0V+mk/Cu0=
+github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.1 h1:ag1MjvYmE8hnvl2/3LYOog9GZxcguqR6z1ewCUJQ9rE=
+github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.1/go.mod h1:WXrj1wxGcYFfQ6H4xqsbVziISWQT55SlpX8B5+EqLOw=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.1.0 h1:DJq/vXXF+LAFaa/kQX9C6arlf4xX4uaaqGWIyAKOCpM=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.1.0/go.mod h1:qGQ/9IfkZonRNSNLE99/yBJ7EPA/h8jlWEqtJCcaj+Q=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.0 h1:wfI4yrOCMAGdHaEreQ65ycSmPLVc2Q82O+r7ZxYTynA=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.0/go.mod h1:2Kc2Pybp1Hr2ZCCOz78mWnNSZYEKKBQgNcizVGk9sko=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.0 h1:g2npzssI/6XsoQaPYCxliMFeC5iNKKvO0aC+/wWOE0A=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.0/go.mod h1:a7XLWNKuVgOxjssEF019IiHPv35k8KHBaWv/wJAfi2A=
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.0 h1:6KmDU3XCGTcZlWPtP/gh7wYErrovnIxjX7um8iiuVsU=
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.0/go.mod h1:541bxEA+Z8quwit9ZT7uxv/l9xRz85/HS41l9OxOQdY=
+github.com/aws/aws-sdk-go-v2/service/s3 v1.11.0 h1:FuKlyrDBZBk0RFxjqFPtx9y/KDsxTa3MoFVUgIW9w3Q=
+github.com/aws/aws-sdk-go-v2/service/s3 v1.11.0/go.mod h1:zJe8mEFDS2F04nO0pKVBPfArAv2ycC6wt3ILvrV4SQw=
+github.com/aws/aws-sdk-go-v2/service/sso v1.3.0 h1:DMi9w+TpUam7eJ8ksL7svfzpqpqem2MkDAJKW8+I2/k=
+github.com/aws/aws-sdk-go-v2/service/sso v1.3.0/go.mod h1:qWR+TUuvfji9udM79e4CPe87C5+SjMEb2TFXkZaI0Vc=
+github.com/aws/aws-sdk-go-v2/service/sts v1.5.0 h1:Y1K9dHE2CYOWOvaJSIITq4mJfLX43iziThTvqs5FqOg=
+github.com/aws/aws-sdk-go-v2/service/sts v1.5.0/go.mod h1:HjDKUmissf6Mlut+WzG2r35r6LeTKmLEDJ6p9NryzLg=
+github.com/aws/smithy-go v1.5.0 h1:2grDq7LxZlo8BZUDeqRfQnQWLZpInmh2TLPPkJku3YM=
+github.com/aws/smithy-go v1.5.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
+github.com/caarlos0/env/v6 v6.6.2 h1:BypLXDWQTA32rS4UM7pBz+/0BOuvs6C7LSeQAxMwyvI=
+github.com/caarlos0/env/v6 v6.6.2/go.mod h1:P0BVSgU9zfkxfSpFUs6KsO3uWR4k3Ac0P66ibAGTybM=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
+github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
+github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/supervisor/src/main.go b/supervisor/src/main.go
index 433e4ae..85f1673 100644
--- a/supervisor/src/main.go
+++ b/supervisor/src/main.go
@@ -1,75 +1,436 @@
 package main
 
 import (
+	"context"
+	"encoding/json"
+	"errors"
 	"fmt"
+	"io"
 	"log"
 	"net/http"
 	"net/http/httputil"
 	"net/url"
+	"os"
+	"os/exec"
+	"regexp"
 	"strings"
 	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	awsConfig "github.com/aws/aws-sdk-go-v2/config"
+	s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/sts"
+	"github.com/caarlos0/env/v6"
+	uuidlib "github.com/google/uuid"
 )
 
+// Host ports published for the blue and green application containers.
+const bluePort = 6229
+const greenPort = 6230
+
+// Docker container names for the two deployment slots.
+const blueName = "riju-app-blue"
+const greenName = "riju-app-green"
+
+// deploymentConfig is decoded from config.json in the S3 bucket and lists
+// the image tags that a deployment needs.
+type deploymentConfig struct {
+	LangImageTags map[string]string `json:"langImageTags"`
+	AppImageTag   string            `json:"appImageTag"`
+}
+
+// supervisorConfig holds settings read from environment variables.
+type supervisorConfig struct {
+	AccessToken string `env:"SUPERVISOR_ACCESS_TOKEN,notEmpty"`
+	S3Bucket    string `env:"S3_BUCKET,notEmpty"`
+}
+
+// reloadJob tracks the progress and outcome of one reload so that the
+// status endpoint can report it.
+type reloadJob struct {
+	status string
+	active bool
+	failed bool
+}
+
+// supervisor is the HTTP handler that serves the reload API under
+// /api/supervisor and proxies every other request to the live container.
 type supervisor struct {
-	proxyHandler http.Handler
+	config supervisorConfig
+
+	blueProxyHandler  http.Handler
+	greenProxyHandler http.Handler
+	isGreen           bool // true when green is the live slot; guarded by reloadLock
+
+	awsAccountNumber string
+	awsRegion        string
+	s3               *s3.Client
 
 	reloadLock       sync.Mutex
 	reloadInProgress bool
 	reloadNeeded     bool
+	reloadUUID       string
+	reloadNextUUID   string
+	reloadJobs       map[string]*reloadJob
+}
+
+// status records a progress message on the currently running reload job
+// and logs it.
+func (sv *supervisor) status(status string) {
+	sv.reloadLock.Lock()
+	sv.reloadJobs[sv.reloadUUID].status = status
+	sv.reloadLock.Unlock()
+	log.Println("active: " + status)
 }
 
+// ServeHTTP handles the supervisor API endpoints and forwards all other
+// requests to whichever container (blue or green) is currently live.
+//
+// NOTE(review): config.AccessToken is required at startup but is not
+// checked here, so the reload endpoints are unauthenticated; confirm
+// whether that is intended.
 func (sv *supervisor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if strings.HasPrefix(r.URL.Path, "/api/supervisor") {
 		if r.URL.Path == "/api/supervisor/v1/reload" {
-			resp := ""
+			if r.Method != http.MethodPost {
+				http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed)
+				return
+			}
+			uuid := ""
 			sv.reloadLock.Lock()
 			if !sv.reloadInProgress {
 				sv.reloadInProgress = true
-				go sv.reload()
-				resp = "Triggered reload."
+				sv.reloadUUID = uuidlib.New().String()
+				uuid = sv.reloadUUID
+				go sv.reloadWithScheduling()
 			} else {
+				// A follow-up reload may already be queued; hand out its
+				// UUID again instead of generating a second one.
+				if sv.reloadNeeded {
+					uuid = sv.reloadNextUUID
+				} else {
+					sv.reloadNextUUID = uuidlib.New().String()
+					uuid = sv.reloadNextUUID
+				}
 				sv.reloadNeeded = true
-				resp = "Reload already in progress, scheduling another one."
 			}
 			sv.reloadLock.Unlock()
-			fmt.Fprintln(w, resp)
+			fmt.Fprintln(w, uuid)
+			return
+		}
+		if r.URL.Path == "/api/supervisor/v1/reload/status" {
+			if r.Method != http.MethodGet {
+				http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed)
+				return
+			}
+			uuids := r.URL.Query()["uuid"]
+			if len(uuids) == 0 {
+				http.Error(
+					w,
+					"400 missing uuid query parameter",
+					http.StatusBadRequest,
+				)
+				return
+			}
+			if len(uuids) > 1 {
+				http.Error(
+					w,
+					"400 more than one uuid query parameter",
+					http.StatusBadRequest,
+				)
+				return
+			}
+			uuid := uuids[0]
+			sv.reloadLock.Lock()
+			job := sv.reloadJobs[uuid]
+			if job == nil {
+				// reloadUUID and reloadNextUUID are empty when idle, so
+				// an empty query value must not count as queued.
+				if uuid != "" && (uuid == sv.reloadUUID || uuid == sv.reloadNextUUID) {
+					fmt.Fprintln(w, "queued")
+				} else {
+					http.Error(w, "404 no such job", http.StatusNotFound)
+				}
+			} else if job.active {
+				fmt.Fprintln(w, "active: "+job.status)
+			} else if job.failed {
+				fmt.Fprintln(w, "failed: "+job.status)
+			} else {
+				fmt.Fprintln(w, "succeeded: "+job.status)
+			}
+			sv.reloadLock.Unlock()
 			return
 		}
 		http.NotFound(w, r)
 		return
 	}
-	sv.proxyHandler.ServeHTTP(w, r)
+	// isGreen is flipped by reload on another goroutine, so read it
+	// under the lock.
+	sv.reloadLock.Lock()
+	isGreen := sv.isGreen
+	sv.reloadLock.Unlock()
+	if isGreen {
+		sv.greenProxyHandler.ServeHTTP(w, r)
+	} else {
+		sv.blueProxyHandler.ServeHTTP(w, r)
+	}
 	return
 }
 
+// reloadWithScheduling runs one reload, records its result on the job,
+// and then starts the queued follow-up reload if one was requested.
 func (sv *supervisor) reloadWithScheduling() {
-	err := sv.reload()
-	if err != nil {
-		log.Printf("failed to reload: %s\n", err.Error())
-	} else {
-		log.Println("successfully reloaded")
-	}
 	sv.reloadLock.Lock()
+	sv.reloadJobs[sv.reloadUUID] = &reloadJob{
+		status: "initializing",
+		active: true,
+		failed: false,
+	}
+	sv.reloadLock.Unlock()
+	err := sv.reload()
+	sv.reloadLock.Lock()
+	sv.reloadJobs[sv.reloadUUID].active = false
+	if err != nil {
+		log.Println("failed: " + err.Error())
+		sv.reloadJobs[sv.reloadUUID].failed = true
+		sv.reloadJobs[sv.reloadUUID].status = err.Error()
+	} else {
+		log.Println("succeeded")
+	}
 	sv.reloadInProgress = false
+	sv.reloadUUID = ""
 	if sv.reloadNeeded {
+		sv.reloadNeeded = false
 		sv.reloadInProgress = true
+		sv.reloadUUID = sv.reloadNextUUID
+		sv.reloadNextUUID = ""
 		go sv.reloadWithScheduling()
 	}
 	sv.reloadLock.Unlock()
 }
 
+// rijuImageRegexp captures the tag of a riju:<tag> image reference.
+var rijuImageRegexp = regexp.MustCompile(`(?:^|/)riju:([^<>]+)$`)
+
+// reload fetches the deployment config from S3, pulls any missing images,
+// starts the app container in the idle slot, and switches traffic to it
+// once it responds to a health check.
+//
+// NOTE(review): nothing here stops or removes a previous container named
+// after the idle slot, and docker refuses duplicate names, so a second
+// reload into the same slot looks like it will fail; confirm.
 func (sv *supervisor) reload() error {
-	fmt.Println("starting reload")
+	sv.status("downloading deployment config from S3")
+	dl := s3manager.NewDownloader(sv.s3)
+	buf := s3manager.NewWriteAtBuffer([]byte{})
+	if _, err := dl.Download(context.Background(), buf, &s3.GetObjectInput{
+		Bucket: &sv.config.S3Bucket,
+		Key:    aws.String("config.json"),
+	}); err != nil {
+		return err
+	}
+	deployCfg := deploymentConfig{}
+	if err := json.Unmarshal(buf.Bytes(), &deployCfg); err != nil {
+		return err
+	}
+	sv.status("listing locally available images")
+	dockerImageLs := exec.Command(
+		"docker", "image", "ls", "--format",
+		"{{ .Repository }}:{{ .Tag }}",
+	)
+	out, err := dockerImageLs.Output()
+	if err != nil {
+		return err
+	}
+	existingTags := map[string]bool{}
+	for _, line := range strings.Split(string(out), "\n") {
+		if match := rijuImageRegexp.FindStringSubmatch(line); match != nil {
+			tag := match[1]
+			existingTags[tag] = true
+		}
+	}
+	neededTags := []string{}
+	for _, tag := range deployCfg.LangImageTags {
+		neededTags = append(neededTags, tag)
+	}
+	neededTags = append(neededTags, deployCfg.AppImageTag)
+	for _, tag := range neededTags {
+		if !existingTags[tag] {
+			sv.status("pulling image for " + tag)
+			fullImage := fmt.Sprintf(
+				"%s.dkr.ecr.%s.amazonaws.com/riju:%s",
+				sv.awsAccountNumber,
+				sv.awsRegion,
+				tag,
+			)
+			dockerPull := exec.Command("docker", "pull", fullImage)
+			if err := dockerPull.Run(); err != nil {
+				return err
+			}
+			dockerTag := exec.Command(
+				"docker", "tag", fullImage,
+				fmt.Sprintf("riju:%s", tag),
+			)
+			if err := dockerTag.Run(); err != nil {
+				return err
+			}
+		}
+	}
+	deployCfgStr, err := json.Marshal(&deployCfg)
+	if err != nil {
+		return err
+	}
+	var port int
+	var name string
+	if sv.isGreen {
+		port = bluePort
+		name = blueName
+	} else {
+		port = greenPort
+		name = greenName
+	}
+	sv.status("starting container " + name)
+	dockerRun := exec.Command(
+		"docker", "run", "-d",
+		"-v", "/var/run/riju:/var/run/riju",
+		"-v", "/var/run/docker.sock:/var/run/docker.sock",
+		"-p", fmt.Sprintf("%d:6119", port),
+		"-e", "RIJU_DEPLOY_CONFIG",
+		"--name", name,
+		fmt.Sprintf("riju:%s", deployCfg.AppImageTag),
+	)
+	dockerRun.Env = append(os.Environ(), fmt.Sprintf("RIJU_DEPLOY_CONFIG=%s", deployCfgStr))
+	if err := dockerRun.Run(); err != nil {
+		return err
+	}
+	sv.status("waiting for container to start up")
+	time.Sleep(5 * time.Second)
+	sv.status("checking that container is healthy")
+	resp, err := http.Get(fmt.Sprintf("http://localhost:%d", port))
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+	if !strings.Contains(string(body), "python") {
+		return errors.New("container did not appear to be healthy")
+	}
+	sv.reloadLock.Lock()
+	sv.isGreen = !sv.isGreen
+	sv.reloadLock.Unlock()
+	sv.status("reload complete")
 	return nil
 }
 
+// rijuContainerRegexp splits the name:created-at lines printed by
+// docker container ls.
+var rijuContainerRegexp = regexp.MustCompile(`^([^:]+):(.+)$`)
+
+// main works out which slot is live from the running containers, then
+// serves the supervisor on port 80.
 func main() {
-	url, err := url.Parse("http://localhost:6119")
-	if err != nil {
-		log.Fatal(err)
+	supervisorCfg := supervisorConfig{}
+	if err := env.Parse(&supervisorCfg); err != nil {
+		log.Fatalln(err)
 	}
+
+	blueURL, err := url.Parse(fmt.Sprintf("http://localhost:%d", bluePort))
+	if err != nil {
+		log.Fatalln(err)
+	}
+	greenURL, err := url.Parse(fmt.Sprintf("http://localhost:%d", greenPort))
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	awsCfg, err := awsConfig.LoadDefaultConfig(context.Background())
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	// Build the client from the shared config so that requests are signed
+	// with the resolved credentials.
+	stsClient := sts.NewFromConfig(awsCfg)
+	ident, err := stsClient.GetCallerIdentity(context.Background(), &sts.GetCallerIdentityInput{})
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	dockerContainerLs := exec.Command(
+		"docker", "container", "ls",
+		"--format", "{{ .Names }}:{{ .CreatedAt }}",
+	)
+	out, err := dockerContainerLs.Output()
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	var blueRunningSince *time.Time
+	var greenRunningSince *time.Time
+	for _, line := range strings.Split(string(out), "\n") {
+		if match := rijuContainerRegexp.FindStringSubmatch(line); match != nil {
+			name := match[1]
+			created, err := time.Parse(
+				"2006-01-02 15:04:05 -0700 MST",
+				match[2],
+			)
+			if err != nil {
+				continue
+			}
+			if name == blueName {
+				blueRunningSince = &created
+				continue
+			}
+			if name == greenName {
+				greenRunningSince = &created
+				continue
+			}
+		}
+	}
+
+	var isGreen bool
+	if blueRunningSince == nil && greenRunningSince == nil {
+		log.Println("did not detect any existing containers")
+		isGreen = false
+	} else if blueRunningSince != nil && greenRunningSince == nil {
+		log.Println("detected existing blue container")
+		isGreen = false
+	} else if greenRunningSince != nil && blueRunningSince == nil {
+		log.Println("detected existing green container")
+		isGreen = true
+	} else {
+		log.Println("detected existing blue and green containers")
+		isGreen = greenRunningSince.Before(*blueRunningSince)
+		var color string
+		var name string
+		if isGreen {
+			color = "blue"
+			name = blueName
+		} else {
+			color = "green"
+			name = greenName
+		}
+		log.Printf("stopping %s container as it is newer\n", color)
+		dockerStop := exec.Command("docker", "stop", name)
+		if err := dockerStop.Run(); err != nil {
+			log.Fatalln(err)
+		}
+	}
+
 	sv := &supervisor{
-		proxyHandler: httputil.NewSingleHostReverseProxy(url),
+		config:            supervisorCfg,
+		blueProxyHandler:  httputil.NewSingleHostReverseProxy(blueURL),
+		greenProxyHandler: httputil.NewSingleHostReverseProxy(greenURL),
+		isGreen:           isGreen,
+		s3:                s3.NewFromConfig(awsCfg),
+		awsRegion:         awsCfg.Region,
+		awsAccountNumber:  *ident.Account,
+		reloadJobs:        map[string]*reloadJob{},
 	}
 	log.Println("listening on http://0.0.0.0:80")
 	log.Fatalln(http.ListenAndServe("0.0.0.0:80", sv))