diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index cde50d13a..90ed7732b 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -31,11 +31,14 @@ import ( "github.com/containerd/cgroups/v3" "github.com/containerd/cgroups/v3/cgroup1" "github.com/containerd/cgroups/v3/cgroup2" + "github.com/coreos/go-iptables/iptables" "github.com/digitalocean/go-qemu/qmp" "github.com/docker/libnetwork/types" "github.com/jpillora/backoff" "github.com/kdomanski/iso9660" "github.com/opencontainers/runtime-spec/specs-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/samber/lo" "github.com/vishvananda/netlink" "go.uber.org/zap" @@ -44,6 +47,7 @@ import ( vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/util" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" ) @@ -91,6 +95,8 @@ const ( // // See also: https://neondb.slack.com/archives/C03TN5G758R/p1693462680623239 cpuLimitOvercommitFactor = 4 + + protocolTCP string = "6" ) var ( @@ -669,6 +675,91 @@ func main() { } } +type NetworkMonitoringMetrics struct { + IngressBytes, EgressBytes, Errors prometheus.Counter + IngressBytesRaw, EgressBytesRaw uint64 // Absolute values to calc increments for Counters +} + +func NewMonitoringMetrics(reg *prometheus.Registry) *NetworkMonitoringMetrics { + m := &NetworkMonitoringMetrics{ + IngressBytes: util.RegisterMetric(reg, prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "runner_vm_ingress_bytes", + Help: "Number of bytes received by the VM from the open internet", + }, + )), + EgressBytes: util.RegisterMetric(reg, prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "runner_vm_egress_bytes", + Help: "Number of bytes sent by the VM to the open internet", + }, + )), + IngressBytesRaw: 0, + EgressBytesRaw: 0, + Errors: util.RegisterMetric(reg, prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "runner_vm_network_fetch_errors_total", + Help: "Number of errors while fetching network monitoring data", + }, + )), + } + return m +} + +func shouldBeIgnored(ip net.IP) bool { + // We need to measure only external traffic to/from vm, so we filter internal traffic + // Don't filter on isUnspecified as it's an iptables rule, not a real ip + return ip.IsLoopback() || ip.IsPrivate() +} + +func getNetworkBytesCounter(iptables *iptables.IPTables, chain string) (uint64, error) { + cnt := uint64(0) + rules, err := iptables.Stats("filter", chain) + if err != nil { + return cnt, err + } + + for _, rawStat := range rules { + stat, err := iptables.ParseStat(rawStat) + if err != nil { + return cnt, err + } + src, dest := stat.Source.IP, stat.Destination.IP + if stat.Protocol == protocolTCP && !shouldBeIgnored(src) && !shouldBeIgnored(dest) { + cnt += stat.Bytes + } + } + return cnt, nil +} + +func (m *NetworkMonitoringMetrics) update(logger *zap.Logger) { + // Rules configured at github.com/neondatabase/cloud/blob/main/compute-init/compute-init.sh#L98 + iptables, err := iptables.New() + if err != nil { + logger.Error("initializing iptables failed", zap.Error(err)) + m.Errors.Inc() + return + } + + ingress, err := getNetworkBytesCounter(iptables, "INPUT") + if err != nil { + logger.Error("getting iptables input counter failed", zap.Error(err)) + m.Errors.Inc() + return + } + m.IngressBytes.Add(float64(ingress - m.IngressBytesRaw)) + m.IngressBytesRaw = ingress + + egress, err := getNetworkBytesCounter(iptables, "OUTPUT") + if err != nil { + logger.Error("getting iptables output counter failed", zap.Error(err)) + m.Errors.Inc() + return + } + m.EgressBytes.Add(float64(egress - m.EgressBytesRaw)) + m.EgressBytesRaw = egress +} + func run(logger *zap.Logger) error { cfg := newConfig(logger) @@ -1077,7 +1168,8 @@ func runQEMU( } wg.Add(1) - go listenForCPUChanges(ctx, logger, vmSpec.RunnerPort, callbacks, &wg) + monitoring := vmSpec.EnableNetworkMonitoring != nil && *vmSpec.EnableNetworkMonitoring + go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, monitoring) wg.Add(1) go forwardLogs(ctx, logger, &wg) @@ -1179,12 +1271,13 @@ type cpuServerCallbacks struct { set func(*zap.Logger, vmv1.MilliCPU) error } -func listenForCPUChanges( +func listenForHTTPRequests( ctx context.Context, logger *zap.Logger, port int32, callbacks cpuServerCallbacks, wg *sync.WaitGroup, + networkMonitoring bool, ) { defer wg.Done() mux := http.NewServeMux() @@ -1197,6 +1290,15 @@ func listenForCPUChanges( mux.HandleFunc("/cpu_current", func(w http.ResponseWriter, r *http.Request) { handleCPUCurrent(cpuCurrentLogger, w, r, callbacks.get) }) + if networkMonitoring { + reg := prometheus.NewRegistry() + metrics := NewMonitoringMetrics(reg) + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + metrics.update(logger) + h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}) + h.ServeHTTP(w, r) + }) + } server := http.Server{ Addr: fmt.Sprintf("0.0.0.0:%d", port), Handler: mux, @@ -1211,13 +1313,13 @@ func listenForCPUChanges( select { case err := <-errChan: if errors.Is(err, http.ErrServerClosed) { - logger.Info("cpu_change server closed") + logger.Info("http server closed") } else if err != nil { - logger.Fatal("cpu_change exited with error", zap.Error(err)) + logger.Fatal("http server exited with error", zap.Error(err)) } case <-ctx.Done(): err := server.Shutdown(context.Background()) - logger.Info("shut down cpu_change server", zap.Error(err)) + logger.Info("shut down http server", zap.Error(err)) } } diff --git a/neonvm/apis/neonvm/v1/virtualmachine_types.go b/neonvm/apis/neonvm/v1/virtualmachine_types.go index f6e079c6e..9e93f36ac 100644 --- a/neonvm/apis/neonvm/v1/virtualmachine_types.go +++ b/neonvm/apis/neonvm/v1/virtualmachine_types.go @@ -155,6 +155,11 @@ type VirtualMachineSpec struct { // +kubebuilder:default:=QmpScaling // +optional CpuScalingMode *CpuScalingMode `json:"cpuScalingMode,omitempty"` + + // Enable network monitoring on the VM + // +kubebuilder:default:=false + // +optional + EnableNetworkMonitoring *bool `json:"enableNetworkMonitoring,omitempty"` } func (spec *VirtualMachineSpec) Resources() VirtualMachineResources { diff --git a/neonvm/apis/neonvm/v1/virtualmachine_webhook.go b/neonvm/apis/neonvm/v1/virtualmachine_webhook.go index 36c3bb378..00ae9d67d 100644 --- a/neonvm/apis/neonvm/v1/virtualmachine_webhook.go +++ b/neonvm/apis/neonvm/v1/virtualmachine_webhook.go @@ -137,6 +137,7 @@ func (r *VirtualMachine) ValidateUpdate(old runtime.Object) (admission.Warnings, {".spec.enableAcceleration", func(v *VirtualMachine) any { return v.Spec.EnableAcceleration }}, {".spec.enableSSH", func(v *VirtualMachine) any { return v.Spec.EnableSSH }}, {".spec.initScript", func(v *VirtualMachine) any { return v.Spec.InitScript }}, + {".spec.enableNetworkMonitoring", func(v *VirtualMachine) any { return v.Spec.EnableNetworkMonitoring }}, } for _, info := range immutableFields { diff --git a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go index 08e7c3105..9f43de748 100644 --- a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go +++ b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go @@ -738,6 +738,11 @@ func (in *VirtualMachineSpec) DeepCopyInto(out *VirtualMachineSpec) { *out = new(CpuScalingMode) **out = **in } + if in.EnableNetworkMonitoring != nil { + in, out := &in.EnableNetworkMonitoring, &out.EnableNetworkMonitoring + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VirtualMachineSpec. diff --git a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml index 75d034de6..5e7a9147b 100644 --- a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml +++ b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml @@ -1157,6 +1157,10 @@ spec: default: true description: Use KVM acceleation type: boolean + enableNetworkMonitoring: + default: false + description: Enable network monitoring on the VM + type: boolean enableSSH: default: true description: |- diff --git a/vm-deploy.yaml b/vm-deploy.yaml index 09588f60d..5dc694d7c 100644 --- a/vm-deploy.yaml +++ b/vm-deploy.yaml @@ -12,6 +12,7 @@ metadata: autoscaling.neon.tech/testing-only-always-migrate: "false" spec: schedulerName: autoscale-scheduler + enableNetworkMonitoring: true enableSSH: true guest: cpus: { min: 0.25, use: 0.25, max: 1.25 }