Supervisor script core functionality

This commit is contained in:
Radon Rosborough 2021-07-04 03:08:55 +00:00
parent 7bb2582c6d
commit ad9bd56bb8
3 changed files with 388 additions and 18 deletions

View File

@ -1,3 +1,13 @@
module github.com/raxod502/riju/supervisor
go 1.16
require (
github.com/aws/aws-sdk-go-v2 v1.7.0
github.com/aws/aws-sdk-go-v2/config v1.4.1
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.11.0
github.com/aws/aws-sdk-go-v2/service/sts v1.5.0
github.com/caarlos0/env/v6 v6.6.2
github.com/google/uuid v1.2.0
)

41
supervisor/go.sum Normal file
View File

@ -0,0 +1,41 @@
github.com/aws/aws-sdk-go-v2 v1.7.0 h1:UYGnoIPIzed+ycmgw8Snb/0HK+KlMD+SndLTneG8ncE=
github.com/aws/aws-sdk-go-v2 v1.7.0/go.mod h1:tb9wi5s61kTDA5qCkcDbt3KRVV74GGslQkl/DRdX/P4=
github.com/aws/aws-sdk-go-v2/config v1.4.1 h1:PcGp9Kf+1dHJmP3EIDZJmAmWfGABFTU0obuvYQNzWH8=
github.com/aws/aws-sdk-go-v2/config v1.4.1/go.mod h1:HCDWZ/oeY59TPtXslxlbkCqLQBsVu6b09kiG43tdP+I=
github.com/aws/aws-sdk-go-v2/credentials v1.3.0 h1:vXxTINCsHn6LKhR043jwSLd6CsL7KOEU7b1woMr1K1A=
github.com/aws/aws-sdk-go-v2/credentials v1.3.0/go.mod h1:tOcv+qDZ0O+6Jk2beMl5JnZX6N0H7O8fw9UsD3bP7GI=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.2.0 h1:ucExzYCoAiL9GpKOsKkQLsa43wTT23tcdP4cDTSbZqY=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.2.0/go.mod h1:XvzoGzuS0kKPzCQtJCC22Xh/mMgVAzfGo/0V+mk/Cu0=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.1 h1:ag1MjvYmE8hnvl2/3LYOog9GZxcguqR6z1ewCUJQ9rE=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.1/go.mod h1:WXrj1wxGcYFfQ6H4xqsbVziISWQT55SlpX8B5+EqLOw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.1.0 h1:DJq/vXXF+LAFaa/kQX9C6arlf4xX4uaaqGWIyAKOCpM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.1.0/go.mod h1:qGQ/9IfkZonRNSNLE99/yBJ7EPA/h8jlWEqtJCcaj+Q=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.0 h1:wfI4yrOCMAGdHaEreQ65ycSmPLVc2Q82O+r7ZxYTynA=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.0/go.mod h1:2Kc2Pybp1Hr2ZCCOz78mWnNSZYEKKBQgNcizVGk9sko=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.0 h1:g2npzssI/6XsoQaPYCxliMFeC5iNKKvO0aC+/wWOE0A=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.0/go.mod h1:a7XLWNKuVgOxjssEF019IiHPv35k8KHBaWv/wJAfi2A=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.0 h1:6KmDU3XCGTcZlWPtP/gh7wYErrovnIxjX7um8iiuVsU=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.0/go.mod h1:541bxEA+Z8quwit9ZT7uxv/l9xRz85/HS41l9OxOQdY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.11.0 h1:FuKlyrDBZBk0RFxjqFPtx9y/KDsxTa3MoFVUgIW9w3Q=
github.com/aws/aws-sdk-go-v2/service/s3 v1.11.0/go.mod h1:zJe8mEFDS2F04nO0pKVBPfArAv2ycC6wt3ILvrV4SQw=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.0 h1:DMi9w+TpUam7eJ8ksL7svfzpqpqem2MkDAJKW8+I2/k=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.0/go.mod h1:qWR+TUuvfji9udM79e4CPe87C5+SjMEb2TFXkZaI0Vc=
github.com/aws/aws-sdk-go-v2/service/sts v1.5.0 h1:Y1K9dHE2CYOWOvaJSIITq4mJfLX43iziThTvqs5FqOg=
github.com/aws/aws-sdk-go-v2/service/sts v1.5.0/go.mod h1:HjDKUmissf6Mlut+WzG2r35r6LeTKmLEDJ6p9NryzLg=
github.com/aws/smithy-go v1.5.0 h1:2grDq7LxZlo8BZUDeqRfQnQWLZpInmh2TLPPkJku3YM=
github.com/aws/smithy-go v1.5.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/caarlos0/env/v6 v6.6.2 h1:BypLXDWQTA32rS4UM7pBz+/0BOuvs6C7LSeQAxMwyvI=
github.com/caarlos0/env/v6 v6.6.2/go.mod h1:P0BVSgU9zfkxfSpFUs6KsO3uWR4k3Ac0P66ibAGTybM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -1,75 +1,394 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/exec"
"regexp"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/caarlos0/env/v6"
uuidlib "github.com/google/uuid"
)
// Host ports and Docker container names for the two deployment slots
// ("blue" and "green"). Each slot's port is what the supervisor proxies to
// and what the slot's container is published on.
const (
	bluePort  = 6229
	greenPort = 6230

	blueName  = "riju-app-blue"
	greenName = "riju-app-green"
)
// deploymentConfig is the JSON document that describes which image tags a
// deployment should run.
type deploymentConfig struct {
	// LangImageTags is stored under the "langImageTags" JSON key; every value
	// is treated as an image tag that must be available locally.
	// NOTE(review): presumably keyed by language ID — confirm with the producer.
	LangImageTags map[string]string `json:"langImageTags"`

	// AppImageTag is the tag of the application image, stored under the
	// "appImageTag" JSON key.
	AppImageTag string `json:"appImageTag"`
}
// supervisorConfig holds the supervisor's own settings. The struct tags name
// the environment variable each field is populated from; both variables are
// marked notEmpty.
type supervisorConfig struct {
	// AccessToken comes from SUPERVISOR_ACCESS_TOKEN.
	AccessToken string `env:"SUPERVISOR_ACCESS_TOKEN,notEmpty"`

	// S3Bucket comes from S3_BUCKET and names the bucket that holds the
	// deployment config.
	S3Bucket string `env:"S3_BUCKET,notEmpty"`
}
// reloadJob tracks the state of one reload, as reported by the reload status
// endpoint.
type reloadJob struct {
	// status is the latest progress message, or the error text once the job
	// has failed.
	status string

	// active is true while the reload is still running.
	active bool

	// failed is true once the reload has finished with an error. A job that
	// is neither active nor failed is reported as succeeded.
	failed bool
}
// supervisor is the HTTP front end of the deployment: it serves the
// /api/supervisor endpoints itself and reverse-proxies every other request
// to an application container.
type supervisor struct {
	// proxyHandler forwards to a single fixed upstream.
	// NOTE(review): looks superseded by the blue/green handlers below —
	// confirm before removing.
	proxyHandler http.Handler

	config supervisorConfig

	// Blue/green deployment: one reverse proxy per slot, and isGreen selects
	// which slot currently receives traffic.
	blueProxyHandler  http.Handler
	greenProxyHandler http.Handler
	isGreen           bool // blue-green deployment

	// AWS details used to build image registry names and to fetch the
	// deployment config.
	awsAccountNumber string
	awsRegion        string
	s3               *s3.Client

	// reloadLock guards the reload bookkeeping fields below.
	reloadLock sync.Mutex
	// reloadInProgress is true while a reload goroutine is running.
	reloadInProgress bool
	// reloadNeeded is true when another reload was requested while one was
	// already running.
	reloadNeeded bool
	// reloadUUID identifies the running reload; reloadNextUUID identifies the
	// queued one.
	reloadUUID     string
	reloadNextUUID string
	// reloadJobs maps job UUIDs to their tracked state.
	reloadJobs map[string]*reloadJob
}
// status records msg as the progress message of the reload job registered
// under sv.reloadUUID and writes it to the log with an "active: " prefix.
//
// The job must already exist in sv.reloadJobs; a missing entry results in a
// nil-pointer panic.
func (sv *supervisor) status(status string) {
	// Touch the shared job table only while holding the lock; the log call
	// happens after it has been released.
	func() {
		sv.reloadLock.Lock()
		defer sv.reloadLock.Unlock()
		current := sv.reloadJobs[sv.reloadUUID]
		current.status = status
	}()
	log.Println("active: " + status)
}
// ServeHTTP implements http.Handler. Requests whose path starts with
// /api/supervisor are answered by the supervisor itself; every other request
// is reverse-proxied to whichever of the blue/green slots is currently live.
//
// Supervisor endpoints:
//
//	POST /api/supervisor/v1/reload         start a reload, or queue one if a
//	                                       reload is already running
//	GET  /api/supervisor/v1/reload/status  report the state of job ?uuid=...
//
// NOTE(review): config.AccessToken is never consulted here, so these
// endpoints are currently unauthenticated.
func (sv *supervisor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if strings.HasPrefix(r.URL.Path, "/api/supervisor") {
		switch r.URL.Path {
		case "/api/supervisor/v1/reload":
			sv.handleReload(w, r)
		case "/api/supervisor/v1/reload/status":
			sv.handleReloadStatus(w, r)
		default:
			http.NotFound(w, r)
		}
		return
	}

	// isGreen is flipped by the reload goroutine, so take a snapshot under
	// the lock instead of reading it bare.
	sv.reloadLock.Lock()
	isGreen := sv.isGreen
	sv.reloadLock.Unlock()

	// Proxy exactly once; forwarding the same request through more than one
	// handler would write the response twice.
	if isGreen {
		sv.greenProxyHandler.ServeHTTP(w, r)
	} else {
		sv.blueProxyHandler.ServeHTTP(w, r)
	}
}

// handleReload starts a reload when none is running, and otherwise makes
// sure exactly one follow-up reload is queued. The response body is a
// human-readable message line followed by the UUID of the job that will
// carry out the request.
func (sv *supervisor) handleReload(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var resp, uuid string
	sv.reloadLock.Lock()
	switch {
	case !sv.reloadInProgress:
		sv.reloadInProgress = true
		sv.reloadUUID = uuidlib.New().String()
		uuid = sv.reloadUUID
		resp = "Triggered reload."
		// reloadWithScheduling registers the job before running the reload;
		// it blocks on reloadLock until this handler releases it.
		go sv.reloadWithScheduling()
	case sv.reloadNeeded:
		// A follow-up reload is already queued; all callers share its UUID.
		uuid = sv.reloadNextUUID
		resp = "Reload already in progress, scheduling another one."
	default:
		// First request to arrive during a running reload: queue a follow-up
		// and give it an identity the caller can poll.
		sv.reloadNextUUID = uuidlib.New().String()
		uuid = sv.reloadNextUUID
		sv.reloadNeeded = true
		resp = "Reload already in progress, scheduling another one."
	}
	sv.reloadLock.Unlock()

	fmt.Fprintln(w, resp)
	fmt.Fprintln(w, uuid)
}

// handleReloadStatus reports the state of the job named by the single
// "uuid" query parameter as one of "queued", "active: ...", "failed: ..." or
// "succeeded: ...". Unknown jobs yield 404; a missing or repeated parameter
// yields 400.
func (sv *supervisor) handleReloadStatus(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed)
		return
	}

	uuids := r.URL.Query()["uuid"]
	if len(uuids) == 0 {
		http.Error(w, "400 missing uuid query parameter", http.StatusBadRequest)
		return
	}
	if len(uuids) > 1 {
		http.Error(w, "400 more than one uuid query parameter", http.StatusBadRequest)
		return
	}
	uuid := uuids[0]

	// Work out the answer under the lock, but write it only after releasing
	// the lock so a slow client cannot stall reload bookkeeping.
	var line string
	found := true
	sv.reloadLock.Lock()
	job := sv.reloadJobs[uuid]
	switch {
	case job != nil && job.active:
		line = "active: " + job.status
	case job != nil && job.failed:
		line = "failed: " + job.status
	case job != nil:
		line = "succeeded: " + job.status
	case uuid != "" && (uuid == sv.reloadUUID || uuid == sv.reloadNextUUID):
		// Known UUID whose job has not been registered yet. The empty-string
		// guard stops "?uuid=" from matching the idle (empty) UUID fields.
		line = "queued"
	default:
		found = false
	}
	sv.reloadLock.Unlock()

	if !found {
		http.Error(w, "404 no such job", http.StatusNotFound)
		return
	}
	fmt.Fprintln(w, line)
}
// reloadWithScheduling runs one reload as the job identified by
// sv.reloadUUID, records its outcome, and then starts the queued follow-up
// reload if one was requested in the meantime.
//
// The caller must have set sv.reloadInProgress and sv.reloadUUID before
// starting this goroutine.
//
// NOTE(review): finished jobs are never removed from sv.reloadJobs, so the
// map grows by one entry per reload.
func (sv *supervisor) reloadWithScheduling() {
	// Register the job before reload runs: reload reports progress through
	// sv.status, which dereferences the entry stored under sv.reloadUUID.
	job := &reloadJob{
		status: "initializing",
		active: true,
		failed: false,
	}
	sv.reloadLock.Lock()
	sv.reloadJobs[sv.reloadUUID] = job
	sv.reloadLock.Unlock()

	// Run the reload exactly once, without holding the lock.
	err := sv.reload()

	sv.reloadLock.Lock()
	defer sv.reloadLock.Unlock()

	job.active = false
	if err != nil {
		log.Println("failed: " + err.Error())
		job.failed = true
		job.status = err.Error()
	} else {
		log.Println("succeeded")
	}

	sv.reloadInProgress = false
	sv.reloadUUID = ""
	if sv.reloadNeeded {
		// Promote the queued reload to the running one and hand it off to a
		// fresh goroutine; it blocks on reloadLock until this call returns.
		sv.reloadNeeded = false
		sv.reloadInProgress = true
		sv.reloadUUID = sv.reloadNextUUID
		sv.reloadNextUUID = ""
		go sv.reloadWithScheduling()
	}
}
// rijuImageRegexp matches an image reference whose repository is "riju",
// either bare or as the last path component, and captures the tag. Tags
// containing '<' or '>' are rejected.
var (
	rijuImageRegexp = regexp.MustCompile(`(?:^|/)riju:([^<>]+)$`)
)
// reload deploys the configuration currently stored in S3 into the idle
// blue/green slot and, if the new container passes a health check, switches
// traffic to it.
//
// Steps, each reported through sv.status:
//  1. download and parse config.json from the configured S3 bucket;
//  2. list local riju:* images and pull and tag any that are missing;
//  3. start the app container in the slot that is not live, passing the
//     deployment config in RIJU_DEPLOY_CONFIG;
//  4. wait briefly, then fetch the container's root page and require a 200
//     response whose body contains "python";
//  5. flip sv.isGreen so the proxy targets the new container.
//
// It returns the first error encountered, wrapped with the step that failed.
//
// NOTE(review): nothing in this function stops or removes the previously
// live container, nor the new one when the health check fails — confirm that
// cleanup happens elsewhere.
func (sv *supervisor) reload() error {
	const (
		startupDelay       = 5 * time.Second
		healthCheckTimeout = 10 * time.Second
	)

	log.Println("starting reload")

	sv.status("downloading deployment config from S3")
	dl := s3manager.NewDownloader(sv.s3)
	buf := s3manager.NewWriteAtBuffer([]byte{})
	if _, err := dl.Download(context.Background(), buf, &s3.GetObjectInput{
		Bucket: &sv.config.S3Bucket,
		Key:    aws.String("config.json"),
	}); err != nil {
		return fmt.Errorf("downloading deployment config: %w", err)
	}
	deployCfg := deploymentConfig{}
	if err := json.Unmarshal(buf.Bytes(), &deployCfg); err != nil {
		return fmt.Errorf("parsing deployment config: %w", err)
	}

	sv.status("listing locally available images")
	dockerImageLs := exec.Command(
		"docker", "image", "ls", "--format",
		"{{ .Repository }}:{{ .Tag }}",
	)
	out, err := dockerImageLs.Output()
	if err != nil {
		return fmt.Errorf("listing docker images: %w", err)
	}
	existingTags := map[string]bool{}
	for _, line := range strings.Split(string(out), "\n") {
		if match := rijuImageRegexp.FindStringSubmatch(line); match != nil {
			existingTags[match[1]] = true
		}
	}

	neededTags := make([]string, 0, len(deployCfg.LangImageTags)+1)
	for _, tag := range deployCfg.LangImageTags {
		neededTags = append(neededTags, tag)
	}
	neededTags = append(neededTags, deployCfg.AppImageTag)
	for _, tag := range neededTags {
		if existingTags[tag] {
			continue
		}
		sv.status("pulling image for " + tag)
		fullImage := fmt.Sprintf(
			"%s.dkr.ecr.%s.amazonaws.com/riju:%s",
			sv.awsAccountNumber,
			sv.awsRegion,
			tag,
		)
		if err := exec.Command("docker", "pull", fullImage).Run(); err != nil {
			return fmt.Errorf("pulling image %s: %w", fullImage, err)
		}
		localImage := fmt.Sprintf("riju:%s", tag)
		if err := exec.Command("docker", "tag", fullImage, localImage).Run(); err != nil {
			return fmt.Errorf("tagging image %s: %w", localImage, err)
		}
		// Remember the tag so that a second entry sharing it is not pulled
		// again.
		existingTags[tag] = true
	}

	deployCfgStr, err := json.Marshal(&deployCfg)
	if err != nil {
		return fmt.Errorf("serializing deployment config: %w", err)
	}

	// Deploy into whichever slot is NOT currently serving traffic.
	var port int
	var name string
	if sv.isGreen {
		port = bluePort
		name = blueName
	} else {
		port = greenPort
		name = greenName
	}

	sv.status("starting container " + name)
	dockerRun := exec.Command(
		"docker", "run", "-d",
		"-v", "/var/run/riju:/var/run/riju",
		"-v", "/var/run/docker.sock:/var/run/docker.sock",
		// port is an int, so it must be formatted with %d.
		"-p", fmt.Sprintf("%d:6119", port),
		// A bare "-e NAME" makes docker copy the value from its own
		// environment, which is set just below.
		"-e", "RIJU_DEPLOY_CONFIG",
		"--name", name,
		fmt.Sprintf("riju:%s", deployCfg.AppImageTag),
	)
	dockerRun.Env = append(os.Environ(), fmt.Sprintf("RIJU_DEPLOY_CONFIG=%s", deployCfgStr))
	if err := dockerRun.Run(); err != nil {
		return fmt.Errorf("starting container %s: %w", name, err)
	}

	sv.status("waiting for container to start up")
	// time.Sleep takes a Duration in nanoseconds; a bare 5 would return
	// almost immediately.
	time.Sleep(startupDelay)

	sv.status("checking that container is healthy")
	// Bound the request so an unresponsive container cannot block the reload
	// (and with it every queued reload) forever.
	client := &http.Client{Timeout: healthCheckTimeout}
	resp, err := client.Get(fmt.Sprintf("http://localhost:%d", port))
	if err != nil {
		return fmt.Errorf("checking container health: %w", err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("reading health check response: %w", err)
	}
	if resp.StatusCode != http.StatusOK || !strings.Contains(string(body), "python") {
		return errors.New("container did not appear to be healthy")
	}

	// ServeHTTP reads isGreen from request goroutines, so flip it under the
	// lock.
	sv.reloadLock.Lock()
	sv.isGreen = !sv.isGreen
	sv.reloadLock.Unlock()

	sv.status("reload complete")
	return nil
}
// rijuContainerRegexp splits a "name:rest" line at the first colon: group 1
// is the non-empty text before it and group 2 is everything after it (which
// may itself contain colons).
var (
	rijuContainerRegexp = regexp.MustCompile(`^([^:]+):(.+)$`)
)
func main() {
url, err := url.Parse("http://localhost:6119")
if err != nil {
log.Fatal(err)
supervisorCfg := supervisorConfig{}
if err := env.Parse(&supervisorCfg); err != nil {
log.Fatalln(err)
}
blueUrl, err := url.Parse(fmt.Sprintf("http://localhost:%d", bluePort))
if err != nil {
log.Fatalln(err)
}
greenUrl, err := url.Parse(fmt.Sprintf("http://localhost:%d", greenPort))
if err != nil {
log.Fatalln(err)
}
awsCfg, err := awsConfig.LoadDefaultConfig(context.Background())
if err != nil {
log.Fatalln(err)
}
stsClient := sts.New(sts.Options{
Region: awsCfg.Region,
})
ident, err := stsClient.GetCallerIdentity(context.Background(), &sts.GetCallerIdentityInput{})
if err != nil {
log.Fatalln(err)
}
dockerContainerLs := exec.Command(
"docker", "container", "ls",
"--format", "{{ .Names }}:{{ .CreatedAt }}",
)
out, err := dockerContainerLs.Output()
if err != nil {
log.Fatalln(err)
}
var blueRunningSince *time.Time
var greenRunningSince *time.Time
for _, line := range strings.Split(string(out), "\n") {
if match := rijuContainerRegexp.FindStringSubmatch(line); match != nil {
name := match[1]
created, err := time.Parse(
"2006-01-02 15:04:05 -0070 MST",
match[2],
)
if err != nil {
continue
}
if name == blueName {
blueRunningSince = &created
continue
}
if name == greenName {
greenRunningSince = &created
continue
}
}
}
var isGreen bool
if blueRunningSince == nil && greenRunningSince == nil {
log.Println("did not detect any existing containers")
isGreen = false
} else if blueRunningSince != nil && greenRunningSince == nil {
log.Println("detected existing blue container")
isGreen = false
} else if greenRunningSince != nil && blueRunningSince == nil {
log.Println("detected existing green container")
isGreen = true
} else {
log.Println("detected existing blue and green containers")
isGreen = greenRunningSince.Before(*blueRunningSince)
var color string
var name string
if isGreen {
color = "blue"
name = blueName
} else {
color = "green"
name = greenName
}
log.Printf("stopping %s container as it is newer\n", color)
dockerStop := exec.Command("docker", "stop", name)
if err := dockerStop.Run(); err != nil {
log.Fatalln(err)
}
}
sv := &supervisor{
proxyHandler: httputil.NewSingleHostReverseProxy(url),
config: supervisorCfg,
blueProxyHandler: httputil.NewSingleHostReverseProxy(blueUrl),
greenProxyHandler: httputil.NewSingleHostReverseProxy(greenUrl),
isGreen: isGreen,
s3: s3.NewFromConfig(awsCfg),
awsRegion: awsCfg.Region,
awsAccountNumber: *ident.Account,
reloadJobs: map[string]*reloadJob{},
}
log.Println("listening on http://0.0.0.0:80")
log.Fatalln(http.ListenAndServe("0.0.0.0:80", sv))