Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
myrrc committed Nov 27, 2024
1 parent 0bc5fe1 commit b5822bd
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 59 deletions.
4 changes: 3 additions & 1 deletion autoscaler-agent/config_map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ data:
"activeTimeMetricName": "active_time_seconds",
"collectEverySeconds": 4,
"accumulateEverySeconds": 24,
"clients": {}
"clients": {},
"ingressBytesMetricName": "ingress_bytes",
"egressBytesMetricName": "egress_bytes"
},
"monitor": {
"serverPort": 10301,
Expand Down
121 changes: 98 additions & 23 deletions neonvm-runner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
"github.com/containerd/cgroups/v3/cgroup2"
"github.com/coreos/go-iptables/iptables"
"github.com/digitalocean/go-qemu/qmp"
"github.com/docker/libnetwork/types"
"github.com/jpillora/backoff"
Expand Down Expand Up @@ -606,28 +607,30 @@ func runInitScript(logger *zap.Logger, script string) error {
}

type Config struct {
vmSpecDump string
vmStatusDump string
kernelPath string
appendKernelCmdline string
skipCgroupManagement bool
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
cpuScalingMode vmv1.CpuScalingMode
vmSpecDump string
vmStatusDump string
kernelPath string
appendKernelCmdline string
skipCgroupManagement bool
enableNetworkMonitoring bool
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
cpuScalingMode vmv1.CpuScalingMode
}

func newConfig(logger *zap.Logger) *Config {
cfg := &Config{
vmSpecDump: "",
vmStatusDump: "",
kernelPath: defaultKernelPath,
appendKernelCmdline: "",
skipCgroupManagement: false,
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
cpuScalingMode: "", // Require that this is explicitly set. We'll check later.
vmSpecDump: "",
vmStatusDump: "",
kernelPath: defaultKernelPath,
appendKernelCmdline: "",
skipCgroupManagement: false,
enableNetworkMonitoring: false,
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
cpuScalingMode: "", // Require that this is explicitly set. We'll check later.
}
flag.StringVar(&cfg.vmSpecDump, "vmspec", cfg.vmSpecDump,
"Base64 encoded VirtualMachine json specification")
Expand All @@ -640,6 +643,8 @@ func newConfig(logger *zap.Logger) *Config {
flag.BoolVar(&cfg.skipCgroupManagement, "skip-cgroup-management",
cfg.skipCgroupManagement,
"Don't try to manage CPU")
flag.BoolVar(&cfg.enableNetworkMonitoring, "enable-network-monitoring",
cfg.enableNetworkMonitoring, "Enable in/out traffic measuring with iptables")
flag.StringVar(&cfg.diskCacheSettings, "qemu-disk-cache-settings",
cfg.diskCacheSettings, "Cache settings to add to -drive args for VM disks")
flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc)
Expand Down Expand Up @@ -1077,7 +1082,7 @@ func runQEMU(
}

wg.Add(1)
go listenForCPUChanges(ctx, logger, vmSpec.RunnerPort, callbacks, &wg)
go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, cfg.enableNetworkMonitoring)
wg.Add(1)
go forwardLogs(ctx, logger, &wg)

Expand Down Expand Up @@ -1179,12 +1184,13 @@ type cpuServerCallbacks struct {
set func(*zap.Logger, vmv1.MilliCPU) error
}

func listenForCPUChanges(
func listenForHTTPRequests(
ctx context.Context,
logger *zap.Logger,
port int32,
callbacks cpuServerCallbacks,
wg *sync.WaitGroup,
networkUsageRoute bool,
) {
defer wg.Done()
mux := http.NewServeMux()
Expand All @@ -1197,6 +1203,13 @@ func listenForCPUChanges(
mux.HandleFunc("/cpu_current", func(w http.ResponseWriter, r *http.Request) {
handleCPUCurrent(cpuCurrentLogger, w, r, callbacks.get)
})
if networkUsageRoute {
networkUsageLogger := loggerHandlers.Named("network_usage")
logger.Info("Listening for network_usage")
mux.HandleFunc("/network_usage", func(w http.ResponseWriter, r *http.Request) {
handleGetNetworkUsage(networkUsageLogger, w, r)
})
}
server := http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Handler: mux,
Expand All @@ -1211,14 +1224,76 @@ func listenForCPUChanges(
select {
case err := <-errChan:
if errors.Is(err, http.ErrServerClosed) {
logger.Info("cpu_change server closed")
logger.Info("http server closed")
} else if err != nil {
logger.Fatal("cpu_change exited with error", zap.Error(err))
logger.Fatal("http server exited with error", zap.Error(err))
}
case <-ctx.Done():
err := server.Shutdown(context.Background())
logger.Info("shut down cpu_change server", zap.Error(err))
logger.Info("shut down http server", zap.Error(err))
}
}

// incNetBytesCounter sums the byte counters of all TCP rules in the given
// chain of the "filter" table.
//
// ipt is the iptables handle used to list and parse the chain's statistics;
// chain is the name of the chain to inspect.
//
// On any error the returned count is zero, so that a partially accumulated
// total can never be mistaken for a valid reading.
func incNetBytesCounter(ipt *iptables.IPTables, chain string) (vmv1.NetworkBytes, error) {
	rules, err := ipt.Stats("filter", chain)
	if err != nil {
		return 0, err
	}

	var total vmv1.NetworkBytes
	for _, rawStat := range rules {
		stat, err := ipt.ParseStat(rawStat)
		if err != nil {
			return 0, err
		}
		// NOTE(review): the numeric listing appears to report TCP as "6" on
		// some iptables versions and as "tcp" on others; accept both so the
		// counter does not silently read zero. Confirm against the iptables
		// version shipped in the runner image.
		switch stat.Protocol {
		case "6", "tcp":
			total += vmv1.NetworkBytes(stat.Bytes)
		}
	}
	return total, nil
}

// handleGetNetworkUsage handles requests for the VM's network usage.
//
// For a GET request it reads the byte counters of the "INPUT" and "OUTPUT"
// chains via incNetBytesCounter and responds with a JSON-encoded
// vmv1.VirtualMachineNetworkUsage. Any other method is answered with status
// 400; a failure to initialize iptables, read the counters or marshal the
// result is logged and answered with status 500.
func handleGetNetworkUsage(logger *zap.Logger, w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		logger.Error("unexpected method", zap.String("method", r.Method))
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Rules configured at github.com/neondatabase/cloud/blob/main/compute-init/compute-init.sh#L98
	ipt, err := iptables.New()
	if err != nil {
		logger.Error("error initializing iptables", zap.Error(err))
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	ingress, err := incNetBytesCounter(ipt, "INPUT")
	if err != nil {
		logger.Error("error reading ingress byte counts", zap.Error(err))
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	egress, err := incNetBytesCounter(ipt, "OUTPUT")
	if err != nil {
		logger.Error("error reading egress byte counts", zap.Error(err))
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	counts := vmv1.VirtualMachineNetworkUsage{IngressBytes: ingress, EgressBytes: egress}
	body, err := json.Marshal(counts)
	if err != nil {
		logger.Error("could not marshal byte counts", zap.Error(err))
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.Write(body) //nolint:errcheck // Not much to do with the error here
	logger.Info("Responded with byte counts", zap.String("byte_counts", string(body)))
}

func printWithNewline(slice []byte) error {
Expand Down
84 changes: 84 additions & 0 deletions neonvm/apis/neonvm/v1/virtualmachine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package v1

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"slices"
"time"

Expand Down Expand Up @@ -68,6 +71,16 @@ type VirtualMachineResources struct {
MemorySlotSize resource.Quantity `json:"memorySlotSize"`
}

// NetworkBytes is a count of bytes of network traffic.
type NetworkBytes uint64

// VirtualMachineNetworkUsage provides information about a VM's incoming and outgoing traffic.
// It is serialized to JSON with the keys "ingress_bytes" and "egress_bytes".
type VirtualMachineNetworkUsage struct {
	// Number of bytes received by the VM from the open internet
	IngressBytes NetworkBytes `json:"ingress_bytes"`
	// Number of bytes sent by the VM to the open internet
	EgressBytes NetworkBytes `json:"egress_bytes"`
}

// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// VirtualMachineSpec defines the desired state of VirtualMachine
Expand Down Expand Up @@ -155,6 +168,11 @@ type VirtualMachineSpec struct {
// +kubebuilder:default:=QmpScaling
// +optional
CpuScalingMode *CpuScalingMode `json:"cpuScalingMode,omitempty"`

// Enable network monitoring on the VM
// +kubebuilder:default:=false
// +optional
EnableNetworkMonitoring *bool `json:"enableNetworkMonitoring,omitempty"`
}

func (spec *VirtualMachineSpec) Resources() VirtualMachineResources {
Expand Down Expand Up @@ -623,6 +641,41 @@ func (p VmPhase) IsAlive() bool {
}
}

// NetworkUsageJSONError reports that a network usage response could not be
// decoded from JSON. Err holds the underlying decoding error.
//
// +kubebuilder:object:generate=false
type NetworkUsageJSONError struct {
	Err error
}

// Error describes the failure, including the message of the wrapped error.
func (e NetworkUsageJSONError) Error() string {
	cause := e.Err.Error()
	return fmt.Sprintf("Error unmarshaling network usage: %s", cause)
}

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (e NetworkUsageJSONError) Unwrap() error {
	wrapped := e.Err
	return wrapped
}

// NetworkUsageRequestError reports that the HTTP request for network usage
// could not be completed. Err holds the underlying request error.
//
// +kubebuilder:object:generate=false
type NetworkUsageRequestError struct {
	Err error
}

// Error describes the failure, including the message of the wrapped error.
func (e NetworkUsageRequestError) Error() string {
	cause := e.Err.Error()
	return fmt.Sprintf("Error fetching network usage: %s", cause)
}

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (e NetworkUsageRequestError) Unwrap() error {
	wrapped := e.Err
	return wrapped
}

// NetworkUsageStatusCodeError reports that the network usage endpoint
// answered with an unexpected HTTP status. StatusCode is the status received.
//
// +kubebuilder:object:generate=false
type NetworkUsageStatusCodeError struct {
	StatusCode int
}

// Error describes the failure, including the received status code.
func (e NetworkUsageStatusCodeError) Error() string {
	code := e.StatusCode
	return fmt.Sprintf("Unexpected HTTP status code when fetching network usage: %d", code)
}

//+genclient
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
Expand All @@ -647,6 +700,37 @@ type VirtualMachine struct {
Status VirtualMachineStatus `json:"status,omitempty"`
}

// GetNetworkUsage fetches the VM's network usage by sending a GET request to
// the /network_usage endpoint at the VM's pod IP and runner port.
//
// The whole operation (request and reading the response) is bounded by
// timeout, applied on top of ctx.
//
// Errors:
//   - NetworkUsageRequestError if the VM has no pod IP or the request fails;
//   - NetworkUsageStatusCodeError if the response status is not 200;
//   - NetworkUsageJSONError if the response body is not valid JSON for
//     VirtualMachineNetworkUsage;
//   - a plain wrapped error if the request cannot be built or the body cannot
//     be read.
func (vm *VirtualMachine) GetNetworkUsage(ctx context.Context, timeout time.Duration) (*VirtualMachineNetworkUsage, error) {
	// Without a pod IP the URL below would have an empty host, which the
	// network stack treats as the local system; fail early instead of
	// querying whatever happens to listen on this machine.
	if vm.Status.PodIP == "" {
		return nil, NetworkUsageRequestError{Err: fmt.Errorf("VM has no pod IP")}
	}

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	url := fmt.Sprintf("http://%s:%d/network_usage", vm.Status.PodIP, vm.Spec.RunnerPort)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("error initializing http request to fetch network usage: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, NetworkUsageRequestError{Err: err}
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, NetworkUsageStatusCodeError{StatusCode: resp.StatusCode}
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading http response body: %w", err)
	}

	var result VirtualMachineNetworkUsage
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, NetworkUsageJSONError{Err: err}
	}

	return &result, nil
}

func (vm *VirtualMachine) Cleanup() {
vm.Status.PodName = ""
vm.Status.PodIP = ""
Expand Down
1 change: 1 addition & 0 deletions neonvm/apis/neonvm/v1/virtualmachine_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (r *VirtualMachine) ValidateUpdate(old runtime.Object) (admission.Warnings,
{".spec.enableAcceleration", func(v *VirtualMachine) any { return v.Spec.EnableAcceleration }},
{".spec.enableSSH", func(v *VirtualMachine) any { return v.Spec.EnableSSH }},
{".spec.initScript", func(v *VirtualMachine) any { return v.Spec.InitScript }},
{".spec.enableNetworkMonitoring", func(v *VirtualMachine) any { return v.Spec.EnableNetworkMonitoring }},
}

for _, info := range immutableFields {
Expand Down
20 changes: 20 additions & 0 deletions neonvm/apis/neonvm/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,10 @@ spec:
default: true
description: Use KVM acceleation
type: boolean
enableNetworkMonitoring:
default: false
description: Enable network monitoring on the VM
type: boolean
enableSSH:
default: true
description: |-
Expand Down
Loading

0 comments on commit b5822bd

Please sign in to comment.