Commit

Merge branch 'oleg/revert-crictl' into oleg/node-down-2
Signed-off-by: Oleg Vasilev <[email protected]>
Omrigan committed Aug 30, 2024
2 parents 837f32e + 3d2f1a1 commit a1e7ea4
Showing 12 changed files with 171 additions and 40 deletions.
20 changes: 14 additions & 6 deletions Makefile
@@ -70,22 +70,30 @@ help: ## Display this help.
.PHONY: generate
generate: ## Generate boilerplate DeepCopy methods, manifests, and Go client
# Use uid and gid of current user to avoid mismatched permissions
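# If this is a git worktree checkout, .git is a file pointing at the real git
# directory; that directory is mounted as well so git commands work in the container.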
iidfile=$$(mktemp /tmp/iid-XXXXXX) && \
set -e ; \
iidfile=$$(mktemp /tmp/iid-XXXXXX) ; \
docker build \
--build-arg USER_ID=$(shell id -u $(USER)) \
--build-arg GROUP_ID=$(shell id -g $(USER)) \
--build-arg CONTROLLER_TOOLS_VERSION=$(CONTROLLER_TOOLS_VERSION) \
--build-arg CODE_GENERATOR_VERSION=$(CODE_GENERATOR_VERSION) \
--file neonvm/hack/Dockerfile.generate \
--iidfile $$iidfile . && \
--iidfile $$iidfile . ; \
volumes=('--volume' "$$PWD:/go/src/github.com/neondatabase/autoscaling") ; \
if [ -f .git ]; then \
gitdir="$$(git rev-parse --git-common-dir)" ; \
gitdir="$$(cd -P -- $$gitdir && pwd)" ; \
volumes+=('--volume' "$$gitdir:$$gitdir") ; \
fi ; \
set -x ; \
docker run --rm \
--volume $$PWD:/go/src/github.com/neondatabase/autoscaling \
"$${volumes[@]}" \
--workdir /go/src/github.com/neondatabase/autoscaling \
--user $(shell id -u $(USER)):$(shell id -g $(USER)) \
$$(cat $$iidfile) \
./neonvm/hack/generate.sh && \
docker rmi $$(cat $$iidfile)
rm -rf $$iidfile
./neonvm/hack/generate.sh ; \
docker rmi $$(cat $$iidfile) ; \
rm -rf $$iidfile ; \
go fmt ./...

.PHONY: fmt
8 changes: 8 additions & 0 deletions neonvm/controllers/config.go
@@ -11,6 +11,14 @@ import (
// ReconcilerConfig stores shared configuration for VirtualMachineReconciler and
// VirtualMachineMigrationReconciler.
type ReconcilerConfig struct {
// DisableRunnerCgroup, if true, disables running QEMU in a cgroup in new VM runner pods.
// Fractional CPU scaling will continue to *pretend* to work, but it will not do anything in
// practice.
//
// Under the hood, this results in passing -skip-cgroup-management and -enable-dummy-cpu-server
// to neonvm-runner.
DisableRunnerCgroup bool

MaxConcurrentReconciles int

// SkipUpdateValidationFor is the set of object names that we should ignore when doing webhook
1 change: 1 addition & 0 deletions neonvm/controllers/functests/vm_controller_test.go
@@ -107,6 +107,7 @@ var _ = Describe("VirtualMachine controller", func() {
Scheme: k8sClient.Scheme(),
Recorder: nil,
Config: &controllers.ReconcilerConfig{
DisableRunnerCgroup: false,
MaxConcurrentReconciles: 1,
SkipUpdateValidationFor: nil,
QEMUDiskCacheSettings: "cache=none",
22 changes: 18 additions & 4 deletions neonvm/controllers/vm_controller.go
@@ -1453,6 +1453,12 @@ func podSpec(
}},
Command: func() []string {
cmd := []string{"runner"}
if config.DisableRunnerCgroup {
cmd = append(cmd, "-skip-cgroup-management")
// cgroup management is disabled, but we still need something to answer
// the CPU server's requests, so the runner provides a dummy implementation.
cmd = append(cmd, "-enable-dummy-cpu-server")
}
cmd = append(
cmd,
"-qemu-disk-cache-settings", config.QEMUDiskCacheSettings,
@@ -1494,9 +1500,13 @@
MountPropagation: lo.ToPtr(corev1.MountPropagationNone),
}

// the /sys/fs/cgroup mount is only necessary if neonvm-runner has to
// do its own cpu limiting
return []corev1.VolumeMount{images, cgroups}
if config.DisableRunnerCgroup {
return []corev1.VolumeMount{images}
} else {
// the /sys/fs/cgroup mount is only necessary if neonvm-runner has to
// do its own cpu limiting
return []corev1.VolumeMount{images, cgroups}
}
}(),
Resources: vm.Spec.PodResources,
}
Expand All @@ -1519,7 +1529,11 @@ func podSpec(
},
},
}
return []corev1.Volume{images, cgroup}
if config.DisableRunnerCgroup {
return []corev1.Volume{images}
} else {
return []corev1.Volume{images, cgroup}
}
}(),
},
}
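For context: with DisableRunnerCgroup set, the runner container's command assembled in podSpec above begins as sketched below (the flags appended by the unchanged remainder of the function are elided), and the cgroup volume and mount are omitted entirely.

    runner -skip-cgroup-management -enable-dummy-cpu-server -qemu-disk-cache-settings <settings> ...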
1 change: 1 addition & 0 deletions neonvm/controllers/vm_controller_unit_test.go
@@ -115,6 +115,7 @@ func newTestParams(t *testing.T) *testParams {
Recorder: params.mockRecorder,
Scheme: scheme,
Config: &ReconcilerConfig{
DisableRunnerCgroup: false,
MaxConcurrentReconciles: 10,
SkipUpdateValidationFor: nil,
QEMUDiskCacheSettings: "",
2 changes: 2 additions & 0 deletions neonvm/hack/Dockerfile.generate
@@ -1,5 +1,7 @@
FROM golang:1.21

RUN apt-get update && apt-get install -y patch
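# (patch is needed by neonvm/hack/generate.sh, which patches kube_codegen.sh before running it)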

ARG CONTROLLER_TOOLS_VERSION
ARG CODE_GENERATOR_VERSION

33 changes: 28 additions & 5 deletions neonvm/hack/generate.sh
@@ -4,11 +4,34 @@

set -eu -o pipefail

bash $GOPATH/src/k8s.io/code-generator/generate-groups.sh "deepcopy,client,informer,lister" \
github.com/neondatabase/autoscaling/neonvm/client \
github.com/neondatabase/autoscaling/neonvm/apis \
neonvm:v1 \
--go-header-file neonvm/hack/boilerplate.go.txt
CODEGEN_PATH="$GOPATH/src/k8s.io/code-generator/kube_codegen.sh"

# apply a small patch to allow kube_codegen.sh to work with our file structure:
# upstream only considers directories containing a file named exactly types.go,
# while our API types are spread across several .go files per package.
patch "$CODEGEN_PATH" neonvm/hack/kube_codegen.patch

source "$CODEGEN_PATH"

# Required to allow git worktrees with non-root ownership on the host to work.
git config --global --add safe.directory "$GOPATH/src/github.com/neondatabase/autoscaling"

# note: generation requires that "${output_base}/${input_pkg_root}" resolves to
# this repository's actual source directory.
# The only way for that to be true is if $output_base is equal to "$GOPATH/src",
# which we make possible by the way we mount the repo from the host.

echo "Running gen_helpers ..."
kube::codegen::gen_helpers \
--output-base "/go/src" \
--input-pkg-root github.com/neondatabase/autoscaling/neonvm/apis \
--boilerplate neonvm/hack/boilerplate.go.txt

echo "Running gen_client ..."
kube::codegen::gen_client \
--output-base "/go/src" \
--input-pkg-root github.com/neondatabase/autoscaling/neonvm/apis \
--output-pkg-root github.com/neondatabase/autoscaling/neonvm/client \
--with-watch \
--boilerplate neonvm/hack/boilerplate.go.txt

controller-gen object:headerFile="neonvm/hack/boilerplate.go.txt" paths="./neonvm/apis/..."

13 changes: 13 additions & 0 deletions neonvm/hack/kube_codegen.patch
@@ -0,0 +1,13 @@
527,528c527
< while read -r file; do
< dir="$(dirname "${file}")"
---
> while read -r dir; do
541c540
< ":(glob)${in_root}"/'**/types.go' \
---
> ":(glob)${in_root}"/'**/*.go' \
543c542
< ) | LC_ALL=C sort -u
---
> ) | LC_ALL=C sort -u | sed 's:/[^/]*$::' | uniq
6 changes: 6 additions & 0 deletions neonvm/main.go
@@ -38,13 +38,16 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

[golangci-lint, line 41 in neonvm/main.go: "k8s.io/apimachinery/pkg/apis/meta/v1" imported as metav1 and not used]
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"

[golangci-lint, line 45 in neonvm/main.go: "k8s.io/client-go/kubernetes" imported and not used]
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"

[golangci-lint, line 50 in neonvm/main.go: "k8s.io/client-go/rest" imported and not used (typecheck)]
"k8s.io/klog/v2"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
@@ -97,6 +100,7 @@ func main() {
var probeAddr string
var concurrencyLimit int
var skipUpdateValidationFor map[types.NamespacedName]struct{}
var disableRunnerCgroup bool
var qemuDiskCacheSettings string
var defaultMemoryProvider vmv1.MemoryProvider
var memhpAutoMovableRatio string
@@ -134,6 +138,7 @@
return nil
},
)
flag.BoolVar(&disableRunnerCgroup, "disable-runner-cgroup", false, "Disable creation of a cgroup in neonvm-runner for fractional CPU limiting")
flag.StringVar(&qemuDiskCacheSettings, "qemu-disk-cache-settings", "cache=none", "Set neonvm-runner's QEMU disk cache settings")
flag.Func("default-memory-provider", "Set default memory provider to use for new VMs", defaultMemoryProvider.FlagFunc)
flag.StringVar(&memhpAutoMovableRatio, "memhp-auto-movable-ratio", "301", "For virtio-mem, set VM kernel's memory_hotplug.auto_movable_ratio")
@@ -190,6 +195,7 @@ func main() {
reconcilerMetrics := controllers.MakeReconcilerMetrics()

rc := &controllers.ReconcilerConfig{
DisableRunnerCgroup: disableRunnerCgroup,
MaxConcurrentReconciles: concurrencyLimit,
SkipUpdateValidationFor: skipUpdateValidationFor,
QEMUDiskCacheSettings: qemuDiskCacheSettings,
78 changes: 68 additions & 10 deletions neonvm/runner/main.go
@@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

@@ -36,6 +37,7 @@ import (
"github.com/jpillora/backoff"
"github.com/kdomanski/iso9660"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/samber/lo"
"github.com/vishvananda/netlink"
"go.uber.org/zap"

@@ -594,6 +596,7 @@ type Config struct {
kernelPath string
appendKernelCmdline string
skipCgroupManagement bool
enableDummyCPUServer bool
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
@@ -606,6 +609,7 @@ func newConfig(logger *zap.Logger) *Config {
kernelPath: defaultKernelPath,
appendKernelCmdline: "",
skipCgroupManagement: false,
enableDummyCPUServer: false,
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
@@ -620,7 +624,10 @@ func newConfig(logger *zap.Logger) *Config {
cfg.appendKernelCmdline, "Additional kernel command line arguments")
flag.BoolVar(&cfg.skipCgroupManagement, "skip-cgroup-management",
cfg.skipCgroupManagement,
"Don't try to manage CPU (use if running alongside container-mgr)")
"Don't try to manage CPU (use if running alongside container-mgr, or if dummy CPU server is enabled)")
flag.BoolVar(&cfg.enableDummyCPUServer, "enable-dummy-cpu-server",
cfg.enableDummyCPUServer,
"Provide a CPU server (unlike -skip-cgroup-management) but don't actually do anything with it")
flag.StringVar(&cfg.diskCacheSettings, "qemu-disk-cache-settings",
cfg.diskCacheSettings, "Cache settings to add to -drive args for VM disks")
flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc)
@@ -635,6 +642,9 @@ func newConfig(logger *zap.Logger) *Config {
if cfg.memoryProvider == vmv1.MemoryProviderVirtioMem && cfg.autoMovableRatio == "" {
logger.Fatal("missing required flag '-memhp-auto-movable-ratio'")
}
if cfg.enableDummyCPUServer && !cfg.skipCgroupManagement {
logger.Fatal("flag -enable-dummy-cpu-server requires -skip-cgroup-management")
}

return cfg
}
@@ -1007,9 +1017,36 @@ func runQEMU(

wg.Add(1)
go terminateQemuOnSigterm(ctx, logger, &wg)
if !cfg.skipCgroupManagement {
if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer {
var callbacks cpuServerCallbacks

if cfg.enableDummyCPUServer {
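// The dummy server just remembers the last value it was told, seeded with
// the VM's minimum CPU. An atomic is used because the get/set callbacks
// are invoked concurrently from separate HTTP handler goroutines.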
lastValue := &atomic.Uint32{}
lastValue.Store(uint32(vmSpec.Guest.CPUs.Min))

callbacks = cpuServerCallbacks{
get: func(logger *zap.Logger) (*vmv1.MilliCPU, error) {
return lo.ToPtr(vmv1.MilliCPU(lastValue.Load())), nil
},
set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
lastValue.Store(uint32(cpu))
return nil
},
}
} else {
// Standard implementation -- actually set the cgroup
callbacks = cpuServerCallbacks{
get: func(logger *zap.Logger) (*vmv1.MilliCPU, error) {
return getCgroupQuota(cgroupPath)
},
set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
return setCgroupLimit(logger, cpu, cgroupPath)
},
}
}

wg.Add(1)
go listenForCPUChanges(ctx, logger, vmSpec.RunnerPort, cgroupPath, &wg)
go listenForCPUChanges(ctx, logger, vmSpec.RunnerPort, callbacks, &wg)
}
wg.Add(1)
go forwardLogs(ctx, logger, &wg)
@@ -1040,7 +1077,12 @@ func runQEMU(
return err
}

func handleCPUChange(logger *zap.Logger, w http.ResponseWriter, r *http.Request, cgroupPath string) {
func handleCPUChange(
logger *zap.Logger,
w http.ResponseWriter,
r *http.Request,
set func(*zap.Logger, vmv1.MilliCPU) error,
) {
if r.Method != "POST" {
logger.Error("unexpected method", zap.String("method", r.Method))
w.WriteHeader(400)
@@ -1062,7 +1104,7 @@ func handleCPUChange(logger *zap.Logger, w http.ResponseWriter, r *http.Request,

// apply the update via the configured callback (cgroup limit or dummy)
logger.Info("got CPU update", zap.Float64("CPU", parsed.VCPUs.AsFloat64()))
err = setCgroupLimit(logger, parsed.VCPUs, cgroupPath)
err = set(logger, parsed.VCPUs)
if err != nil {
logger.Error("could not set cgroup limit", zap.Error(err))
w.WriteHeader(500)
@@ -1072,14 +1114,19 @@ func handleCPUCurrent(logger *zap.Logger, w http.ResponseWriter, r *http.Request
w.WriteHeader(200)
}

func handleCPUCurrent(logger *zap.Logger, w http.ResponseWriter, r *http.Request, cgroupPath string) {
func handleCPUCurrent(
logger *zap.Logger,
w http.ResponseWriter,
r *http.Request,
get func(*zap.Logger) (*vmv1.MilliCPU, error),
) {
if r.Method != "GET" {
logger.Error("unexpected method", zap.String("method", r.Method))
w.WriteHeader(400)
return
}

cpus, err := getCgroupQuota(cgroupPath)
cpus, err := get(logger)
if err != nil {
logger.Error("could not get cgroup quota", zap.Error(err))
w.WriteHeader(500)
@@ -1097,17 +1144,28 @@ func handleCPUCurrent(logger *zap.Logger, w http.ResponseWriter, r *http.Request
w.Write(body) //nolint:errcheck // Not much to do with the error here. TODO: log it?
}

func listenForCPUChanges(ctx context.Context, logger *zap.Logger, port int32, cgroupPath string, wg *sync.WaitGroup) {
type cpuServerCallbacks struct {
get func(*zap.Logger) (*vmv1.MilliCPU, error)
set func(*zap.Logger, vmv1.MilliCPU) error
}

func listenForCPUChanges(
ctx context.Context,
logger *zap.Logger,
port int32,
callbacks cpuServerCallbacks,
wg *sync.WaitGroup,
) {
defer wg.Done()
mux := http.NewServeMux()
loggerHandlers := logger.Named("http-handlers")
cpuChangeLogger := loggerHandlers.Named("cpu_change")
mux.HandleFunc("/cpu_change", func(w http.ResponseWriter, r *http.Request) {
handleCPUChange(cpuChangeLogger, w, r, cgroupPath)
handleCPUChange(cpuChangeLogger, w, r, callbacks.set)
})
cpuCurrentLogger := loggerHandlers.Named("cpu_current")
mux.HandleFunc("/cpu_current", func(w http.ResponseWriter, r *http.Request) {
handleCPUCurrent(cpuCurrentLogger, w, r, cgroupPath)
handleCPUCurrent(cpuCurrentLogger, w, r, callbacks.get)
})
server := http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
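For reference, the CPU server set up here is a plain HTTP API on vmSpec.RunnerPort: POST /cpu_change applies a new CPU value through the set callback, and GET /cpu_current reads one back through get. A minimal client sketch follows; the JSON field name ("vCPUs"), the address, and the port are assumptions for illustration, not confirmed by this diff:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// vcpuChange approximates the body that handleCPUChange decodes;
// the exact field name is an assumption.
type vcpuChange struct {
	VCPUs uint32 `json:"vCPUs"` // milli-CPUs, e.g. 500 = half a core
}

func main() {
	base := "http://127.0.0.1:25183" // hypothetical runner address and port

	// Set the CPU limit (under -enable-dummy-cpu-server this only records the value).
	body, err := json.Marshal(vcpuChange{VCPUs: 500})
	if err != nil {
		panic(err)
	}
	resp, err := http.Post(base+"/cpu_change", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	// Read the current value back; the response shape is not shown in the
	// diff, so just print the raw body.
	resp, err = http.Get(base + "/cpu_current")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("cpu_current: %s\n", raw)
}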
3 changes: 2 additions & 1 deletion pkg/plugin/trans.go
@@ -313,7 +313,8 @@ func (r resourceTransitioner[T]) handleRequestedGeneric(
// the factor.
maxIncrease := (remainingReservable / opts.factor) * opts.factor
// ... but we must allow at least opts.forceApprovalMinimum
maxIncrease = util.Max(maxIncrease, opts.forceApprovalMinimum)
increaseFromForceApproval := util.SaturatingSub(opts.forceApprovalMinimum, r.pod.Reserved)
maxIncrease = util.Max(maxIncrease, increaseFromForceApproval)

if increase > maxIncrease /* increases are bound by what's left in the node */ {
r.pod.CapacityPressure = increase - maxIncrease
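The distinction matters because opts.forceApprovalMinimum is an absolute floor on the pod's total reservation, while maxIncrease is a delta; comparing them directly over-approved pods that already had resources reserved. A worked sketch with hypothetical numbers, assuming util.SaturatingSub is the usual subtraction clamped at zero for unsigned values:

package main

import "fmt"

// saturatingSub sketches the assumed behavior of util.SaturatingSub:
// a - b, clamped at zero instead of wrapping around.
func saturatingSub(a, b uint32) uint32 {
	if a < b {
		return 0
	}
	return a - b
}

func maxU32(a, b uint32) uint32 {
	if a > b {
		return a
	}
	return b
}

func main() {
	// Hypothetical numbers, in milli-CPU.
	maxIncrease := uint32(1000)          // what the node can still grant, rounded to the factor
	reserved := uint32(2000)             // what the pod already has reserved
	forceApprovalMinimum := uint32(1500) // total that must be approved regardless

	// Old behavior: the absolute minimum was treated as if it were an increase.
	old := maxU32(maxIncrease, forceApprovalMinimum) // 1500, over-approving

	// New behavior: only the shortfall below the minimum counts as an increase.
	// Here the pod already exceeds the minimum, so the shortfall is zero.
	shortfall := saturatingSub(forceApprovalMinimum, reserved) // 0
	updated := maxU32(maxIncrease, shortfall)                  // 1000

	fmt.Println(old, updated)
}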