neonvm: apply code review fixes
pass cpuScalingMode as argument to the vm-runner
rename arguments, constants and functions here and there
drop unused code
move default cpu scaling mode to controller argument

Signed-off-by: Misha Sakhnov <[email protected]>
mikhail-sakhnov committed Oct 21, 2024
1 parent 82a4827 commit 636e001
Showing 10 changed files with 81 additions and 103 deletions.
5 changes: 3 additions & 2 deletions neonvm-controller/cmd/main.go
@@ -95,7 +95,7 @@ func main() {
var concurrencyLimit int
var skipUpdateValidationFor map[types.NamespacedName]struct{}
var disableRunnerCgroup bool
var useCpuSysfsStateScaling bool
var defaultCpuScalingMode string
var qemuDiskCacheSettings string
var defaultMemoryProvider vmv1.MemoryProvider
var memhpAutoMovableRatio string
@@ -133,7 +133,7 @@ func main() {
return nil
},
)
flag.BoolVar(&useCpuSysfsStateScaling, "use-cpu-sysfs-state-scaling", false, "Use sysfs cpu state scaling for CPU scaling")
flag.StringVar(&defaultCpuScalingMode, "default-cpu-scaling-mode", vmv1.CpuScalingModeQMP, fmt.Sprintf("Default: CPU scaling: %s || %s", vmv1.CpuScalingModeQMP, vmv1.CpuScalingModeSysfs))
flag.BoolVar(&disableRunnerCgroup, "disable-runner-cgroup", false, "Disable creation of a cgroup in neonvm-runner for fractional CPU limiting")
flag.StringVar(&qemuDiskCacheSettings, "qemu-disk-cache-settings", "cache=none", "Set neonvm-runner's QEMU disk cache settings")
flag.Func("default-memory-provider", "Set default memory provider to use for new VMs", defaultMemoryProvider.FlagFunc)
@@ -197,6 +197,7 @@ func main() {
MemhpAutoMovableRatio: memhpAutoMovableRatio,
FailurePendingPeriod: failurePendingPeriod,
FailingRefreshInterval: failingRefreshInterval,
DefaultCPUScalingMode: defaultCpuScalingMode,
}

vmReconciler := &controllers.VMReconciler{
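
The controller-side change above replaces the old `-use-cpu-sysfs-state-scaling` boolean with a `-default-cpu-scaling-mode` string flag (defaulting to QMP) and threads the value into the reconciler config. Below is a minimal, self-contained sketch of that flag wiring; the early validation switch is an illustration of mine, not part of the commit, which passes the string through to `ReconcilerConfig.DefaultCPUScalingMode` as-is.

```go
package main

import (
	"flag"
	"fmt"
	"log"
)

// Mirrors the constants from neonvm/apis/neonvm/v1 (see the types diff below).
const (
	CpuScalingModeQMP   = "qmpScaling"
	CpuScalingModeSysfs = "sysfsScaling"
)

func main() {
	var defaultCpuScalingMode string
	flag.StringVar(&defaultCpuScalingMode, "default-cpu-scaling-mode", CpuScalingModeQMP,
		fmt.Sprintf("Default CPU scaling mode: %s or %s", CpuScalingModeQMP, CpuScalingModeSysfs))
	flag.Parse()

	// Hypothetical early validation; the actual controller hands the string
	// straight to the reconcilers and lets them reject unknown values.
	switch defaultCpuScalingMode {
	case CpuScalingModeQMP, CpuScalingModeSysfs:
		log.Printf("using default CPU scaling mode %q", defaultCpuScalingMode)
	default:
		log.Fatalf("unknown -default-cpu-scaling-mode %q", defaultCpuScalingMode)
	}
}
```

With this in place, an operator can change the fleet-wide default (e.g. `-default-cpu-scaling-mode sysfsScaling`) without touching individual VM specs, since the default only applies to VMs whose `spec.cpuScalingMode` is empty.
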
3 changes: 0 additions & 3 deletions neonvm-daemon/cmd/main.go
@@ -65,9 +65,6 @@ func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
}

func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
// TODO: should the call to this method be conditional, only if the statefs cpu scaling is enabled?
// on the other hand, currently this endpoint is called by runner only if the statefs scaling is enabled
// and it is a bit tricky to pass vmSpec here
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()
body, err := io.ReadAll(r.Body)
107 changes: 33 additions & 74 deletions neonvm-runner/cmd/main.go
@@ -20,7 +20,6 @@ import (
"os/signal"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -612,10 +611,10 @@ type Config struct {
kernelPath string
appendKernelCmdline string
skipCgroupManagement bool
enableDummyCPUServer bool
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
cpuScalingMode string
}

func newConfig(logger *zap.Logger) *Config {
@@ -625,10 +624,10 @@ func newConfig(logger *zap.Logger) *Config {
kernelPath: defaultKernelPath,
appendKernelCmdline: "",
skipCgroupManagement: false,
enableDummyCPUServer: false,
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
cpuScalingMode: "", // Require that this is explicitly set. We'll check later.
}
flag.StringVar(&cfg.vmSpecDump, "vmspec", cfg.vmSpecDump,
"Base64 encoded VirtualMachine json specification")
@@ -641,14 +640,12 @@ func newConfig(logger *zap.Logger) *Config {
flag.BoolVar(&cfg.skipCgroupManagement, "skip-cgroup-management",
cfg.skipCgroupManagement,
"Don't try to manage CPU")
flag.BoolVar(&cfg.enableDummyCPUServer, "enable-dummy-cpu-server",
cfg.skipCgroupManagement,
"Use with -skip-cgroup-management. Provide a CPU server but don't actually do anything with it")
flag.StringVar(&cfg.diskCacheSettings, "qemu-disk-cache-settings",
cfg.diskCacheSettings, "Cache settings to add to -drive args for VM disks")
flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc)
flag.StringVar(&cfg.autoMovableRatio, "memhp-auto-movable-ratio",
cfg.autoMovableRatio, "Set value of kernel's memory_hotplug.auto_movable_ratio [virtio-mem only]")
flag.StringVar(&cfg.cpuScalingMode, "cpu-scaling-mode", "", "Set CPU scaling mode")
flag.Parse()

if cfg.memoryProvider == "" {
@@ -657,8 +654,8 @@ func newConfig(logger *zap.Logger) *Config {
if cfg.memoryProvider == vmv1.MemoryProviderVirtioMem && cfg.autoMovableRatio == "" {
logger.Fatal("missing required flag '-memhp-auto-movable-ratio'")
}
if cfg.enableDummyCPUServer && !cfg.skipCgroupManagement {
logger.Fatal("flag -enable-dummy-cpu-server requires -skip-cgroup-management")
if cfg.cpuScalingMode == "" {
logger.Fatal("missing required flag '-cpu-scaling-mode'")
}

return cfg
@@ -894,22 +891,26 @@ func buildQEMUCmd(
maxCPUs := vmSpec.Guest.CPUs.Max.RoundedUp()
minCPUs := vmSpec.Guest.CPUs.Min.RoundedUp()

if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
// if we use sysfs based scaling we specify start cpus equal to max cpus
switch cfg.cpuScalingMode {
case vmv1.CpuScalingModeSysfs:
qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf(
// if we use sysfs based scaling we specify initial value for cpus qemu arg equal to max cpus
"cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1",
maxCPUs,
maxCPUs,
maxCPUs,
))
} else {
// if we use hotplug we specify start cpus equal to min cpus and scale using udev rules for cpu plug events
case vmv1.CpuScalingModeQMP:
// if we use hotplug we specify initial value for cpus qemu arg equal to min cpus and scale using udev rules for cpu plug events
qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf(
"cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1",
minCPUs,
maxCPUs,
maxCPUs,
))
default:
// we should never get here because we validate the flag in newConfig
panic(fmt.Errorf("unknown CPU scaling mode %s", cfg.cpuScalingMode))
}

// memory details
@@ -998,8 +999,8 @@ func makeKernelCmdline(cfg *Config, vmSpec *vmv1.VirtualMachineSpec, vmStatus *v
if cfg.appendKernelCmdline != "" {
cmdlineParts = append(cmdlineParts, cfg.appendKernelCmdline)
}
if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
// if we use sysfs based scaling we need to specify the start cpus as min CPUs
if cfg.cpuScalingMode == vmv1.CpuScalingModeSysfs {
// if we use sysfs based scaling we need to specify the start cpus as min CPUs to mark every CPU except 0 as offline
cmdlineParts = append(cmdlineParts, fmt.Sprintf("maxcpus=%d", vmSpec.Guest.CPUs.Min.RoundedUp()))
}

@@ -1048,44 +1049,28 @@ func runQEMU(
wg := sync.WaitGroup{}

wg.Add(1)
useCpuSysfsStateScaling := false
if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
useCpuSysfsStateScaling = true
}
go terminateQemuOnSigterm(ctx, logger, &wg)
if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer || useCpuSysfsStateScaling {
if !cfg.skipCgroupManagement || cfg.cpuScalingMode == vmv1.CpuScalingModeSysfs {
var callbacks cpuServerCallbacks

if cfg.enableDummyCPUServer {
lastValue := &atomic.Uint32{}
lastValue.Store(uint32(vmSpec.Guest.CPUs.Min))

callbacks = cpuServerCallbacks{
get: func(logger *zap.Logger) (*vmv1.MilliCPU, error) {
return lo.ToPtr(vmv1.MilliCPU(lastValue.Load())), nil
},
set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
if useCpuSysfsStateScaling {
err := setNeonvmDaemonCPU(cpu)
if err != nil {
logger.Error("setting CPU through NeonVM Daemon failed", zap.Any("cpu", cpu), zap.Error(err))
return err
}
lastValue := &atomic.Uint32{}
lastValue.Store(uint32(vmSpec.Guest.CPUs.Min))

callbacks = cpuServerCallbacks{
get: func(logger *zap.Logger) (*vmv1.MilliCPU, error) {
return lo.ToPtr(vmv1.MilliCPU(lastValue.Load())), nil
},
set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
if cfg.cpuScalingMode == vmv1.CpuScalingModeSysfs {
err := setNeonvmDaemonCPU(cpu)
if err != nil {
logger.Error("setting CPU through NeonVM Daemon failed", zap.Any("cpu", cpu), zap.Error(err))
return err
}
lastValue.Store(uint32(cpu))
return nil
},
}
} else {
// Standard implementation -- actually set the cgroup
callbacks = cpuServerCallbacks{
get: func(logger *zap.Logger) (*vmv1.MilliCPU, error) {
return getCgroupQuota(cgroupPath)
},
set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
return setCgroupLimit(logger, cpu, cgroupPath)
},
}
}
lastValue.Store(uint32(cpu))
return nil
},
}

wg.Add(1)
@@ -1493,32 +1478,6 @@ func setCgroupLimit(logger *zap.Logger, r vmv1.MilliCPU, cgroupPath string) erro
return nil
}

func getCgroupQuota(cgroupPath string) (*vmv1.MilliCPU, error) {
isV2 := cgroups.Mode() == cgroups.Unified
var path string
if isV2 {
path = filepath.Join(cgroupMountPoint, cgroupPath, "cpu.max")
} else {
path = filepath.Join(cgroupMountPoint, "cpu", cgroupPath, "cpu.cfs_quota_us")
}
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}

arr := strings.Split(strings.Trim(string(data), "\n"), " ")
if len(arr) == 0 {
return nil, errors.New("unexpected cgroup data")
}
quota, err := strconv.ParseUint(arr[0], 10, 64)
if err != nil {
return nil, err
}
cpu := vmv1.MilliCPU(uint32(quota * 1000 / cgroupPeriod))
cpu /= cpuLimitOvercommitFactor
return &cpu, nil
}

func terminateQemuOnSigterm(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
logger = logger.Named("terminate-qemu-on-sigterm")

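
The largest runner change is that `buildQEMUCmd` now branches on the `-cpu-scaling-mode` flag instead of peeking at the VM spec: sysfs scaling boots the guest with all CPUs present (relying on the kernel `maxcpus=` argument added in `makeKernelCmdline` so that only the minimum CPUs come online at boot), while QMP scaling boots with the minimum and hotplugs the rest. Here is a standalone sketch of that `-smp` selection; `smpArg` and the sample CPU counts are illustrative, not names from the commit.

```go
package main

import "fmt"

const (
	CpuScalingModeQMP   = "qmpScaling"
	CpuScalingModeSysfs = "sysfsScaling"
)

func smpArg(mode string, minCPUs, maxCPUs int) string {
	switch mode {
	case CpuScalingModeSysfs:
		// sysfs scaling: expose all CPUs to QEMU up front and toggle them
		// online/offline inside the guest later.
		return fmt.Sprintf("cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1", maxCPUs, maxCPUs, maxCPUs)
	case CpuScalingModeQMP:
		// QMP hotplug: start with the minimum and hotplug additional CPUs over QMP.
		return fmt.Sprintf("cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1", minCPUs, maxCPUs, maxCPUs)
	default:
		panic(fmt.Errorf("unknown CPU scaling mode %s", mode))
	}
}

func main() {
	fmt.Println(smpArg(CpuScalingModeSysfs, 1, 4)) // cpus=4,maxcpus=4,sockets=1,cores=4,threads=1
	fmt.Println(smpArg(CpuScalingModeQMP, 1, 4))   // cpus=1,maxcpus=4,sockets=1,cores=4,threads=1
}
```

The `default:` panic mirrors the commit's own guard for unexpected mode strings, which it treats as unreachable once the flag has been validated at startup.
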
4 changes: 2 additions & 2 deletions neonvm/apis/neonvm/v1/virtualmachine_types.go
@@ -57,9 +57,9 @@ const (
// that the VM should use QMP to scale CPUs.
CpuScalingModeQMP string = "qmpScaling"

// CpuScalingModeCpuSysfsState is the value of the VirtualMachineSpec.CpuScalingMode field that
// CpuScalingModeSysfs is the value of the VirtualMachineSpec.CpuScalingMode field that
// indicates that the VM should use the CPU sysfs state interface to scale CPUs.
CpuScalingModeCpuSysfsState string = "sysfsScaling"
CpuScalingModeSysfs string = "sysfsScaling"
)

// VirtualMachineUsage provides information about a VM's current usage. This is the type of the
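
The rename only touches the Go identifier — the serialized value stays `sysfsScaling` — so existing VM specs keep working. A tiny, hypothetical validity check over the two constants (the commit itself relies on `switch` statements with a `default` branch rather than a helper like this):

```go
package main

import "fmt"

const (
	CpuScalingModeQMP   = "qmpScaling"
	CpuScalingModeSysfs = "sysfsScaling"
)

// isValidCpuScalingMode is a hypothetical helper, shown only to make the
// accepted wire values explicit.
func isValidCpuScalingMode(mode string) bool {
	switch mode {
	case CpuScalingModeQMP, CpuScalingModeSysfs:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(isValidCpuScalingMode("sysfsScaling")) // true
	fmt.Println(isValidCpuScalingMode("qmpScaling"))   // true
	fmt.Println(isValidCpuScalingMode("anythingElse")) // false
}
```
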
3 changes: 3 additions & 0 deletions pkg/neonvm/controllers/config.go
@@ -50,4 +50,7 @@ type ReconcilerConfig struct {
// FailingRefreshInterval is the interval between consecutive
// updates of metrics and logs, related to failing reconciliations
FailingRefreshInterval time.Duration

// DefaultCPUScalingMode is the default CPU scaling mode that will be used for VMs with empty spec.cpuScalingMode
DefaultCPUScalingMode string
}
1 change: 1 addition & 0 deletions pkg/neonvm/controllers/functests/vm_controller_test.go
@@ -115,6 +115,7 @@ var _ = Describe("VirtualMachine controller", func() {
MemhpAutoMovableRatio: "301",
FailurePendingPeriod: 1 * time.Minute,
FailingRefreshInterval: 1 * time.Minute,
DefaultCPUScalingMode: vmv1.CpuScalingModeQMP,
},
}

34 changes: 25 additions & 9 deletions pkg/neonvm/controllers/vm_controller.go
@@ -166,7 +166,8 @@ func (r *VMReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re

// examine cpuScalingMode and set it to the default value if it is not set
if vm.Spec.CpuScalingMode == nil || *vm.Spec.CpuScalingMode == "" {
vm.Spec.CpuScalingMode = lo.ToPtr(vmv1.CpuScalingModeQMP)
log.Info("Setting default CPU scaling mode", "default", r.Config.DefaultCPUScalingMode)
vm.Spec.CpuScalingMode = lo.ToPtr(r.Config.DefaultCPUScalingMode)
if err := r.tryUpdateVM(ctx, &vm); err != nil {
log.Error(err, "Failed to set default CPU scaling mode")
return ctrl.Result{}, err
@@ -309,10 +310,10 @@ func (r *VMReconciler) updateVMStatusCPU(

func (r *VMReconciler) updateVMStatusMemory(
vm *vmv1.VirtualMachine,
QmpMemorySize *resource.Quantity,
qmpMemorySize *resource.Quantity,
) {
if vm.Status.MemorySize == nil || !QmpMemorySize.Equal(*vm.Status.MemorySize) {
vm.Status.MemorySize = QmpMemorySize
if vm.Status.MemorySize == nil || !qmpMemorySize.Equal(*vm.Status.MemorySize) {
vm.Status.MemorySize = qmpMemorySize
r.Recorder.Event(vm, "Normal", "MemoryInfo",
fmt.Sprintf("VirtualMachine %s uses %v memory",
vm.Name,
@@ -555,18 +556,31 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine)
return err
}
var pluggedCPU uint32
if vm.Spec.CpuScalingMode != nil && *vm.Spec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {

if vm.Spec.CpuScalingMode == nil { // should not happen
err := fmt.Errorf("CPU scaling mode is not set")
log.Error(err, "Unknown CPU scaling mode", "VirtualMachine", vm.Name)
return err
}

switch *vm.Spec.CpuScalingMode {
case vmv1.CpuScalingModeSysfs:
log.Info("CPU scaling mode is set to CpuSysfsState, CPU usage check based on cgroups")
pluggedCPU = cgroupUsage.VCPUs.RoundedUp()
} else {
// get CPU details from QEMU
case vmv1.CpuScalingModeQMP:
log.Info("CPU scaling mode is set to QMP, CPU usage check based on QMP")
cpuSlotsPlugged, _, err := QmpGetCpus(QmpAddr(vm))
if err != nil {
log.Error(err, "Failed to get CPU details from VirtualMachine", "VirtualMachine", vm.Name)
return err
}
pluggedCPU = uint32(len(cpuSlotsPlugged))
default:
err := fmt.Errorf("unsupported CPU scaling mode: %s", *vm.Spec.CpuScalingMode)
log.Error(err, "Unknown CPU scaling mode", "VirtualMachine", vm.Name, "CPU scaling mode", *vm.Spec.CpuScalingMode)
return err
}

// update status by CPUs used in the VM
r.updateVMStatusCPU(ctx, vm, vmRunner, pluggedCPU, cgroupUsage)

@@ -1400,10 +1414,9 @@ func podSpec(
Command: func() []string {
cmd := []string{"runner"}
if config.DisableRunnerCgroup {
cmd = append(cmd, "-skip-cgroup-management")
// cgroup management disabled, but we still need something to provide
// the server, so the runner will just provide a dummy implementation.
cmd = append(cmd, "-enable-dummy-cpu-server")
cmd = append(cmd, "-skip-cgroup-management")
}
cmd = append(
cmd,
Expand All @@ -1420,6 +1433,9 @@ func podSpec(
"-vmspec", base64.StdEncoding.EncodeToString(vmSpecJson),
"-vmstatus", base64.StdEncoding.EncodeToString(vmStatusJson),
)
// NB: We don't need to check if the value is nil because the default value
// was set in Reconcile
cmd = append(cmd, "-cpu-scaling-mode", *vm.Spec.CpuScalingMode)
return cmd
}(),
Env: []corev1.EnvVar{{
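
Taken together, the vm_controller.go changes mean `Reconcile` fills in `spec.cpuScalingMode` from `ReconcilerConfig.DefaultCPUScalingMode` when the field is unset, and `podSpec` then forwards the now guaranteed non-nil value to the runner via `-cpu-scaling-mode`. A condensed sketch of that flow, using simplified stand-in types rather than the real controller structs:

```go
package main

import "fmt"

const CpuScalingModeQMP = "qmpScaling"

// Simplified stand-ins for vmv1.VirtualMachineSpec and controllers.ReconcilerConfig.
type vmSpec struct{ CpuScalingMode *string }
type reconcilerConfig struct{ DefaultCPUScalingMode string }

// defaultCpuScalingMode mirrors the defaulting step in Reconcile.
func defaultCpuScalingMode(spec *vmSpec, cfg reconcilerConfig) {
	if spec.CpuScalingMode == nil || *spec.CpuScalingMode == "" {
		mode := cfg.DefaultCPUScalingMode
		spec.CpuScalingMode = &mode
	}
}

// runnerCommand mirrors the tail of podSpec's command construction.
func runnerCommand(spec *vmSpec) []string {
	cmd := []string{"runner"}
	// Safe to dereference: Reconcile has already set a default.
	cmd = append(cmd, "-cpu-scaling-mode", *spec.CpuScalingMode)
	return cmd
}

func main() {
	spec := &vmSpec{}
	defaultCpuScalingMode(spec, reconcilerConfig{DefaultCPUScalingMode: CpuScalingModeQMP})
	fmt.Println(runnerCommand(spec)) // [runner -cpu-scaling-mode qmpScaling]
}
```
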
@@ -19,16 +19,16 @@ func (r *VMReconciler) handleCPUScaling(ctx context.Context, vm *vmv1.VirtualMac

log := log.FromContext(ctx)
useCpuSysfsStateScaling := false
if vm.Spec.CpuScalingMode != nil && *vm.Spec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
if vm.Spec.CpuScalingMode != nil && *vm.Spec.CpuScalingMode == vmv1.CpuScalingModeSysfs {
useCpuSysfsStateScaling = true
}

var scaled bool
var err error
if !useCpuSysfsStateScaling {
scaled, err = r.handleQMPBasedCPUScaling(ctx, vm, vmRunner)
scaled, err = r.handleCPUScalingQMP(ctx, vm, vmRunner)
} else {
scaled, err = r.delegateScalingToNeonvmDaemon(ctx, vm, vmRunner)
scaled, err = r.handleCPUScalingSysfs(ctx, vm, vmRunner)
}

if err != nil {
Expand All @@ -39,8 +39,8 @@ func (r *VMReconciler) handleCPUScaling(ctx context.Context, vm *vmv1.VirtualMac
return scaled, nil
}

// handleQMPBasedCPUScaling handles CPU scaling using QMP, extracted as is from doReconcile
func (r *VMReconciler) handleQMPBasedCPUScaling(ctx context.Context, vm *vmv1.VirtualMachine, vmRunner *corev1.Pod) (bool, error) {
// handleCPUScalingQMP handles CPU scaling using QMP, extracted as is from doReconcile
func (r *VMReconciler) handleCPUScalingQMP(ctx context.Context, vm *vmv1.VirtualMachine, vmRunner *corev1.Pod) (bool, error) {
log := log.FromContext(ctx)
specCPU := vm.Spec.Guest.CPUs.Use
cgroupUsage, err := getRunnerCgroup(ctx, vm)
@@ -91,7 +91,7 @@ func (r *VMReconciler) handleQMPBasedCPUScaling(ctx context.Context, vm *vmv1.Vi
return hotPlugCPUScaled, nil
}

func (r *VMReconciler) delegateScalingToNeonvmDaemon(ctx context.Context, vm *vmv1.VirtualMachine, vmRunner *corev1.Pod) (bool, error) {
func (r *VMReconciler) handleCPUScalingSysfs(ctx context.Context, vm *vmv1.VirtualMachine, vmRunner *corev1.Pod) (bool, error) {
log := log.FromContext(ctx)
specCPU := vm.Spec.Guest.CPUs.Use

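
After the renames, `handleCPUScaling` reads as a plain two-way dispatch between the QMP and sysfs implementations. The sketch below shows that shape recast as a `switch` for brevity (the commit keeps the boolean `if`/`else`), with placeholder implementations standing in for the real QMP and neonvm-daemon calls:

```go
package main

import "fmt"

const (
	CpuScalingModeQMP   = "qmpScaling"
	CpuScalingModeSysfs = "sysfsScaling"
)

// handleCPUScaling dispatches to a QMP-based or sysfs-based implementation,
// mirroring the renamed handleCPUScalingQMP / handleCPUScalingSysfs pair.
func handleCPUScaling(mode string) (scaled bool, err error) {
	switch mode {
	case CpuScalingModeSysfs:
		return handleCPUScalingSysfs()
	case CpuScalingModeQMP:
		return handleCPUScalingQMP()
	default:
		return false, fmt.Errorf("unsupported CPU scaling mode: %s", mode)
	}
}

// Placeholder implementations; the real ones scale CPUs over QMP or delegate
// to neonvm-daemon inside the guest.
func handleCPUScalingQMP() (bool, error)   { return true, nil }
func handleCPUScalingSysfs() (bool, error) { return true, nil }

func main() {
	scaled, err := handleCPUScaling(CpuScalingModeSysfs)
	fmt.Println(scaled, err) // true <nil>
}
```
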
1 change: 1 addition & 0 deletions pkg/neonvm/controllers/vm_controller_unit_test.go
@@ -123,6 +123,7 @@ func newTestParams(t *testing.T) *testParams {
MemhpAutoMovableRatio: "301",
FailurePendingPeriod: time.Minute,
FailingRefreshInterval: time.Minute,
DefaultCPUScalingMode: vmv1.CpuScalingModeQMP,
},
Metrics: reconcilerMetrics,
}