Oculus/main.go

376 lines
9.6 KiB
Go

package main
import (
"crypto/md5"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
type ContainerInfo struct {
ID string
Name string
CName string
Interval string
Ignores []string
}
type MonitoredContainer struct {
Info ContainerInfo
LastChecked time.Time
}
var (
apiAddress string
glitchtipDSN string
notifiedChanges = make(map[string]string)
monitoredContainers = make(map[string]*MonitoredContainer)
mu sync.Mutex
httpClient *http.Client
goGlitchPath string
oculusFilterPath string
)
func init() {
apiAddress = os.Getenv("API_ADDRESS")
if apiAddress == "" {
apiAddress = "http://localhost:8080"
}
glitchtipDSN = os.Getenv("GLITCHTIP_DSN")
if glitchtipDSN == "" {
log.Fatal("GLITCHTIP_DSN environment variable is not set")
}
goGlitchPath = "/usr/local/bin/go-glitch"
oculusFilterPath = "/usr/local/bin/oculus_filter"
httpClient = &http.Client{
Timeout: 30 * time.Second,
}
}
func main() {
log.Println("Starting Oculus...")
// Ensure the diffs directory exists
err := os.MkdirAll("./diffs", os.ModePerm)
if err != nil {
log.Fatalf("Error creating diffs directory: %v", err)
}
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Println("Fetching and monitoring containers...")
err := fetchAndMonitorContainers()
if err != nil {
log.Printf("Error in fetching and monitoring containers: %v", err)
}
// Cleanup temporary files after each check
cleanupTempFiles()
default:
time.Sleep(1 * time.Second)
err := monitorContainers()
if err != nil {
log.Printf("Error in monitoring containers: %v", err)
}
}
}
}
func fetchAndMonitorContainers() error {
containers, err := fetchContainers()
if err != nil {
return err
}
log.Printf("Fetched %d containers\n", len(containers))
mu.Lock()
defer mu.Unlock()
// Remove containers that are no longer present
for id := range monitoredContainers {
found := false
for _, container := range containers {
if container.ID == id {
found = true
break
}
}
if !found {
delete(monitoredContainers, id)
log.Printf("Stopped monitoring container %s\n", id)
}
}
// Add or update containers
for _, container := range containers {
if _, exists := monitoredContainers[container.ID]; !exists {
interval, err := time.ParseDuration(container.Interval)
if err != nil {
log.Printf("Invalid interval for container %s (%s): %v", container.Name, container.ID, err)
continue
}
monitoredContainers[container.ID] = &MonitoredContainer{
Info: container,
LastChecked: time.Now().Add(-interval),
}
log.Printf("Started monitoring container %s (%s) with interval %s\n", container.Name, container.ID, container.Interval)
} else {
monitoredContainers[container.ID].Info = container
}
}
return nil
}
func monitorContainers() error {
var wg sync.WaitGroup
for _, monitoredContainer := range monitoredContainers {
wg.Add(1)
go func(mc *MonitoredContainer) {
defer wg.Done()
interval, err := time.ParseDuration(mc.Info.Interval)
if err != nil {
log.Printf("Invalid interval for container %s (%s): %v", mc.Info.Name, mc.Info.ID, err)
return
}
mu.Lock()
if time.Since(mc.LastChecked) >= interval {
mc.LastChecked = time.Now()
mu.Unlock()
log.Printf("Checking container %s (%s) with interval %s\n", mc.Info.Name, mc.Info.ID, mc.Info.Interval)
log.Printf("Container %s (%s) ignores: %v\n", mc.Info.Name, mc.Info.ID, mc.Info.Ignores)
checkAndNotify(mc.Info)
} else {
mu.Unlock()
}
}(monitoredContainer)
}
wg.Wait()
return nil
}
func fetchContainers() ([]ContainerInfo, error) {
log.Printf("Fetching containers from API: %s/containers\n", apiAddress)
resp, err := httpClient.Get(fmt.Sprintf("%s/containers", apiAddress))
if err != nil {
log.Printf("Error fetching containers: %v", err)
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Printf("Error fetching containers: %s", resp.Status)
return nil, fmt.Errorf("error fetching containers: %s", resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("Error reading response body: %v", err)
return nil, err
}
log.Printf("Response body: %s\n", body)
var containers []ContainerInfo
if err := json.Unmarshal(body, &containers); err != nil {
log.Printf("Error unmarshaling response body: %v", err)
return nil, err
}
return containers, nil
}
func fetchDiffOutput(containerID string) (string, error) {
log.Printf("Fetching diff output for container ID: %s\n", containerID)
resp, err := httpClient.Get(fmt.Sprintf("%s/diff?id=%s", apiAddress, containerID))
if err != nil {
log.Printf("Error fetching diff: %v", err)
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Printf("Error fetching diff: %s", resp.Status)
return "", fmt.Errorf("error fetching diff: %s", resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("Error reading response body: %v", err)
return "", err
}
log.Printf("Diff output: %s\n", body)
return string(body), nil
}
func checkAndNotify(container ContainerInfo) {
diffOutput, err := fetchDiffOutput(container.ID)
if err != nil {
log.Printf("Error getting diffs for container %s (%s): %v", container.Name, container.ID, err)
return
}
filteredOutput, err := filterDiffOutput(diffOutput, container.CName, container.Ignores)
if err != nil {
log.Printf("Error filtering diffs for container %s (%s): %v", container.Name, container.ID, err)
return
}
log.Printf("Filtered diff output for container %s: %s", container.Name, filteredOutput)
if filteredOutput != "" {
diffHash := fmt.Sprintf("%x", md5.Sum([]byte(filteredOutput)))
log.Printf("Diff hash for container %s: %s", container.Name, diffHash)
mu.Lock()
lastNotifiedHash, notified := notifiedChanges[container.ID]
mu.Unlock()
if !notified || lastNotifiedHash != diffHash {
filename := filepath.Join("diffs", fmt.Sprintf("%s.diff", container.CName))
err := writeToFile(filename, filteredOutput)
if err != nil {
log.Printf("Error writing diff to file: %v", err)
return
}
log.Printf("Sending notification for container %s\n", container.Name)
err = sendNotification(filteredOutput)
if err != nil {
log.Printf("Error sending notification for container %s: %v", container.ID, err)
} else {
mu.Lock()
notifiedChanges[container.ID] = diffHash
mu.Unlock()
log.Printf("Notification sent and hash updated for container: %s (%s)", container.Name, container.ID)
}
} else {
log.Printf("No new changes detected for container: %s (%s)", container.Name, container.ID)
}
} else {
log.Printf("No significant changes detected for container: %s (%s)", container.Name, container.ID)
}
}
func filterDiffOutput(diffOutput, cname string, ignores []string) (string, error) {
filename := filepath.Join("diffs", fmt.Sprintf("%s.diff", cname))
// Write the diff output to the file
err := ioutil.WriteFile(filename, []byte(diffOutput), 0644)
if err != nil {
return "", fmt.Errorf("error writing diff to file: %s", err)
}
if len(ignores) == 0 {
log.Println("No ignore patterns provided.")
return "", fmt.Errorf("no ignore patterns provided")
}
// Construct the filter command
args := append([]string{filename}, ignores...)
cmd := exec.Command(oculusFilterPath, args...)
fullCommand := fmt.Sprintf("%s %s", oculusFilterPath, strings.Join(cmd.Args, " "))
log.Printf("Running command: %s", fullCommand)
fmt.Printf("Running command: %s\n", fullCommand) // Print the command to stdout for debugging
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("error running filter command: %s, output: %s", err, output)
}
// Read the filtered output
filteredOutput, err := ioutil.ReadFile(filename)
if err != nil {
return "", fmt.Errorf("error reading filtered diff file: %s", err)
}
return string(filteredOutput), nil
}
func writeToFile(filename, content string) error {
tempFile, err := os.CreateTemp("", "filtered_output")
if err != nil {
return fmt.Errorf("error creating temporary file: %s", err)
}
defer os.Remove(tempFile.Name()) // Ensure the temporary file is removed
_, err = tempFile.WriteString(content)
if err != nil {
return fmt.Errorf("error writing to temporary file: %s", err)
}
err = copyFile(tempFile.Name(), filename)
if err != nil {
return fmt.Errorf("error copying temporary file to final destination: %s", err)
}
return nil
}
func copyFile(src, dst string) error {
srcFile, err := os.Open(src)
if err != nil {
return err
}
defer srcFile.Close()
dstFile, err := os.Create(dst)
if err != nil {
return err
}
defer dstFile.Close()
_, err = io.Copy(dstFile, srcFile)
return err
}
func sendNotification(content string) error {
cmd := exec.Command(goGlitchPath)
cmd.Stdin = strings.NewReader(content)
output, err := cmd.CombinedOutput()
if err != nil {
log.Printf("go-glitch output: %s", output)
return err
}
log.Printf("go-glitch output: %s", output)
return nil
}
func cleanupTempFiles() {
files, err := filepath.Glob("/tmp/filtered_output*")
if err != nil {
log.Printf("Error finding temporary files: %v", err)
return
}
log.Printf("Found %d temporary files", len(files))
for _, file := range files {
log.Printf("Attempting to remove temporary file: %s", file)
err := os.Remove(file)
if err != nil {
log.Printf("Error removing temporary file %s: %v", file, err)
} else {
log.Printf("Removed temporary file %s", file)
}
}
}