Skip to content

Commit

Permalink
neonvm-runner: Move http server to its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
sharnoff committed Dec 22, 2024
1 parent 45b3019 commit deaa2a8
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 127 deletions.
143 changes: 143 additions & 0 deletions neonvm-runner/cmd/httpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
)

type cpuServerCallbacks struct {
get func(*zap.Logger) (*vmv1.MilliCPU, error)
set func(*zap.Logger, vmv1.MilliCPU) error
}

func listenForHTTPRequests(
ctx context.Context,
logger *zap.Logger,
port int32,
callbacks cpuServerCallbacks,
wg *sync.WaitGroup,
networkMonitoring bool,
) {
defer wg.Done()
mux := http.NewServeMux()
loggerHandlers := logger.Named("http-handlers")
cpuChangeLogger := loggerHandlers.Named("cpu_change")
mux.HandleFunc("/cpu_change", func(w http.ResponseWriter, r *http.Request) {
handleCPUChange(cpuChangeLogger, w, r, callbacks.set)
})
cpuCurrentLogger := loggerHandlers.Named("cpu_current")
mux.HandleFunc("/cpu_current", func(w http.ResponseWriter, r *http.Request) {
handleCPUCurrent(cpuCurrentLogger, w, r, callbacks.get)
})
if networkMonitoring {
reg := prometheus.NewRegistry()
metrics := NewMonitoringMetrics(reg)
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
metrics.update(logger)
h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})
h.ServeHTTP(w, r)
})
}
server := http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Handler: mux,
ReadTimeout: 5 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
}
errChan := make(chan error)
go func() {
errChan <- server.ListenAndServe()
}()
select {
case err := <-errChan:
if errors.Is(err, http.ErrServerClosed) {
logger.Info("http server closed")
} else if err != nil {
logger.Fatal("http server exited with error", zap.Error(err))
}
case <-ctx.Done():
err := server.Shutdown(context.Background())
logger.Info("shut down http server", zap.Error(err))
}
}

func handleCPUChange(
logger *zap.Logger,
w http.ResponseWriter,
r *http.Request,
set func(*zap.Logger, vmv1.MilliCPU) error,
) {
if r.Method != "POST" {
logger.Error("unexpected method", zap.String("method", r.Method))
w.WriteHeader(400)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
logger.Error("could not read body", zap.Error(err))
w.WriteHeader(400)
return
}

var parsed api.VCPUChange
if err = json.Unmarshal(body, &parsed); err != nil {
logger.Error("could not parse body", zap.Error(err))
w.WriteHeader(400)
return
}

// update cgroup
logger.Info("got CPU update", zap.Float64("CPU", parsed.VCPUs.AsFloat64()))
err = set(logger, parsed.VCPUs)
if err != nil {
logger.Error("could not set cgroup limit", zap.Error(err))
w.WriteHeader(500)
return
}

w.WriteHeader(200)
}

func handleCPUCurrent(
logger *zap.Logger,
w http.ResponseWriter,
r *http.Request,
get func(*zap.Logger) (*vmv1.MilliCPU, error),
) {
if r.Method != "GET" {
logger.Error("unexpected method", zap.String("method", r.Method))
w.WriteHeader(400)
return
}

cpus, err := get(logger)
if err != nil {
logger.Error("could not get cgroup quota", zap.Error(err))
w.WriteHeader(500)
return
}
resp := api.VCPUCgroup{VCPUs: *cpus}
body, err := json.Marshal(resp)
if err != nil {
logger.Error("could not marshal body", zap.Error(err))
w.WriteHeader(500)
return
}

w.Header().Add("Content-Type", "application/json")
w.Write(body) //nolint:errcheck // Not much to do with the error here. TODO: log it?
}
127 changes: 0 additions & 127 deletions neonvm-runner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ import (

"github.com/digitalocean/go-qemu/qmp"
"github.com/jpillora/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/samber/lo"
"go.uber.org/zap"

"k8s.io/apimachinery/pkg/api/resource"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

Expand Down Expand Up @@ -559,130 +556,6 @@ func getMachineType(architecture string) string {
}
}

func handleCPUChange(
logger *zap.Logger,
w http.ResponseWriter,
r *http.Request,
set func(*zap.Logger, vmv1.MilliCPU) error,
) {
if r.Method != "POST" {
logger.Error("unexpected method", zap.String("method", r.Method))
w.WriteHeader(400)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
logger.Error("could not read body", zap.Error(err))
w.WriteHeader(400)
return
}

var parsed api.VCPUChange
if err = json.Unmarshal(body, &parsed); err != nil {
logger.Error("could not parse body", zap.Error(err))
w.WriteHeader(400)
return
}

// update cgroup
logger.Info("got CPU update", zap.Float64("CPU", parsed.VCPUs.AsFloat64()))
err = set(logger, parsed.VCPUs)
if err != nil {
logger.Error("could not set cgroup limit", zap.Error(err))
w.WriteHeader(500)
return
}

w.WriteHeader(200)
}

func handleCPUCurrent(
logger *zap.Logger,
w http.ResponseWriter,
r *http.Request,
get func(*zap.Logger) (*vmv1.MilliCPU, error),
) {
if r.Method != "GET" {
logger.Error("unexpected method", zap.String("method", r.Method))
w.WriteHeader(400)
return
}

cpus, err := get(logger)
if err != nil {
logger.Error("could not get cgroup quota", zap.Error(err))
w.WriteHeader(500)
return
}
resp := api.VCPUCgroup{VCPUs: *cpus}
body, err := json.Marshal(resp)
if err != nil {
logger.Error("could not marshal body", zap.Error(err))
w.WriteHeader(500)
return
}

w.Header().Add("Content-Type", "application/json")
w.Write(body) //nolint:errcheck // Not much to do with the error here. TODO: log it?
}

type cpuServerCallbacks struct {
get func(*zap.Logger) (*vmv1.MilliCPU, error)
set func(*zap.Logger, vmv1.MilliCPU) error
}

func listenForHTTPRequests(
ctx context.Context,
logger *zap.Logger,
port int32,
callbacks cpuServerCallbacks,
wg *sync.WaitGroup,
networkMonitoring bool,
) {
defer wg.Done()
mux := http.NewServeMux()
loggerHandlers := logger.Named("http-handlers")
cpuChangeLogger := loggerHandlers.Named("cpu_change")
mux.HandleFunc("/cpu_change", func(w http.ResponseWriter, r *http.Request) {
handleCPUChange(cpuChangeLogger, w, r, callbacks.set)
})
cpuCurrentLogger := loggerHandlers.Named("cpu_current")
mux.HandleFunc("/cpu_current", func(w http.ResponseWriter, r *http.Request) {
handleCPUCurrent(cpuCurrentLogger, w, r, callbacks.get)
})
if networkMonitoring {
reg := prometheus.NewRegistry()
metrics := NewMonitoringMetrics(reg)
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
metrics.update(logger)
h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})
h.ServeHTTP(w, r)
})
}
server := http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Handler: mux,
ReadTimeout: 5 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
}
errChan := make(chan error)
go func() {
errChan <- server.ListenAndServe()
}()
select {
case err := <-errChan:
if errors.Is(err, http.ErrServerClosed) {
logger.Info("http server closed")
} else if err != nil {
logger.Fatal("http server exited with error", zap.Error(err))
}
case <-ctx.Done():
err := server.Shutdown(context.Background())
logger.Info("shut down http server", zap.Error(err))
}
}

func printWithNewline(slice []byte) error {
if len(slice) == 0 {
return nil
Expand Down

0 comments on commit deaa2a8

Please sign in to comment.