Skip to content

Commit

Permalink
neonvm-controller, neonvm-runner, neonvm-daemon: introduce separate C…
Browse files Browse the repository at this point in the history
…PU scaling flow based on the vmSpec.cpuScalingMode

If vmSpec.cpuScalingMode is equal to `qmp_scaling` the logic of the
scaling is preserved as before:

- Scale, if required, the number of CPUs using QMP commands.
- If it is required to scale cgroups, call the vm-runner /cpu_change endpoint.

If vmSpec.cpuScalingMode is equal to `cpuSysfsStateScaling`, all CPU
scaling requests go directly to the vm-runner /cpu_change endpoint, which in
that configuration forwards them to the neonvm-daemon to reconcile the
required number of online CPUs.

The `cpuSysfsStateScaling` value also modifies the QEMU and kernel
arguments so that all CPUs are hot-plugged, but only the first one is
marked as online.

Signed-off-by: Misha Sakhnov <[email protected]>
  • Loading branch information
mikhail-sakhnov committed Oct 14, 2024
1 parent 4a3f699 commit ac70868
Show file tree
Hide file tree
Showing 20 changed files with 532 additions and 170 deletions.
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ test: fmt vet envtest ## Run tests.

.PHONY: build
build: fmt vet bin/vm-builder ## Build all neonvm binaries.
GOOS=linux go build -o bin/controller neonvm/main.go
GOOS=linux go build -o bin/vxlan-controller neonvm/tools/vxlan/controller/main.go
GOOS=linux go build -o bin/runner neonvm/runner/*.go
GOOS=linux go build -o bin/controller neonvm-controller/cmd/main.go
GOOS=linux go build -o bin/vxlan-controller neonvm-vxlan-controller/cmd/main.go
GOOS=linux go build -o bin/daemon neonvm-daemon/main.go
GOOS=linux go build -o bin/runner neonvm-runner/cmd/main.go

.PHONY: bin/vm-builder
bin/vm-builder: ## Build vm-builder binary.
Expand Down
7 changes: 4 additions & 3 deletions neonvm-controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down Expand Up @@ -95,7 +96,7 @@ func main() {
var concurrencyLimit int
var skipUpdateValidationFor map[types.NamespacedName]struct{}
var disableRunnerCgroup bool
var useOnlineOfflining bool
var useCpuSysfsStateScaling bool
var qemuDiskCacheSettings string
var defaultMemoryProvider vmv1.MemoryProvider
var memhpAutoMovableRatio string
Expand All @@ -106,7 +107,7 @@ func main() {
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&useOnlineOfflining, "use-online-offlining", false, "Use online offlining for CPU scaling")
flag.BoolVar(&useCpuSysfsStateScaling, "use-cpu-sysfs-state-scaling", false, "Use sysfs cpu state scaling for CPU scaling")
flag.IntVar(&concurrencyLimit, "concurrency-limit", 1, "Maximum number of concurrent reconcile operations")
flag.Func(
"skip-update-validation-for",
Expand Down Expand Up @@ -189,7 +190,7 @@ func main() {
reconcilerMetrics := controllers.MakeReconcilerMetrics()

rc := &controllers.ReconcilerConfig{
UseOnlineOfflining: useOnlineOfflining,
DisableRunnerCgroup: disableRunnerCgroup,
MaxConcurrentReconciles: concurrencyLimit,
SkipUpdateValidationFor: skipUpdateValidationFor,
QEMUDiskCacheSettings: qemuDiskCacheSettings,
Expand Down
2 changes: 1 addition & 1 deletion neonvm-controller/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ spec:
- "--concurrency-limit=128"
- "--skip-update-validation-for="
- "--disable-runner-cgroup"
- "--use-online-offlining"
- "--use-cpu-sysfs-state-scaling"
# See #775 and its links.
# * cache.writeback=on - don't set O_DSYNC (don't flush every write)
# * cache.direct=on - use O_DIRECT (don't abuse host's page cache!)
Expand Down
45 changes: 18 additions & 27 deletions neonvm-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"flag"
"fmt"
"io"
"math"
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/neondatabase/autoscaling/neonvm/daemon/pkg/cpuscaling"
vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"

"github.com/neondatabase/autoscaling/neonvm-daemon/pkg/cpuscaling"
"go.uber.org/zap"
)

Expand All @@ -31,8 +32,9 @@ func main() {
defer logger.Sync() //nolint:errcheck // what are we gonna do, log something about it?

logger.Info("Starting neonvm-daemon", zap.String("addr", *addr))
cpuHotPlugController := &cpuscaling.CPUOnlineOffliner{}
cpuHotPlugController := &cpuscaling.CPUSysFsStateScaler{}
srv := cpuServer{
cpuOperationsMutex: &sync.Mutex{},
cpuSystemWideScaler: cpuHotPlugController,
logger: logger.Named("cpu-srv"),
}
Expand All @@ -43,47 +45,35 @@ type cpuServer struct {
// Protects CPU operations from concurrent access to prevent multiple ensureOnlineCPUs calls from running concurrently
// and ensure that status response is always actual
cpuOperationsMutex *sync.Mutex
cpuSystemWideScaler *cpuscaling.CPUOnlineOffliner
cpuSystemWideScaler *cpuscaling.CPUSysFsStateScaler
logger *zap.Logger
}

// milliCPU is a type that represents CPU in milli units
type milliCPU uint64

// milliCPUFromString converts a byte slice to milliCPU
func milliCPUFromString(s []byte) (milliCPU, error) {
func milliCPUFromString(s []byte) (vmv1.MilliCPU, error) {

Check failure on line 53 in neonvm-daemon/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

func `milliCPUFromString` is unused (unused)
cpu, err := strconv.ParseUint(string(s), 10, 32)
if err != nil {
return 0, err
}
return milliCPU(cpu), nil
}

// ToCPU converts milliCPU to CPU
func (m milliCPU) ToCPU() int {
cpu := float64(m) / 1000.0
// Use math.Ceil to round up to the next CPU.
return int(math.Ceil(cpu))
return vmv1.MilliCPU(cpu), nil
}

func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
// should be replaced with cgroups milliseconds to be exposed instead of having CPU
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()
totalCPUs, err := s.cpuSystemWideScaler.GetTotalCPUsCount()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
activeCPUs, err := s.cpuSystemWideScaler.GetActiveCPUsCount()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write([]byte(fmt.Sprintf("%d %d", activeCPUs, totalCPUs)))
w.WriteHeader(http.StatusOK)
w.Write([]byte(fmt.Sprintf("%d", activeCPUs*1000)))

Check failure on line 71 in neonvm-daemon/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `w.Write` is not checked (errcheck)
}

func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
// TODO: should it be conditional, only if the statefs cpu scaling is enabled?
// on the other hand, currently this endpoint is called by runner only if the statefs scaling is enabled
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()
body, err := io.ReadAll(r.Body)
Expand All @@ -92,19 +82,20 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}

milliCPU, err := milliCPUFromString(body)
updateInt, err := strconv.Atoi(string(body))
update := vmv1.MilliCPU(updateInt)
if err != nil {
s.logger.Error("could not parse request body as uint32", zap.Error(err))
s.logger.Error("could not unmarshal request body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
if err := s.cpuSystemWideScaler.EnsureOnlineCPUs(milliCPU.ToCPU()); err != nil {
s.logger.Info("Setting CPU status", zap.String("body", string(body)))
if err := s.cpuSystemWideScaler.EnsureOnlineCPUs(int(update.RoundedUp())); err != nil {
s.logger.Error("failed to ensure online CPUs", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
w.WriteHeader(http.StatusOK)
}

func (s *cpuServer) run(addr string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
// CPU directory path
const cpuPath = "/sys/devices/system/cpu/"

type CPUOnlineOffliner struct {
type CPUSysFsStateScaler struct {
}

func (c *CPUOnlineOffliner) EnsureOnlineCPUs(X int) error {
func (c *CPUSysFsStateScaler) EnsureOnlineCPUs(X int) error {
cpus, err := getAllCPUs()
if err != nil {
return err
Expand Down Expand Up @@ -89,7 +89,7 @@ func (c *CPUOnlineOffliner) EnsureOnlineCPUs(X int) error {
}

// GetActiveCPUsCount() returns the count of online CPUs.
func (c *CPUOnlineOffliner) GetActiveCPUsCount() (uint32, error) {
func (c *CPUSysFsStateScaler) GetActiveCPUsCount() (uint32, error) {
cpus, err := getAllCPUs()
if err != nil {
return 0, err
Expand All @@ -110,7 +110,7 @@ func (c *CPUOnlineOffliner) GetActiveCPUsCount() (uint32, error) {
}

// GetTotalCPUsCount returns the total number of CPUs (online + offline).
func (c *CPUOnlineOffliner) GetTotalCPUsCount() (uint32, error) {
func (c *CPUSysFsStateScaler) GetTotalCPUsCount() (uint32, error) {
cpus, err := getAllCPUs()
if err != nil {
return 0, err
Expand Down
95 changes: 85 additions & 10 deletions neonvm-runner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,6 @@ type Config struct {
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
useOnlineOfflining bool
}

func newConfig(logger *zap.Logger) *Config {
Expand All @@ -630,7 +629,6 @@ func newConfig(logger *zap.Logger) *Config {
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
useOnlineOfflining: false,
}
flag.StringVar(&cfg.vmSpecDump, "vmspec", cfg.vmSpecDump,
"Base64 encoded VirtualMachine json specification")
Expand All @@ -651,7 +649,6 @@ func newConfig(logger *zap.Logger) *Config {
flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc)
flag.StringVar(&cfg.autoMovableRatio, "memhp-auto-movable-ratio",
cfg.autoMovableRatio, "Set value of kernel's memory_hotplug.auto_movable_ratio [virtio-mem only]")
flag.BoolVar(&cfg.useOnlineOfflining, "use-online-offlining", false, "Use online offlining for CPU scaling")
flag.Parse()

if cfg.memoryProvider == "" {
Expand Down Expand Up @@ -761,7 +758,7 @@ func run(logger *zap.Logger) error {

tg.Go("qemu-cmd", func(logger *zap.Logger) error {
var err error
qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, enableSSH, swapSize, hostname, cfg.useOnlineOfflining)
qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, enableSSH, swapSize, hostname)
return err
})

Expand Down Expand Up @@ -815,7 +812,6 @@ func buildQEMUCmd(
enableSSH bool,
swapSize *resource.Quantity,
hostname string,
useOnlineOfflining bool,
) ([]string, error) {
// prepare qemu command line
qemuCmd := []string{
Expand Down Expand Up @@ -893,8 +889,8 @@ func buildQEMUCmd(
logger.Warn("not using KVM acceleration")
}
qemuCmd = append(qemuCmd, "-cpu", "max")
if useOnlineOfflining {
// if we use online offlining we specify start cpus equal to max cpus
if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
// if we use sysfs based scaling we specify start cpus equal to max cpus
qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf(
"cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1",
vmSpec.Guest.CPUs.Max.RoundedUp(),
Expand Down Expand Up @@ -997,8 +993,8 @@ func makeKernelCmdline(cfg *Config, vmSpec *vmv1.VirtualMachineSpec, vmStatus *v
if cfg.appendKernelCmdline != "" {
cmdlineParts = append(cmdlineParts, cfg.appendKernelCmdline)
}
if cfg.useOnlineOfflining {
// if we use online offlining we need to specify the start cpus as min CPUs
if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
// if we use sysfs based scaling we need to specify the start cpus as min CPUs
cmdlineParts = append(cmdlineParts, fmt.Sprintf("maxcpus=%d", vmSpec.Guest.CPUs.Min.RoundedUp()))
}

Expand Down Expand Up @@ -1047,8 +1043,12 @@ func runQEMU(
wg := sync.WaitGroup{}

wg.Add(1)
useCpuSysfsStateScaling := false
if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState {
useCpuSysfsStateScaling = true
}
go terminateQemuOnSigterm(ctx, logger, &wg)
if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer {
if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer || useCpuSysfsStateScaling {
var callbacks cpuServerCallbacks

if cfg.enableDummyCPUServer {
Expand All @@ -1060,6 +1060,13 @@ func runQEMU(
return lo.ToPtr(vmv1.MilliCPU(lastValue.Load())), nil
},
set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
if useCpuSysfsStateScaling {
err := setNeonvmDaemonCPU(cpu)
if err != nil {
logger.Error("setting CPU through NeonVM Daemon failed", zap.Any("cpu", cpu), zap.Error(err))
return err
}
}
lastValue.Store(uint32(cpu))
return nil
},
Expand Down Expand Up @@ -1830,3 +1837,71 @@ func overlayNetwork(iface string) (mac.MAC, error) {

return mac, nil
}

func getNeonvmDaemonCPU() (*vmv1.MilliCPU, error) {

Check failure on line 1841 in neonvm-runner/cmd/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

func `getNeonvmDaemonCPU` is unused (unused)
_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
if err != nil {
return nil, fmt.Errorf("could not calculate VM IP address: %w", err)
}

ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

url := fmt.Sprintf("http://%s:25183/cpu", vmIP)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("could not build request: %w", err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("could not send request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil, fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not read response body: %w", err)
}

milliCPU, err := strconv.Atoi(string(body))
if err != nil {
return nil, fmt.Errorf("could not parse response body: %w", err)
}
res := lo.ToPtr(vmv1.MilliCPU(milliCPU))
return res, nil
}

// setNeonvmDaemonCPU asks the neonvm-daemon running inside the VM to bring
// the given number of milli-CPUs online, by PUT-ing the value as a plain
// decimal string to the daemon's /cpu endpoint.
//
// Returns an error if the VM IP cannot be derived, if the request fails or
// exceeds the one-second timeout, or if the daemon responds with a non-200
// status.
func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error {
	_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
	if err != nil {
		return fmt.Errorf("could not calculate VM IP address: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
	defer cancel()

	url := fmt.Sprintf("http://%s:25183/cpu", vmIP)
	body := bytes.NewReader([]byte(fmt.Sprintf("%d", uint32(cpu))))

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
	if err != nil {
		return fmt.Errorf("could not build request: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("could not send request: %w", err)
	}
	defer resp.Body.Close()
	// Drain the response body so the transport can reuse the keep-alive
	// connection for subsequent scaling requests.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
	}

	return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package controllers
package qmp

import (
"context"
Expand All @@ -17,6 +17,7 @@ import (
"k8s.io/client-go/tools/record"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/neonvm/pkg/tools"
)

type QmpCpus struct {
Expand Down Expand Up @@ -750,7 +751,7 @@ func QmpSyncMemoryToTarget(vm *vmv1.VirtualMachine, migration *vmv1.VirtualMachi
// run over Target memory and compare device id
found := false
for _, tm := range memoryDevicesInTarget {
if DeepEqual(m, tm) {
if tools.DeepEqual(m, tm) {
found = true
}
}
Expand Down
Loading

0 comments on commit ac70868

Please sign in to comment.