diff --git a/autoscaler-agent/config_map.yaml b/autoscaler-agent/config_map.yaml index 21a7d0c75..95556ee3b 100644 --- a/autoscaler-agent/config_map.yaml +++ b/autoscaler-agent/config_map.yaml @@ -26,7 +26,9 @@ data: "activeTimeMetricName": "active_time_seconds", "collectEverySeconds": 4, "accumulateEverySeconds": 24, - "clients": {} + "clients": {}, + "ingressBytesMetricName": "ingress_bytes", + "egressBytesMetricName": "egress_bytes" }, "monitor": { "serverPort": 10301, diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index cde50d13a..48b968082 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/cgroups/v3" "github.com/containerd/cgroups/v3/cgroup1" "github.com/containerd/cgroups/v3/cgroup2" + "github.com/coreos/go-iptables/iptables" "github.com/digitalocean/go-qemu/qmp" "github.com/docker/libnetwork/types" "github.com/jpillora/backoff" @@ -606,28 +607,30 @@ func runInitScript(logger *zap.Logger, script string) error { } type Config struct { - vmSpecDump string - vmStatusDump string - kernelPath string - appendKernelCmdline string - skipCgroupManagement bool - diskCacheSettings string - memoryProvider vmv1.MemoryProvider - autoMovableRatio string - cpuScalingMode vmv1.CpuScalingMode + vmSpecDump string + vmStatusDump string + kernelPath string + appendKernelCmdline string + skipCgroupManagement bool + enableNetworkMonitoring bool + diskCacheSettings string + memoryProvider vmv1.MemoryProvider + autoMovableRatio string + cpuScalingMode vmv1.CpuScalingMode } func newConfig(logger *zap.Logger) *Config { cfg := &Config{ - vmSpecDump: "", - vmStatusDump: "", - kernelPath: defaultKernelPath, - appendKernelCmdline: "", - skipCgroupManagement: false, - diskCacheSettings: "cache=none", - memoryProvider: "", // Require that this is explicitly set. We'll check later. - autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later. - cpuScalingMode: "", // Require that this is explicitly set. We'll check later. + vmSpecDump: "", + vmStatusDump: "", + kernelPath: defaultKernelPath, + appendKernelCmdline: "", + skipCgroupManagement: false, + enableNetworkMonitoring: false, + diskCacheSettings: "cache=none", + memoryProvider: "", // Require that this is explicitly set. We'll check later. + autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later. + cpuScalingMode: "", // Require that this is explicitly set. We'll check later. 
} flag.StringVar(&cfg.vmSpecDump, "vmspec", cfg.vmSpecDump, "Base64 encoded VirtualMachine json specification") @@ -640,6 +643,8 @@ func newConfig(logger *zap.Logger) *Config { flag.BoolVar(&cfg.skipCgroupManagement, "skip-cgroup-management", cfg.skipCgroupManagement, "Don't try to manage CPU") + flag.BoolVar(&cfg.enableNetworkMonitoring, "enable-network-monitoring", + cfg.enableNetworkMonitoring, "Enable in/out traffic measuring with iptables") flag.StringVar(&cfg.diskCacheSettings, "qemu-disk-cache-settings", cfg.diskCacheSettings, "Cache settings to add to -drive args for VM disks") flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc) @@ -1077,7 +1082,7 @@ func runQEMU( } wg.Add(1) - go listenForCPUChanges(ctx, logger, vmSpec.RunnerPort, callbacks, &wg) + go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, cfg.enableNetworkMonitoring) wg.Add(1) go forwardLogs(ctx, logger, &wg) @@ -1179,12 +1184,13 @@ type cpuServerCallbacks struct { set func(*zap.Logger, vmv1.MilliCPU) error } -func listenForCPUChanges( +func listenForHTTPRequests( ctx context.Context, logger *zap.Logger, port int32, callbacks cpuServerCallbacks, wg *sync.WaitGroup, + networkUsageRoute bool, ) { defer wg.Done() mux := http.NewServeMux() @@ -1197,6 +1203,13 @@ func listenForCPUChanges( mux.HandleFunc("/cpu_current", func(w http.ResponseWriter, r *http.Request) { handleCPUCurrent(cpuCurrentLogger, w, r, callbacks.get) }) + if networkUsageRoute { + networkUsageLogger := loggerHandlers.Named("network_usage") + logger.Info("Listening for network_usage") + mux.HandleFunc("/network_usage", func(w http.ResponseWriter, r *http.Request) { + handleGetNetworkUsage(networkUsageLogger, w, r) + }) + } server := http.Server{ Addr: fmt.Sprintf("0.0.0.0:%d", port), Handler: mux, @@ -1211,14 +1224,76 @@ func listenForCPUChanges( select { case err := <-errChan: if errors.Is(err, http.ErrServerClosed) { - logger.Info("cpu_change server closed") + logger.Info("http server closed") } else if err != nil { - logger.Fatal("cpu_change exited with error", zap.Error(err)) + logger.Fatal("http server exited with error", zap.Error(err)) } case <-ctx.Done(): err := server.Shutdown(context.Background()) - logger.Info("shut down cpu_change server", zap.Error(err)) + logger.Info("shut down http server", zap.Error(err)) + } +} + +func incNetBytesCounter(iptables *iptables.IPTables, chain string) (vmv1.NetworkBytes, error) { + cnt := vmv1.NetworkBytes(0) + rules, err := iptables.Stats("filter", chain) + if err != nil { + return cnt, err + } + for _, rawStat := range rules { + stat, err := iptables.ParseStat(rawStat) + if err != nil { + return cnt, err + } + if stat.Protocol == "6" { // tcp + cnt += vmv1.NetworkBytes(stat.Bytes) + } } + return cnt, nil +} + +func handleGetNetworkUsage(logger *zap.Logger, w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + logger.Error("unexpected method", zap.String("method", r.Method)) + w.WriteHeader(400) + return + } + + counts := vmv1.VirtualMachineNetworkUsage{IngressBytes: 0, EgressBytes: 0} + // Rules configured at github.com/neondatabase/cloud/blob/main/compute-init/compute-init.sh#L98 + iptables, err := iptables.New() + if err != nil { + logger.Error("error initializing iptables", zap.Error(err)) + w.WriteHeader(500) + return + } + + if ingress, err := incNetBytesCounter(iptables, "INPUT"); err != nil { + logger.Error("error reading ingress byte counts", zap.Error(err)) + w.WriteHeader(500) + return + } else { + counts.IngressBytes += 
ingress + } + + if egress, err := incNetBytesCounter(iptables, "OUTPUT"); err != nil { + logger.Error("error reading egress byte counts", zap.Error(err)) + w.WriteHeader(500) + return + } else { + counts.EgressBytes += egress + } + + body, err := json.Marshal(counts) + if err != nil { + logger.Error("could not marshal byte counts", zap.Error(err)) + w.WriteHeader(500) + return + } + + w.Header().Add("Content-Type", "application/json") + w.Write(body) //nolint:errcheck // Not much to do with the error here + logger.Info("Responded with byte counts", zap.String("byte_counts", string(body))) } func printWithNewline(slice []byte) error { diff --git a/neonvm/apis/neonvm/v1/virtualmachine_types.go b/neonvm/apis/neonvm/v1/virtualmachine_types.go index f6e079c6e..e1eb2c157 100644 --- a/neonvm/apis/neonvm/v1/virtualmachine_types.go +++ b/neonvm/apis/neonvm/v1/virtualmachine_types.go @@ -17,8 +17,11 @@ limitations under the License. package v1 import ( + "context" "encoding/json" "fmt" + "io" + "net/http" "slices" "time" @@ -68,6 +71,16 @@ type VirtualMachineResources struct { MemorySlotSize resource.Quantity `json:"memorySlotSize"` } +type NetworkBytes uint64 + +// VirtualMachineNetworkUsage provides information about a VM's incoming and outgoing traffic +type VirtualMachineNetworkUsage struct { + // Number of bytes received by the VM from the open internet + IngressBytes NetworkBytes `json:"ingress_bytes"` + // Number of bytes sent by the VM to the open internet + EgressBytes NetworkBytes `json:"egress_bytes"` +} + // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // VirtualMachineSpec defines the desired state of VirtualMachine @@ -155,6 +168,11 @@ type VirtualMachineSpec struct { // +kubebuilder:default:=QmpScaling // +optional CpuScalingMode *CpuScalingMode `json:"cpuScalingMode,omitempty"` + + // Enable network monitoring on the VM + // +kubebuilder:default:=false + // +optional + EnableNetworkMonitoring *bool `json:"enableNetworkMonitoring,omitempty"` } func (spec *VirtualMachineSpec) Resources() VirtualMachineResources { @@ -623,6 +641,41 @@ func (p VmPhase) IsAlive() bool { } } +// +kubebuilder:object:generate=false +type NetworkUsageJSONError struct { + Err error +} + +func (e NetworkUsageJSONError) Error() string { + return fmt.Sprintf("Error unmarshaling network usage: %s", e.Err.Error()) +} + +func (e NetworkUsageJSONError) Unwrap() error { + return e.Err +} + +// +kubebuilder:object:generate=false +type NetworkUsageRequestError struct { + Err error +} + +func (e NetworkUsageRequestError) Error() string { + return fmt.Sprintf("Error fetching network usage: %s", e.Err.Error()) +} + +func (e NetworkUsageRequestError) Unwrap() error { + return e.Err +} + +// +kubebuilder:object:generate=false +type NetworkUsageStatusCodeError struct { + StatusCode int +} + +func (e NetworkUsageStatusCodeError) Error() string { + return fmt.Sprintf("Unexpected HTTP status code when fetching network usage: %d", e.StatusCode) +} + //+genclient //+kubebuilder:object:root=true //+kubebuilder:subresource:status @@ -647,6 +700,37 @@ type VirtualMachine struct { Status VirtualMachineStatus `json:"status,omitempty"` } +func (vm *VirtualMachine) GetNetworkUsage(ctx context.Context, timeout time.Duration) (*VirtualMachineNetworkUsage, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + url := fmt.Sprintf("http://%s:%d/network_usage", vm.Status.PodIP, vm.Spec.RunnerPort) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, 
url, nil) + if err != nil { + return nil, fmt.Errorf("error initializing http request to fetch network usage: %w", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, NetworkUsageRequestError{Err: err} + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return nil, NetworkUsageStatusCodeError{StatusCode: resp.StatusCode} + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading http response body: %w", err) + } + + var result VirtualMachineNetworkUsage + if err := json.Unmarshal(body, &result); err != nil { + return nil, NetworkUsageJSONError{Err: err} + } + + return &result, nil +} + func (vm *VirtualMachine) Cleanup() { vm.Status.PodName = "" vm.Status.PodIP = "" diff --git a/neonvm/apis/neonvm/v1/virtualmachine_webhook.go b/neonvm/apis/neonvm/v1/virtualmachine_webhook.go index 36c3bb378..00ae9d67d 100644 --- a/neonvm/apis/neonvm/v1/virtualmachine_webhook.go +++ b/neonvm/apis/neonvm/v1/virtualmachine_webhook.go @@ -137,6 +137,7 @@ func (r *VirtualMachine) ValidateUpdate(old runtime.Object) (admission.Warnings, {".spec.enableAcceleration", func(v *VirtualMachine) any { return v.Spec.EnableAcceleration }}, {".spec.enableSSH", func(v *VirtualMachine) any { return v.Spec.EnableSSH }}, {".spec.initScript", func(v *VirtualMachine) any { return v.Spec.InitScript }}, + {".spec.enableNetworkMonitoring", func(v *VirtualMachine) any { return v.Spec.EnableNetworkMonitoring }}, } for _, info := range immutableFields { diff --git a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go index 08e7c3105..5e61073c6 100644 --- a/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go +++ b/neonvm/apis/neonvm/v1/zz_generated.deepcopy.go @@ -637,6 +637,21 @@ func (in *VirtualMachineMigrationStatus) DeepCopy() *VirtualMachineMigrationStat return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VirtualMachineNetworkUsage) DeepCopyInto(out *VirtualMachineNetworkUsage) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VirtualMachineNetworkUsage. +func (in *VirtualMachineNetworkUsage) DeepCopy() *VirtualMachineNetworkUsage { + if in == nil { + return nil + } + out := new(VirtualMachineNetworkUsage) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VirtualMachineResources) DeepCopyInto(out *VirtualMachineResources) { *out = *in @@ -738,6 +753,11 @@ func (in *VirtualMachineSpec) DeepCopyInto(out *VirtualMachineSpec) { *out = new(CpuScalingMode) **out = **in } + if in.EnableNetworkMonitoring != nil { + in, out := &in.EnableNetworkMonitoring, &out.EnableNetworkMonitoring + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VirtualMachineSpec. 
diff --git a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml index c5dde4b74..a7a583ea6 100644 --- a/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml +++ b/neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml @@ -1033,6 +1033,10 @@ spec: default: true description: Use KVM acceleation type: boolean + enableNetworkMonitoring: + default: false + description: Enable network monitoring on the VM + type: boolean enableSSH: default: true description: |- diff --git a/pkg/agent/billing/billing.go b/pkg/agent/billing/billing.go index c2bdfca1e..174e3da2c 100644 --- a/pkg/agent/billing/billing.go +++ b/pkg/agent/billing/billing.go @@ -3,6 +3,7 @@ package billing import ( "context" "errors" + "fmt" "math" "time" @@ -14,12 +15,15 @@ import ( "github.com/neondatabase/autoscaling/pkg/api" "github.com/neondatabase/autoscaling/pkg/billing" "github.com/neondatabase/autoscaling/pkg/reporting" + "github.com/neondatabase/autoscaling/pkg/util" ) type Config struct { Clients ClientsConfig `json:"clients"` CPUMetricName string `json:"cpuMetricName"` ActiveTimeMetricName string `json:"activeTimeMetricName"` + IngressBytesMetricName string `json:"ingressBytesMetricName"` + EgressBytesMetricName string `json:"egressBytesMetricName"` CollectEverySeconds uint `json:"collectEverySeconds"` AccumulateEverySeconds uint `json:"accumulateEverySeconds"` } @@ -37,8 +41,10 @@ type metricsKey struct { } type vmMetricsHistory struct { - lastSlice *metricsTimeSlice - total vmMetricsSeconds + lastSlice *metricsTimeSlice + total vmMetricsSeconds + totalIngressBytes vmv1.NetworkBytes + totalEgressBytes vmv1.NetworkBytes } type metricsTimeSlice struct { @@ -52,6 +58,10 @@ func (m *metricsTimeSlice) Duration() time.Duration { return m.endTime.Sub(m.sta type vmMetricsInstant struct { // cpu stores the cpu allocation at a particular instant. 
cpu vmv1.MilliCPU + // number of bytes received by the VM from the open internet since the last time slice + ingressBytes vmv1.NetworkBytes + // number of bytes sent by the VM to the open internet since the last time slice + egressBytes vmv1.NetworkBytes } // vmMetricsSeconds is like vmMetrics, but the values cover the allocation over time @@ -112,7 +122,7 @@ func (mc *MetricsCollector) Run( pushWindowStart: time.Now(), } - state.collect(logger, store, mc.metrics) + state.collect(ctx, logger, store, mc.metrics) for { select { @@ -123,7 +133,7 @@ func (mc *MetricsCollector) Run( logger.Panic("Validation check failed", zap.Error(err)) return err } - state.collect(logger, store, mc.metrics) + state.collect(ctx, logger, store, mc.metrics) case <-accumulateTicker.C: logger.Info("Creating billing batch") state.drainEnqueue(logger, mc.conf, billing.GetHostname(), mc.sink) @@ -133,7 +143,59 @@ func (mc *MetricsCollector) Run( } } -func (s *metricsState) collect(logger *zap.Logger, store VMStoreForNode, metrics PromMetrics) { +type vmMetricsKV struct { + key metricsKey + value vmMetricsInstant +} + +func collectMetricsForVM( + ctx context.Context, + logger *zap.Logger, + vm *vmv1.VirtualMachine, + metricsChan chan vmMetricsKV, + promMetrics *PromMetrics, +) { + byteCounts := &vmv1.VirtualMachineNetworkUsage{ + IngressBytes: 0, + EgressBytes: 0, + } + if vm.Spec.EnableNetworkMonitoring != nil && *vm.Spec.EnableNetworkMonitoring { + bc, err := vm.GetNetworkUsage(ctx, 500*time.Millisecond) + if err != nil { + logger.Error("Failed to collect network usage", util.VMNameFields(vm), zap.Error(err)) + + var rootErr string + switch e := err.(type) { //nolint:errorlint // we want to know the type + case vmv1.NetworkUsageJSONError: + rootErr = "JSON unmarshaling" + case vmv1.NetworkUsageStatusCodeError: + rootErr = fmt.Sprintf("HTTP code %d", e.StatusCode) + default: + rootErr = util.RootError(err).Error() + } + promMetrics.fetchNetworkUsageErrorsTotal.WithLabelValues(rootErr).Inc() + } else { + byteCounts = bc + } + } + endpointID := vm.Annotations[api.AnnotationBillingEndpointID] + metricsChan <- vmMetricsKV{ + key: metricsKey{ + uid: vm.UID, + endpointID: endpointID, + }, + // Note: Usually, vmMetricsInstant stores deltas for the byte counts. But the API + // returns totals, which get recorded here; the previous instant's totals are subtracted + // off in the second half of the collection loop. 
+ value: vmMetricsInstant{ + cpu: *vm.Status.CPUs, + ingressBytes: byteCounts.IngressBytes, + egressBytes: byteCounts.EgressBytes, + }, + } +} + +func (s *metricsState) collect(ctx context.Context, logger *zap.Logger, store VMStoreForNode, metrics PromMetrics) { now := time.Now() metricsBatch := metrics.forBatch() @@ -149,8 +211,10 @@ func (s *metricsState) collect(logger *zap.Logger, store VMStoreForNode, metrics return i.List() }) } + metricsChan := make(chan vmMetricsKV, len(vmsOnThisNode)) + metricsToCollect := 0 for _, vm := range vmsOnThisNode { - endpointID, isEndpoint := vm.Annotations[api.AnnotationBillingEndpointID] + _, isEndpoint := vm.Annotations[api.AnnotationBillingEndpointID] metricsBatch.inc(isEndpointFlag(isEndpoint), autoscalingEnabledFlag(api.HasAutoscalingEnabled(vm)), vm.Status.Phase) if !isEndpoint { // we're only reporting metrics for VMs with endpoint IDs, and this VM doesn't have one @@ -161,38 +225,47 @@ func (s *metricsState) collect(logger *zap.Logger, store VMStoreForNode, metrics continue } - key := metricsKey{ - uid: vm.UID, - endpointID: endpointID, - } - presentMetrics := vmMetricsInstant{ - cpu: *vm.Status.CPUs, - } - if oldMetrics, ok := old[key]; ok { - // The VM was present from s.lastTime to now. Add a time slice to its metrics history. - timeSlice := metricsTimeSlice{ - metrics: vmMetricsInstant{ - // strategically under-bill by assigning the minimum to the entire time slice. - cpu: min(oldMetrics.cpu, presentMetrics.cpu), - }, - // note: we know s.lastTime != nil because otherwise old would be empty. - startTime: *s.lastCollectTime, - endTime: now, - } + go collectMetricsForVM(ctx, logger, vm, metricsChan, &metrics) + metricsToCollect += 1 + } - vmHistory, ok := s.historical[key] - if !ok { - vmHistory = vmMetricsHistory{ - lastSlice: nil, - total: vmMetricsSeconds{cpu: 0, activeTime: time.Duration(0)}, + for i := 0; i < metricsToCollect; i++ { + select { + case <-ctx.Done(): + logger.Error("Timed out collecting metrics", zap.Error(ctx.Err())) + break + case kv := <-metricsChan: + key, presentMetrics := kv.key, kv.value + + if oldMetrics, ok := old[key]; ok { + // The VM was present from s.lastTime to now. Add a time slice to its metrics history. + timeSlice := metricsTimeSlice{ + metrics: vmMetricsInstant{ + // strategically under-bill by assigning the minimum to the entire time slice. + cpu: min(oldMetrics.cpu, presentMetrics.cpu), + ingressBytes: presentMetrics.ingressBytes - oldMetrics.ingressBytes, + egressBytes: presentMetrics.egressBytes - oldMetrics.egressBytes, + }, + // note: we know s.lastTime != nil because otherwise old would be empty. 
+ startTime: *s.lastCollectTime, + endTime: now, } + + vmHistory, ok := s.historical[key] + if !ok { + vmHistory = vmMetricsHistory{ + lastSlice: nil, + total: vmMetricsSeconds{cpu: 0, activeTime: time.Duration(0)}, + totalIngressBytes: 0, + totalEgressBytes: 0, + } + } + // append the slice, merging with the previous if the resource usage was the same + vmHistory.appendSlice(timeSlice) + s.historical[key] = vmHistory } - // append the slice, merging with the previous if the resource usage was the same - vmHistory.appendSlice(timeSlice) - s.historical[key] = vmHistory + s.present[key] = presentMetrics } - - s.present[key] = presentMetrics } s.lastCollectTime = &now @@ -231,6 +304,8 @@ func (h *vmMetricsHistory) finalizeCurrentTimeSlice() { } h.total.cpu += metricsSeconds.cpu h.total.activeTime += metricsSeconds.activeTime + h.totalIngressBytes += h.lastSlice.metrics.ingressBytes + h.totalEgressBytes += h.lastSlice.metrics.egressBytes h.lastSlice = nil } @@ -272,9 +347,14 @@ func (s *metricsState) drainEnqueue( enqueue := sink.Enqueue - for key, history := range s.historical { + for _, history := range s.historical { history.finalizeCurrentTimeSlice() + if history.totalIngressBytes != 0 || history.totalEgressBytes != 0 { + batchSize += 2 + } + } + for key, history := range s.historical { countInBatch += 1 enqueue(logAddedEvent(logger, billing.Enrich(now, hostname, countInBatch, batchSize, &billing.IncrementalEvent{ MetricName: conf.CPUMetricName, @@ -297,6 +377,28 @@ func (s *metricsState) drainEnqueue( StopTime: now, Value: int(math.Round(history.total.activeTime.Seconds())), }))) + if history.totalIngressBytes != 0 || history.totalEgressBytes != 0 { + countInBatch += 1 + enqueue(logAddedEvent(logger, billing.Enrich(now, hostname, countInBatch, batchSize, &billing.IncrementalEvent{ + MetricName: conf.IngressBytesMetricName, + Type: "", // set by billing.Enrich + IdempotencyKey: "", // set by billing.Enrich + EndpointID: key.endpointID, + StartTime: s.pushWindowStart, + StopTime: now, + Value: int(history.totalIngressBytes), + }))) + countInBatch += 1 + enqueue(logAddedEvent(logger, billing.Enrich(now, hostname, countInBatch, batchSize, &billing.IncrementalEvent{ + MetricName: conf.EgressBytesMetricName, + Type: "", // set by billing.Enrich + IdempotencyKey: "", // set by billing.Enrich + EndpointID: key.endpointID, + StartTime: s.pushWindowStart, + StopTime: now, + Value: int(history.totalEgressBytes), + }))) + } } s.pushWindowStart = now diff --git a/pkg/agent/billing/prommetrics.go b/pkg/agent/billing/prommetrics.go index f15346674..e4540b1d3 100644 --- a/pkg/agent/billing/prommetrics.go +++ b/pkg/agent/billing/prommetrics.go @@ -16,6 +16,8 @@ type PromMetrics struct { vmsProcessedTotal *prometheus.CounterVec vmsCurrent *prometheus.GaugeVec + // TODO(myrrc) this should be in EventSinkMetrics + fetchNetworkUsageErrorsTotal *prometheus.CounterVec } func NewPromMetrics() PromMetrics { @@ -36,6 +38,13 @@ func NewPromMetrics() PromMetrics { }, []string{"is_endpoint", "autoscaling_enabled", "phase"}, ), + fetchNetworkUsageErrorsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "autoscaling_agent_billing_fetch_network_usage_errors_total", + Help: "Total errors from attempting to fetch network usage", + }, + []string{"cause"}, + ), } } @@ -43,6 +52,7 @@ func (m PromMetrics) MustRegister(reg *prometheus.Registry) { m.reporting.MustRegister(reg) reg.MustRegister(m.vmsProcessedTotal) reg.MustRegister(m.vmsCurrent) + reg.MustRegister(m.fetchNetworkUsageErrorsTotal) } type 
batchMetrics struct { diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 254e7f6ae..f4aa00db8 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -176,6 +176,8 @@ func (c *Config) validate() error { erc.Whenf(ec, c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName") erc.Whenf(ec, c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName") + erc.Whenf(ec, c.Billing.IngressBytesMetricName == "", emptyTmpl, ".billing.ingressBytesMetricName") + erc.Whenf(ec, c.Billing.EgressBytesMetricName == "", emptyTmpl, ".billing.egressBytesMetricName") erc.Whenf(ec, c.Billing.CollectEverySeconds == 0, zeroTmpl, ".billing.collectEverySeconds") erc.Whenf(ec, c.Billing.AccumulateEverySeconds == 0, zeroTmpl, ".billing.accumulateEverySeconds") if c.Billing.Clients.AzureBlob != nil { diff --git a/pkg/neonvm/controllers/vm_controller.go b/pkg/neonvm/controllers/vm_controller.go index 170c65333..929a5bb12 100644 --- a/pkg/neonvm/controllers/vm_controller.go +++ b/pkg/neonvm/controllers/vm_controller.go @@ -1369,6 +1369,9 @@ func podSpec( if memoryProvider == vmv1.MemoryProviderVirtioMem { cmd = append(cmd, "-memhp-auto-movable-ratio", config.MemhpAutoMovableRatio) } + if vm.Spec.EnableNetworkMonitoring != nil && *vm.Spec.EnableNetworkMonitoring { + cmd = append(cmd, "-enable-network-monitoring") + } // put these last, so that the earlier args are easier to see (because these // can get quite large) cmd = append( diff --git a/tests/e2e/s3/utils.py b/tests/e2e/s3/utils.py index a4737d4ea..42085805a 100644 --- a/tests/e2e/s3/utils.py +++ b/tests/e2e/s3/utils.py @@ -75,6 +75,8 @@ def agent_add_s3(): update_agent_biling_config( { "cpuMetricName": "effective_compute_seconds", + "ingressBytesMetricName": "ingress_bytes", + "egressBytesMetricName": "egress_bytes", "activeTimeMetricName": "active_time_seconds", "collectEverySeconds": 5, "accumulateEverySeconds": 5, @@ -101,6 +103,8 @@ def agent_remove_s3(): "collectEverySeconds": 5, "accumulateEverySeconds": 5, "clients": {}, + "ingressBytesMetricName": "ingress_bytes", + "egressBytesMetricName": "egress_bytes" } ) diff --git a/vm-deploy.yaml b/vm-deploy.yaml index 09588f60d..5dc694d7c 100644 --- a/vm-deploy.yaml +++ b/vm-deploy.yaml @@ -12,6 +12,7 @@ metadata: autoscaling.neon.tech/testing-only-always-migrate: "false" spec: schedulerName: autoscale-scheduler + enableNetworkMonitoring: true enableSSH: true guest: cpus: { min: 0.25, use: 0.25, max: 1.25 }
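
For reference, a minimal sketch (not part of this patch) of how a client of the new `/network_usage` endpoint might interpret its output. The JSON field names come from the struct tags on `VirtualMachineNetworkUsage` above; the `networkUsage` type, URLs, and sample payloads here are illustrative assumptions only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// networkUsage mirrors the JSON shape of vmv1.VirtualMachineNetworkUsage.
type networkUsage struct {
	IngressBytes uint64 `json:"ingress_bytes"`
	EgressBytes  uint64 `json:"egress_bytes"`
}

func main() {
	// Two hypothetical responses from GET http://<pod-ip>:<runner-port>/network_usage,
	// taken one collection interval apart. The endpoint reports running totals read
	// from the INPUT/OUTPUT iptables chains, not per-interval deltas.
	prevBody := []byte(`{"ingress_bytes": 100000, "egress_bytes": 5000}`)
	currBody := []byte(`{"ingress_bytes": 123456, "egress_bytes": 7890}`)

	var prev, curr networkUsage
	if err := json.Unmarshal(prevBody, &prev); err != nil {
		panic(err)
	}
	if err := json.Unmarshal(currBody, &curr); err != nil {
		panic(err)
	}

	// The agent stores the totals as the "present" instant and derives the billable
	// per-slice values by subtracting the previous instant's totals.
	fmt.Printf("ingress delta: %d bytes\n", curr.IngressBytes-prev.IngressBytes)
	fmt.Printf("egress delta: %d bytes\n", curr.EgressBytes-prev.EgressBytes)
}
```

Since the runner returns totals rather than deltas, the per-interval values that end up in `ingress_bytes`/`egress_bytes` billing events come from the subtraction in the agent's `collect` loop, not from the runner itself.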