From 01ea2e7fd926e360cb450510ca881d20d324751d Mon Sep 17 00:00:00 2001 From: mfanjie Date: Thu, 18 Aug 2022 09:51:34 +0800 Subject: [PATCH] refine cpu sorter of crane agent --- pkg/ensurance/analyzer/analyzer.go | 125 ++++----- pkg/ensurance/executor/cpu_usage.go | 58 ++--- pkg/ensurance/executor/evict.go | 12 +- pkg/ensurance/executor/interface.go | 2 +- pkg/ensurance/executor/metric.go | 12 +- pkg/ensurance/executor/pod-info/pod_info.go | 244 ------------------ pkg/ensurance/executor/podinfo/pod_info.go | 138 ++++++++++ pkg/ensurance/executor/schedule.go | 8 +- pkg/ensurance/executor/sort/cpu_usage_sort.go | 44 ++-- .../executor/sort/cpu_usage_sort_test.go | 82 ++++++ pkg/ensurance/executor/sort/general_sort.go | 4 +- .../executor/sort/mem_metrics_sort.go | 4 +- pkg/ensurance/executor/sort/sort.go | 71 ++++- pkg/ensurance/executor/throttle.go | 16 +- .../executor/{waterline.go => watermark.go} | 6 +- .../{waterline_test.go => watermark_test.go} | 0 pkg/resource/pod_resource_manger.go | 2 +- pkg/utils/pod.go | 14 +- 18 files changed, 415 insertions(+), 427 deletions(-) delete mode 100644 pkg/ensurance/executor/pod-info/pod_info.go create mode 100644 pkg/ensurance/executor/podinfo/pod_info.go create mode 100644 pkg/ensurance/executor/sort/cpu_usage_sort_test.go rename pkg/ensurance/executor/{waterline.go => watermark.go} (98%) rename pkg/ensurance/executor/{waterline_test.go => watermark_test.go} (100%) diff --git a/pkg/ensurance/analyzer/analyzer.go b/pkg/ensurance/analyzer/analyzer.go index 96727ae69..e21c8ac6c 100644 --- a/pkg/ensurance/analyzer/analyzer.go +++ b/pkg/ensurance/analyzer/analyzer.go @@ -25,7 +25,7 @@ import ( "github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator" ecache "github.com/gocrane/crane/pkg/ensurance/cache" "github.com/gocrane/crane/pkg/ensurance/executor" - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" "github.com/gocrane/crane/pkg/known" 
"github.com/gocrane/crane/pkg/metrics" "github.com/gocrane/crane/pkg/utils" @@ -167,16 +167,16 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) { for _, n := range neps { for _, v := range n.Spec.ObjectiveEnsurances { var key = strings.Join([]string{n.Name, v.Name}, ".") - ac, err := s.analyze(key, v, state) + actionContext, err := s.analyze(key, v, state) if err != nil { metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAnalyzeError, key, 1.0) klog.Errorf("Failed to analyze, %v.", err) } - metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAvoidance, key, float64(utils.Bool2Int32(ac.Triggered))) - metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeRestore, key, float64(utils.Bool2Int32(ac.Restored))) + metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAvoidance, key, float64(utils.Bool2Int32(actionContext.Triggered))) + metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeRestore, key, float64(utils.Bool2Int32(actionContext.Restored))) - ac.Nep = n - actionContexts = append(actionContexts, ac) + actionContext.Nep = n + actionContexts = append(actionContexts, actionContext) } } @@ -222,17 +222,17 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea } func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsurance, stateMap map[string][]common.TimeSeries) (ecache.ActionContext, error) { - var ac = ecache.ActionContext{Strategy: object.Strategy, ObjectiveEnsuranceName: object.Name, ActionName: object.AvoidanceActionName} + var actionContext = ecache.ActionContext{Strategy: object.Strategy, ObjectiveEnsuranceName: object.Name, ActionName: object.AvoidanceActionName} state, ok := stateMap[object.MetricRule.Name] if !ok { - return ac, fmt.Errorf("metric %s not found", object.MetricRule.Name) + return actionContext, fmt.Errorf("metric %s not found", object.MetricRule.Name) } //step1: get series from value series, err := s.getSeries(state, object.MetricRule.Selector, 
object.MetricRule.Name) if err != nil { - return ac, err + return actionContext, err } //step2: check if triggered for NodeQOSEnsurance @@ -241,9 +241,9 @@ func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsu klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold) //step3: check is triggered action or restored, set the detection - s.computeActionContext(threshold, key, object, &ac) + s.computeActionContext(threshold, key, object, &actionContext) - return ac, nil + return actionContext, nil } func (s *AnormalyAnalyzer) computeActionContext(threshold bool, key string, object ensuranceapi.ObjectiveEnsurance, ac *ecache.ActionContext) { @@ -278,47 +278,47 @@ func (s *AnormalyAnalyzer) filterDryRun(acs []ecache.ActionContext) []ecache.Act return dcsFiltered } -func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, acs []ecache.ActionContext) executor.AvoidanceExecutor { - var ae executor.AvoidanceExecutor +func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, actionMap map[string]*ensuranceapi.AvoidanceAction, actionContexts []ecache.ActionContext) executor.AvoidanceExecutor { + var executor executor.AvoidanceExecutor //step1 filter dry run ActionContext - acsFiltered := s.filterDryRun(acs) + filteredActionContext := s.filterDryRun(actionContexts) //step2 do DisableScheduled merge - enableSchedule := s.disableSchedulingMerge(acsFiltered, avoidanceMaps, &ae) + s.mergeSchedulingActions(filteredActionContext, actionMap, &executor) - for _, ac := range acsFiltered { - action, ok := avoidanceMaps[ac.ActionName] + for _, actionCtx := range filteredActionContext { + action, ok := actionMap[actionCtx.ActionName] if !ok { - klog.Warningf("The action %s not found.", ac.ActionName) + klog.Warningf("The action %s not found.", actionCtx.ActionName) continue } //step3 get and deduplicate throttlePods, throttleUpPods if 
action.Spec.Throttle != nil { - throttlePods, throttleUpPods := s.getThrottlePods(enableSchedule, ac, action, stateMap) + throttlePods, throttleUpPods := s.getThrottlePods(actionCtx, action, stateMap) // combine the throttle waterline - combineThrottleWaterLine(&ae.ThrottleExecutor, ac, enableSchedule) + combineThrottleWaterLine(&executor.ThrottleExecutor, actionCtx) // combine the replicated pod - combineThrottleDuplicate(&ae.ThrottleExecutor, throttlePods, throttleUpPods) + combineThrottleDuplicate(&executor.ThrottleExecutor, throttlePods, throttleUpPods) } //step4 get and deduplicate evictPods if action.Spec.Eviction != nil { - evictPods := s.getEvictPods(ac.Triggered, action, stateMap) + evictPods := s.getEvictPods(actionCtx.Triggered, action, stateMap) // combine the evict waterline - combineEvictWaterLine(&ae.EvictExecutor, ac) + combineEvictWaterLine(&executor.EvictExecutor, actionCtx) // combine the replicated pod - combineEvictDuplicate(&ae.EvictExecutor, evictPods) + combineEvictDuplicate(&executor.EvictExecutor, evictPods) } } - ae.StateMap = stateMap + executor.StateMap = stateMap - klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", ae.ThrottleExecutor, ae.EvictExecutor) + klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", executor.ThrottleExecutor, executor.EvictExecutor) - return ae + return executor } func (s *AnormalyAnalyzer) logEvent(ac ecache.ActionContext, now time.Time) { @@ -388,12 +388,11 @@ func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool { return false } -func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.ActionContext, - action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) { +func (s *AnormalyAnalyzer) getThrottlePods(actionCtx ecache.ActionContext, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) { throttlePods, 
throttleUpPods := []podinfo.PodContext{}, []podinfo.PodContext{} - if !ac.Triggered && !(enableSchedule && ac.Restored) { + if !actionCtx.Triggered && !actionCtx.Restored { return throttlePods, throttleUpPods } @@ -404,11 +403,11 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action } for _, pod := range allPods { - if ac.Triggered { - throttlePods = append(throttlePods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleDown)) + if actionCtx.Triggered { + throttlePods = append(throttlePods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.ThrottleDown)) } - if enableSchedule && ac.Restored { - throttleUpPods = append(throttleUpPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleUp)) + if actionCtx.Restored { + throttleUpPods = append(throttleUpPods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.ThrottleUp)) } } @@ -426,32 +425,21 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo } for _, pod := range allPods { - evictPods = append(evictPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.Evict)) + evictPods = append(evictPods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.Evict)) } } return evictPods } -func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionContext, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, ae *executor.AvoidanceExecutor) bool { +func (s *AnormalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.ActionContext, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, ae *executor.AvoidanceExecutor) { var now = time.Now() - // If any rules are triggered, the avoidance is true,otherwise the avoidance is false. - // If all rules are not triggered and some rules are restored, the restore is true,otherwise the restore is false. - // If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false. 
- var enableScheduling, avoidance, restore bool - - defer func() { - metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(utils.Bool2Int32(enableScheduling))) - metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeAvoidance, float64(utils.Bool2Int32(avoidance))) - metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeRestore, float64(utils.Bool2Int32(restore))) - }() - // If the ensurance rules are empty, it must be recovered soon. // So we set enableScheduling true - if len(acsFiltered) == 0 { - enableScheduling = true + if len(actionContexts) == 0 { + s.ToggleScheduleSetting(ae, false) } else { - for _, ac := range acsFiltered { + for _, ac := range actionContexts { action, ok := avoidanceMaps[ac.ActionName] if !ok { klog.Warningf("DoMerge for detection,but the action %s not found", ac.ActionName) @@ -459,34 +447,31 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon } if ac.Triggered { - avoidance = true - enableScheduling = false + metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(0)) + s.ToggleScheduleSetting(ae, true) } if ac.Restored { - restore = true - if !avoidance && now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds)*time.Second)) { - enableScheduling = true + if now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds) * time.Second)) { + metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(1)) + s.ToggleScheduleSetting(ae, false) } } } } +} - if avoidance { - s.lastTriggeredTime = now - ae.ScheduleExecutor.DisableClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} - } - - if enableScheduling { - ae.ScheduleExecutor.RestoreClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0} +func (s *AnormalyAnalyzer) ToggleScheduleSetting(ae *executor.AvoidanceExecutor, toBeDisable bool) { + if toBeDisable { + s.lastTriggeredTime = 
time.Now() } - - return enableScheduling + ae.ScheduleExecutor.ToBeDisable = toBeDisable + ae.ScheduleExecutor.ToBeRestore = !ae.ScheduleExecutor.ToBeDisable } func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, throttleUpPods executor.ThrottlePods) { for _, t := range throttlePods { - if i := e.ThrottleDownPods.Find(t.PodKey); i == -1 { + if i := e.ThrottleDownPods.Find(t.Key); i == -1 { e.ThrottleDownPods = append(e.ThrottleDownPods, t) } else { if t.CPUThrottle.MinCPURatio > e.ThrottleDownPods[i].CPUThrottle.MinCPURatio { @@ -500,7 +485,7 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott } for _, t := range throttleUpPods { - if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 { + if i := e.ThrottleUpPods.Find(t.Key); i == -1 { e.ThrottleUpPods = append(e.ThrottleUpPods, t) } else { if t.CPUThrottle.MinCPURatio > e.ThrottleUpPods[i].CPUThrottle.MinCPURatio { @@ -516,7 +501,7 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPods) { for _, ep := range evictPods { - if i := e.EvictPods.Find(ep.PodKey); i == -1 { + if i := e.EvictPods.Find(ep.Key); i == -1 { e.EvictPods = append(e.EvictPods, ep) } else { if (ep.DeletionGracePeriodSeconds != nil) && ((e.EvictPods[i].DeletionGracePeriodSeconds == nil) || @@ -527,8 +512,8 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo } } -func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext, enableSchedule bool) { - if !ac.Triggered && !(enableSchedule && ac.Restored) { +func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext) { + if !ac.Triggered && !ac.Restored { return } @@ -551,7 +536,7 @@ func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionCont klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines) } - if 
enableSchedule && ac.Restored { + if ac.Restored { for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances { if ensurance.Name == ac.ObjectiveEnsuranceName { if e.ThrottleUpWaterLine == nil { diff --git a/pkg/ensurance/executor/cpu_usage.go b/pkg/ensurance/executor/cpu_usage.go index a521907ff..c6b39bce5 100644 --- a/pkg/ensurance/executor/cpu_usage.go +++ b/pkg/ensurance/executor/cpu_usage.go @@ -7,7 +7,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" "github.com/gocrane/crane/pkg/ensurance/executor/sort" cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" "github.com/gocrane/crane/pkg/metrics" @@ -15,29 +15,29 @@ import ( ) func init() { - registerMetricMap(cpu_usage) + registerMetricMap(cpuUsage) } -var cpu_usage = metric{ +var cpuUsage = metric{ Name: CpuUsage, ActionPriority: 5, - SortAble: true, - SortFunc: sort.CpuUsageSorter, + Sortable: true, + SortFunc: sort.CpuUsageSort, - ThrottleAble: true, + Throttleable: true, ThrottleQuantified: true, ThrottleFunc: throttleOnePodCpu, RestoreFunc: restoreOnePodCpu, - EvictAble: true, + Evictable: true, EvictQuantified: true, EvictFunc: evictOnePodCpu, } func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) { - pod, err := ctx.PodLister.Pods(ThrottleDownPods[index].PodKey.Namespace).Get(ThrottleDownPods[index].PodKey.Name) + pod, err := ctx.PodLister.Pods(ThrottleDownPods[index].Key.Namespace).Get(ThrottleDownPods[index].Key.Name) if err != nil { - errPodKeys = append(errPodKeys, fmt.Sprintf("pod %s not found", ThrottleDownPods[index].PodKey.String())) + errPodKeys = append(errPodKeys, fmt.Sprintf("pod %s not found", ThrottleDownPods[index].Key.String())) return } @@ -55,19 +55,19 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, 
ThrottleDownPods Throttle containerCPUQuota, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUQuotas, v.ContainerId) if err != nil { - errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].Key.String()) continue } containerCPUPeriod, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUPeriods, v.ContainerId) if err != nil { - errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].Key.String()) continue } container, err := utils.GetPodContainerByName(pod, v.ContainerName) if err != nil { - errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, err.Error(), ThrottleDownPods[index].Key.String()) continue } @@ -96,14 +96,14 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) if err != nil { - errPodKeys = append(errPodKeys, fmt.Sprintf("failed to updateResource for %s/%s, error: %v", ThrottleDownPods[index].PodKey.String(), v.ContainerName, err)) + errPodKeys = append(errPodKeys, fmt.Sprintf("failed to updateResource for %s/%s, error: %v", ThrottleDownPods[index].Key.String(), v.ContainerName, err)) continue } else { klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.", klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value) released = ConstructCpuUsageRelease(ThrottleDownPods[index], containerCPUQuotaNew, v.Value) - klog.V(6).Infof("For pod %s, container %s, release %f cpu usage", ThrottleDownPods[index].PodKey.String(), container.Name, 
released[CpuUsage]) + klog.V(6).Infof("For pod %s, container %s, release %f cpu usage", ThrottleDownPods[index].Key.String(), container.Name, released[CpuUsage]) totalReleasedResource.Add(released) } @@ -113,9 +113,9 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle } func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) { - pod, err := ctx.PodLister.Pods(ThrottleUpPods[index].PodKey.Namespace).Get(ThrottleUpPods[index].PodKey.Name) + pod, err := ctx.PodLister.Pods(ThrottleUpPods[index].Key.Namespace).Get(ThrottleUpPods[index].Key.Name) if err != nil { - errPodKeys = append(errPodKeys, "not found ", ThrottleUpPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, "not found ", ThrottleUpPods[index].Key.String()) return } @@ -130,19 +130,19 @@ func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePod containerCPUQuota, err := podinfo.GetUsageById(ThrottleUpPods[index].ContainerCPUQuotas, v.ContainerId) if err != nil { - errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].Key.String()) continue } containerCPUPeriod, err := podinfo.GetUsageById(ThrottleUpPods[index].ContainerCPUPeriods, v.ContainerId) if err != nil { - errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].Key.String()) continue } container, err := utils.GetPodContainerByName(pod, v.ContainerName) if err != nil { - errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, err.Error(), ThrottleUpPods[index].Key.String()) continue } @@ -175,20 +175,20 @@ func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePod if 
utils.AlmostEqual(containerCPUQuotaNew, -1) { err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(-1)}) if err != nil { - errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].Key.String()) continue } } else { err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) if err != nil { klog.Errorf("Failed to updateResource, err %s", err.Error()) - errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].PodKey.String()) + errPodKeys = append(errPodKeys, fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].Key.String()) continue } klog.V(4).Infof("ThrottleExecutor restore pod %s, container %s, set cpu quota %.2f, .", klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value) released = ConstructCpuUsageRelease(ThrottleUpPods[index], containerCPUQuotaNew, v.Value) - klog.V(6).Infof("For pod %s, container %s, restore %f cpu usage", ThrottleUpPods[index].PodKey, container.Name, released[CpuUsage]) + klog.V(6).Infof("For pod %s, container %s, restore %f cpu usage", ThrottleUpPods[index].Key, container.Name, released[CpuUsage]) totalReleasedResource.Add(released) } @@ -208,16 +208,16 @@ func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalRel go func(evictPod podinfo.PodContext) { defer wg.Done() - pod, err := ctx.PodLister.Pods(evictPod.PodKey.Namespace).Get(evictPod.PodKey.Name) + pod, err := ctx.PodLister.Pods(evictPod.Key.Namespace).Get(evictPod.Key.Name) if err != nil { - errPodKeys = append(errPodKeys, "not found ", evictPod.PodKey.String()) + errPodKeys = append(errPodKeys, 
"not found ", evictPod.Key.String()) return } err = utils.EvictPodWithGracePeriod(ctx.Client, pod, evictPod.DeletionGracePeriodSeconds) if err != nil { - errPodKeys = append(errPodKeys, "evict failed ", evictPod.PodKey.String()) - klog.Warningf("Failed to evict pod %s: %v", evictPod.PodKey.String(), err) + errPodKeys = append(errPodKeys, "evict failed ", evictPod.Key.String()) + klog.Warningf("Failed to evict pod %s: %v", evictPod.Key.String(), err) return } @@ -229,12 +229,12 @@ func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalRel } func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, currentContainerCpuUsage float64) ReleaseResource { - if pod.PodType == podinfo.Evict { + if pod.ActionType == podinfo.Evict { return ReleaseResource{ CpuUsage: pod.PodCPUUsage * CpuQuotaCoefficient, } } - if pod.PodType == podinfo.ThrottleDown { + if pod.ActionType == podinfo.ThrottleDown { reduction := (currentContainerCpuUsage - containerCPUQuotaNew) * CpuQuotaCoefficient if reduction > 0 { return ReleaseResource{ @@ -243,7 +243,7 @@ func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, curr } return ReleaseResource{} } - if pod.PodType == podinfo.ThrottleUp { + if pod.ActionType == podinfo.ThrottleUp { reduction := (containerCPUQuotaNew - currentContainerCpuUsage) * CpuQuotaCoefficient if reduction > 0 { return ReleaseResource{ diff --git a/pkg/ensurance/executor/evict.go b/pkg/ensurance/executor/evict.go index b73203d64..f53cb57e9 100644 --- a/pkg/ensurance/executor/evict.go +++ b/pkg/ensurance/executor/evict.go @@ -9,7 +9,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" execsort "github.com/gocrane/crane/pkg/ensurance/executor/sort" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" @@ -25,7 +25,7 @@ type EvictPods 
[]podinfo.PodContext func (e EvictPods) Find(key types.NamespacedName) int { for i, v := range e { - if v.PodKey == key { + if v.Key == key { return i } } @@ -86,7 +86,7 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { wg := sync.WaitGroup{} for _, m := range metricsEvictQuantified { klog.V(6).Infof("Evict precisely on metric %s", m) - if metricMap[m].SortAble { + if metricMap[m].Sortable { metricMap[m].SortFunc(e.EvictPods) } else { execsort.GeneralSorter(e.EvictPods) @@ -94,7 +94,7 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { klog.V(6).Info("After sort, the sequence to evict is ") for _, pc := range e.EvictPods { - klog.V(6).Info(pc.PodKey.String()) + klog.V(6).Info(pc.Key.String()) } for !ctx.EvictGapToWaterLines.TargetGapsRemoved(m) { @@ -103,9 +103,9 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { index := podinfo.GetFirstNoExecutedPod(e.EvictPods) errKeys, released = metricMap[m].EvictFunc(&wg, ctx, index, &totalReleased, e.EvictPods) errPodKeys = append(errPodKeys, errKeys...) 
- klog.V(6).Infof("Evict pods %s, released %f resource", e.EvictPods[index].PodKey, released[m]) + klog.V(6).Infof("Evict pods %s, released %f resource", e.EvictPods[index].Key, released[m]) - e.EvictPods[index].HasBeenActioned = true + e.EvictPods[index].Executed = true ctx.EvictGapToWaterLines[m] -= released[m] } else { klog.V(6).Info("There is no pod that can be evicted") diff --git a/pkg/ensurance/executor/interface.go b/pkg/ensurance/executor/interface.go index b51016271..efdc99125 100644 --- a/pkg/ensurance/executor/interface.go +++ b/pkg/ensurance/executor/interface.go @@ -29,7 +29,7 @@ type ExecuteContext struct { RuntimeClient pb.RuntimeServiceClient RuntimeConn *grpc.ClientConn - // Gap for metrics EvictAble/ThrottleAble + // Gap for metrics Evictable/Throttleable // Key is the metric name, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use throttleDown action) ThrottoleDownGapToWaterLines GapToWaterLines // Key is the metric name, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use throttleUp action) diff --git a/pkg/ensurance/executor/metric.go b/pkg/ensurance/executor/metric.go index 50f5e1b66..2d36f8311 100644 --- a/pkg/ensurance/executor/metric.go +++ b/pkg/ensurance/executor/metric.go @@ -3,7 +3,7 @@ package executor import ( "sync" - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" ) type metric struct { @@ -16,15 +16,15 @@ type metric struct { // Some incompressible metric such as memory usage can be given a higher priority ActionPriority int - SortAble bool + Sortable bool SortFunc func(pods []podinfo.PodContext) - ThrottleAble bool + Throttleable bool ThrottleQuantified bool ThrottleFunc func(ctx *ExecuteContext, index int, ThrottleDownPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) RestoreFunc func(ctx *ExecuteContext, index int, 
ThrottleUpPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) - EvictAble bool + Evictable bool EvictQuantified bool // If use goroutine to evcit, make sure to calculate release resources outside the goroutine EvictFunc func(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) @@ -38,7 +38,7 @@ func registerMetricMap(m metric) { func GetThrottleAbleMetricName() (throttleAbleMetricList []WaterLineMetric) { for _, m := range metricMap { - if m.ThrottleAble { + if m.Throttleable { throttleAbleMetricList = append(throttleAbleMetricList, m.Name) } } @@ -47,7 +47,7 @@ func GetThrottleAbleMetricName() (throttleAbleMetricList []WaterLineMetric) { func GetEvictAbleMetricName() (evictAbleMetricList []WaterLineMetric) { for _, m := range metricMap { - if m.EvictAble { + if m.Evictable { evictAbleMetricList = append(evictAbleMetricList, m.Name) } } diff --git a/pkg/ensurance/executor/pod-info/pod_info.go b/pkg/ensurance/executor/pod-info/pod_info.go deleted file mode 100644 index 067181e86..000000000 --- a/pkg/ensurance/executor/pod-info/pod_info.go +++ /dev/null @@ -1,244 +0,0 @@ -package pod_info - -import ( - "fmt" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/klog/v2" - - ensuranceapi "github.com/gocrane/api/ensurance/v1alpha1" - "github.com/gocrane/crane/pkg/common" - stypes "github.com/gocrane/crane/pkg/ensurance/collector/types" - "github.com/gocrane/crane/pkg/utils" -) - -type ClassAndPriority struct { - PodQOSClass v1.PodQOSClass - PriorityClassValue int32 -} - -type PodType string - -const ( - ThrottleDown PodType = "ThrottleDown" - ThrottleUp PodType = "ThrottleUp" - Evict PodType = "Evict" -) - -type ContainerUsage struct { - ContainerName string - ContainerId string - Value float64 
-} - -func GetUsageById(usages []ContainerUsage, containerId string) (ContainerUsage, error) { - for _, v := range usages { - if v.ContainerId == containerId { - return v, nil - } - } - - return ContainerUsage{}, fmt.Errorf("containerUsage not found") -} - -func GetPodUsage(metricName string, stateMap map[string][]common.TimeSeries, pod *v1.Pod) (float64, []ContainerUsage) { - var podUsage = 0.0 - var containerUsages []ContainerUsage - var podMaps = map[string]string{common.LabelNamePodName: pod.Name, common.LabelNamePodNamespace: pod.Namespace, common.LabelNamePodUid: string(pod.UID)} - state, ok := stateMap[metricName] - if !ok { - return podUsage, containerUsages - } - for _, vv := range state { - var labelMaps = common.Labels2Maps(vv.Labels) - if utils.ContainMaps(labelMaps, podMaps) { - if labelMaps[common.LabelNameContainerId] == "" { - podUsage = vv.Samples[0].Value - } else { - containerUsages = append(containerUsages, ContainerUsage{ContainerId: labelMaps[common.LabelNameContainerId], - ContainerName: labelMaps[common.LabelNameContainerName], Value: vv.Samples[0].Value}) - } - } - } - - return podUsage, containerUsages -} - -type CPURatio struct { - //the min of cpu ratio for pods - MinCPURatio uint64 `json:"minCPURatio,omitempty"` - - //the step of cpu share and limit for once down-size (1-100) - StepCPURatio uint64 `json:"stepCPURatio,omitempty"` -} - -type MemoryThrottleExecutor struct { - // to force gc the page cache of low level pods - ForceGC bool `json:"forceGC,omitempty"` -} - -type PodContext struct { - PodKey types.NamespacedName - ClassAndPriority ClassAndPriority - PodCPUUsage float64 - ContainerCPUUsages []ContainerUsage - PodCPUShare float64 - ContainerCPUShares []ContainerUsage - PodCPUQuota float64 - ContainerCPUQuotas []ContainerUsage - PodCPUPeriod float64 - ContainerCPUPeriods []ContainerUsage - ExtCpuBeUsed bool - ExtCpuLimit int64 - ExtCpuRequest int64 - StartTime *metav1.Time - - PodType PodType - - CPUThrottle CPURatio - 
MemoryThrottle MemoryThrottleExecutor - - DeletionGracePeriodSeconds *int32 - - HasBeenActioned bool -} - -func HasNoExecutedPod(pods []PodContext) bool { - for _, p := range pods { - if p.HasBeenActioned == false { - return true - } - } - return false -} - -func GetFirstNoExecutedPod(pods []PodContext) int { - for index, p := range pods { - if p.HasBeenActioned == false { - return index - } - } - return -1 -} - -func BuildPodBasicInfo(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction, podType PodType) PodContext { - var podContext PodContext - - podContext.ClassAndPriority = ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0)} - podContext.PodKey = types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} - - podContext.PodCPUUsage, podContext.ContainerCPUUsages = GetPodUsage(string(stypes.MetricNameContainerCpuTotalUsage), stateMap, pod) - podContext.PodCPUShare, podContext.ContainerCPUShares = GetPodUsage(string(stypes.MetricNameContainerCpuLimit), stateMap, pod) - podContext.PodCPUQuota, podContext.ContainerCPUQuotas = GetPodUsage(string(stypes.MetricNameContainerCpuQuota), stateMap, pod) - podContext.PodCPUPeriod, podContext.ContainerCPUPeriods = GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), stateMap, pod) - podContext.ExtCpuBeUsed, podContext.ExtCpuLimit, podContext.ExtCpuRequest = utils.ExtResourceAllocated(pod, v1.ResourceCPU) - podContext.StartTime = pod.Status.StartTime - - if action.Spec.Throttle != nil { - podContext.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio) - podContext.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio) - } - - podContext.PodType = podType - - return podContext -} - -func CompareClassAndPriority(a, b ClassAndPriority) int32 { - qosClassCmp := comparePodQosClass(a.PodQOSClass, b.PodQOSClass) - if qosClassCmp != 0 { - return qosClassCmp - } - if 
a.PriorityClassValue == b.PriorityClassValue { - return 0 - } else if a.PriorityClassValue < b.PriorityClassValue { - return -1 - } - return 1 -} - -func (s ClassAndPriority) Less(i ClassAndPriority) bool { - if comparePodQosClass(s.PodQOSClass, i.PodQOSClass) == 1 { - return false - } - - if comparePodQosClass(s.PodQOSClass, i.PodQOSClass) == -1 { - return true - } - - return s.PriorityClassValue < i.PriorityClassValue -} - -func (s ClassAndPriority) Greater(i ClassAndPriority) bool { - if comparePodQosClass(s.PodQOSClass, i.PodQOSClass) == 1 { - return true - } - - if comparePodQosClass(s.PodQOSClass, i.PodQOSClass) == -1 { - return false - } - - return s.PriorityClassValue > i.PriorityClassValue -} - -func GetMaxQOSPriority(podLister corelisters.PodLister, podTypes []types.NamespacedName) (types.NamespacedName, ClassAndPriority) { - - var podType types.NamespacedName - var scheduledQOSPriority ClassAndPriority - - for _, podNamespace := range podTypes { - if pod, err := podLister.Pods(podNamespace.Namespace).Get(podNamespace.Name); err != nil { - klog.V(6).Infof("Warning: getMaxQOSPriority get pod %s not found", podNamespace.String()) - continue - } else { - var priority = ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0) - 1} - if priority.Greater(scheduledQOSPriority) { - scheduledQOSPriority = priority - podType = podNamespace - } - } - } - - return podType, scheduledQOSPriority -} - -// We defined guaranteed is the highest qos class, burstable is the middle level -// bestEffort is the lowest -// if a qos class is greater than b, return 1 -// if a qos class is less than b, return -1 -// if a qos class equal with b , return 0 -func comparePodQosClass(a v1.PodQOSClass, b v1.PodQOSClass) int32 { - switch b { - case v1.PodQOSGuaranteed: - if a == v1.PodQOSGuaranteed { - return 0 - } else { - return -1 - } - case v1.PodQOSBurstable: - if a == v1.PodQOSGuaranteed { - return 1 - } else if a == 
v1.PodQOSBurstable { - return 0 - } else { - return -1 - } - case v1.PodQOSBestEffort: - if (a == v1.PodQOSGuaranteed) || (a == v1.PodQOSBurstable) { - return 1 - } else if a == v1.PodQOSBestEffort { - return 0 - } else { - return -1 - } - default: - if (a == v1.PodQOSGuaranteed) || (a == v1.PodQOSBurstable) || (a == v1.PodQOSBestEffort) { - return 1 - } else { - return 0 - } - } -} diff --git a/pkg/ensurance/executor/podinfo/pod_info.go b/pkg/ensurance/executor/podinfo/pod_info.go new file mode 100644 index 000000000..9d9497393 --- /dev/null +++ b/pkg/ensurance/executor/podinfo/pod_info.go @@ -0,0 +1,138 @@ +package podinfo + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + ensuranceapi "github.com/gocrane/api/ensurance/v1alpha1" + "github.com/gocrane/crane/pkg/common" + stypes "github.com/gocrane/crane/pkg/ensurance/collector/types" + "github.com/gocrane/crane/pkg/utils" +) + +type ClassAndPriority struct { + PodQOSClass v1.PodQOSClass + PriorityClassValue int32 +} + +type ActionType string + +const ( + ThrottleDown ActionType = "ThrottleDown" + ThrottleUp ActionType = "ThrottleUp" + Evict ActionType = "Evict" +) + +type ContainerState struct { + ContainerName string + ContainerId string + Value float64 +} + +func GetUsageById(usages []ContainerState, containerId string) (ContainerState, error) { + for _, v := range usages { + if v.ContainerId == containerId { + return v, nil + } + } + + return ContainerState{}, fmt.Errorf("containerUsage not found") +} + +func GetPodUsage(metricName string, stateMap map[string][]common.TimeSeries, pod *v1.Pod) (float64, []ContainerState) { + var podUsage = 0.0 + var containerUsages []ContainerState + var podMaps = map[string]string{common.LabelNamePodName: pod.Name, common.LabelNamePodNamespace: pod.Namespace, common.LabelNamePodUid: string(pod.UID)} + state, ok := stateMap[metricName] + if !ok { + return podUsage, containerUsages + } + for _, vv 
:= range state { + var labelMaps = common.Labels2Maps(vv.Labels) + if utils.ContainMaps(labelMaps, podMaps) { + if labelMaps[common.LabelNameContainerId] == "" { + podUsage = vv.Samples[0].Value + } else { + containerUsages = append(containerUsages, ContainerState{ContainerId: labelMaps[common.LabelNameContainerId], + ContainerName: labelMaps[common.LabelNameContainerName], Value: vv.Samples[0].Value}) + } + } + } + + return podUsage, containerUsages +} + +type CPURatio struct { + //the min of cpu ratio for pods + MinCPURatio uint64 `json:"minCPURatio,omitempty"` + + //the step of cpu share and limit for once down-size (1-100) + StepCPURatio uint64 `json:"stepCPURatio,omitempty"` +} + +type MemoryThrottleExecutor struct { + // to force gc the page cache of low level pods + ForceGC bool `json:"forceGC,omitempty"` +} + +type PodContext struct { + Key types.NamespacedName + QOSClass v1.PodQOSClass + Priority int32 + StartTime *metav1.Time + DeletionGracePeriodSeconds *int32 + + ElasticCPU int64 + PodCPUUsage, PodCPUShare, PodCPUQuota, PodCPUPeriod float64 + ContainerCPUUsages, ContainerCPUShares, ContainerCPUQuotas, ContainerCPUPeriods []ContainerState + + ActionType ActionType + CPUThrottle CPURatio + Executed bool +} + +func HasNoExecutedPod(pods []PodContext) bool { + for _, p := range pods { + if p.Executed == false { + return true + } + } + return false +} + +func GetFirstNoExecutedPod(pods []PodContext) int { + for index, p := range pods { + if p.Executed == false { + return index + } + } + return -1 +} + +func BuildPodActionContext(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction, actionType ActionType) PodContext { + var podContext PodContext + + podContext.QOSClass = pod.Status.QOSClass + podContext.Priority = utils.GetInt32withDefault(pod.Spec.Priority, 0) + + podContext.Key = types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + + podContext.PodCPUUsage, podContext.ContainerCPUUsages = 
GetPodUsage(string(stypes.MetricNameContainerCpuTotalUsage), stateMap, pod) + podContext.PodCPUShare, podContext.ContainerCPUShares = GetPodUsage(string(stypes.MetricNameContainerCpuLimit), stateMap, pod) + podContext.PodCPUQuota, podContext.ContainerCPUQuotas = GetPodUsage(string(stypes.MetricNameContainerCpuQuota), stateMap, pod) + podContext.PodCPUPeriod, podContext.ContainerCPUPeriods = GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), stateMap, pod) + podContext.ElasticCPU = utils.GetElasticResourceLimit(pod, v1.ResourceCPU) + podContext.StartTime = pod.Status.StartTime + + if action.Spec.Throttle != nil { + podContext.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio) + podContext.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio) + } + + podContext.ActionType = actionType + + return podContext +} diff --git a/pkg/ensurance/executor/schedule.go b/pkg/ensurance/executor/schedule.go index 663ad227e..c9486197b 100644 --- a/pkg/ensurance/executor/schedule.go +++ b/pkg/ensurance/executor/schedule.go @@ -6,7 +6,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" "github.com/gocrane/crane/pkg/utils" @@ -17,8 +16,7 @@ const ( ) type ScheduleExecutor struct { - DisableClassAndPriority *podinfo.ClassAndPriority - RestoreClassAndPriority *podinfo.ClassAndPriority + ToBeDisable, ToBeRestore bool } func (b *ScheduleExecutor) Avoid(ctx *ExecuteContext) error { @@ -28,7 +26,7 @@ func (b *ScheduleExecutor) Avoid(ctx *ExecuteContext) error { klog.V(6).Info("DisableScheduledExecutor avoid, %v", *b) - if b.DisableClassAndPriority == nil { + if !b.ToBeDisable { metrics.UpdateExecutorStatus(metrics.SubComponentSchedule, metrics.StepAvoid, 0) return nil } @@ -58,7 +56,7 @@ func (b *ScheduleExecutor) Restore(ctx *ExecuteContext) error { 
klog.V(6).Info("DisableScheduledExecutor restore, %v", *b) - if b.RestoreClassAndPriority == nil { + if !b.ToBeRestore { metrics.UpdateExecutorStatus(metrics.SubComponentSchedule, metrics.StepRestore, 0.0) return nil } diff --git a/pkg/ensurance/executor/sort/cpu_usage_sort.go b/pkg/ensurance/executor/sort/cpu_usage_sort.go index 08acb1f99..ef840db64 100644 --- a/pkg/ensurance/executor/sort/cpu_usage_sort.go +++ b/pkg/ensurance/executor/sort/cpu_usage_sort.go @@ -1,47 +1,37 @@ package sort import ( - v1 "k8s.io/api/core/v1" - - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" "github.com/gocrane/crane/pkg/utils" ) -func CpuUsageSorter(pods []podinfo.PodContext) { - orderedBy(classAndPriority, cpuUsage, extCpuUsage, runningTime).Sort(pods) +func CpuUsageSort(pods []podinfo.PodContext) { + // TODO: add unit tests that cover every comparator combination + orderedBy(UseElasticCPU, ComparePriority, ComparePodQOSClass, CompareCPUUsage, CompareElasticCPU, CompareRunningTime).Sort(pods) } -// extCpuUsage compares the partition of extcpu usage to extcpu limit -func extCpuUsage(p1, p2 podinfo.PodContext) int32 { +// CompareElasticCPU compares pods by the ratio of pod CPU usage to elastic CPU limit +func CompareElasticCPU(p1, p2 podinfo.PodContext) int32 { // if both pod don't use ext resource, then return - if p1.ExtCpuBeUsed == false && p2.ExtCpuBeUsed == false { + if p1.ElasticCPU == 0 && p2.ElasticCPU == 0 { return 0 } - p1Ratio := p1.PodCPUUsage / float64(p1.ExtCpuLimit) - p2Ratio := p2.PodCPUUsage / float64(p2.ExtCpuLimit) + p1Ratio := p1.PodCPUUsage / float64(p1.ElasticCPU) + p2Ratio := p2.PodCPUUsage / float64(p2.ElasticCPU) return utils.CmpFloat(p1Ratio, p2Ratio) } -// cpuUsage compares the partition extcpu usage of extcpu limit -func cpuUsage(p1, p2 podinfo.PodContext) int32 { - var p1usage, p2usage float64 - // if both pod is PodQOSBestEffort, then compare the absolute usage;otherwise, cmpare the ratio compared
with PodCPUQuota - if p1.ClassAndPriority.PodQOSClass == v1.PodQOSBestEffort && p2.ClassAndPriority.PodQOSClass == v1.PodQOSBestEffort { - p1usage = p1.PodCPUUsage - p2usage = p2.PodCPUUsage - } else { - p1usage = p1.PodCPUUsage * p1.PodCPUPeriod / p1.PodCPUQuota - p2usage = p2.PodCPUUsage * p2.PodCPUPeriod / p2.PodCPUQuota - } - return utils.CmpFloat(p1usage, p2usage) +// CompareCPUUsage compares pods by absolute CPU usage, passing p2 before p1 to utils.CmpFloat to reverse the order +func CompareCPUUsage(p1, p2 podinfo.PodContext) int32 { + return utils.CmpFloat(p2.PodCPUUsage, p1.PodCPUUsage) } -// extCpuBeUsed compares pod by using ext resource whether -func extCpuBeUsed(p1, p2 podinfo.PodContext) int32 { - use1 := utils.Bool2Uint(p1.ExtCpuBeUsed) - use2 := utils.Bool2Uint(p2.ExtCpuBeUsed) +// UseElasticCPU compares pods by whether they use elastic CPU (ElasticCPU != 0), with p2 taken before p1 +func UseElasticCPU(p1, p2 podinfo.PodContext) int32 { + use1 := utils.Bool2Uint(p1.ElasticCPU != 0) + use2 := utils.Bool2Uint(p2.ElasticCPU != 0) - return int32(use1 - use2) + return int32(use2 - use1) } diff --git a/pkg/ensurance/executor/sort/cpu_usage_sort_test.go b/pkg/ensurance/executor/sort/cpu_usage_sort_test.go new file mode 100644 index 000000000..be113c1bd --- /dev/null +++ b/pkg/ensurance/executor/sort/cpu_usage_sort_test.go @@ -0,0 +1,82 @@ +package sort + +import ( + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" +) + +func TestCpuUsageSorter(t *testing.T) { + now := metav1.NewTime(time.Unix(1000, 0).UTC()) + later := metav1.NewTime(time.Unix(2000, 0).UTC()) + // orderedBy(UseElasticCPU, ComparePriority, ComparePodQOSClass, CompareCPUUsage, CompareElasticCPU, CompareRunningTime).Sort(pods) + pods := []podinfo.PodContext{ + { + Key: types.NamespacedName{Name: "elastic-cpu-2"}, + ElasticCPU: 2, + QOSClass: v1.PodQOSBestEffort, + }, + { + Key: types.NamespacedName{Name: "elastic-cpu-4"}, + ElasticCPU: 4, +
QOSClass: v1.PodQOSBestEffort, + }, + { + Key: types.NamespacedName{Name: "cpu-1"}, + PodCPUUsage: 1, + QOSClass: v1.PodQOSGuaranteed, + }, + { + Key: types.NamespacedName{Name: "cpu-2"}, + PodCPUUsage: 2, + QOSClass: v1.PodQOSBurstable, + }, + { + Key: types.NamespacedName{Name: "guarantee-1"}, + PodCPUUsage: 1, + QOSClass: v1.PodQOSGuaranteed, + }, + { + Key: types.NamespacedName{Name: "burstable-2"}, + PodCPUUsage: 1, + QOSClass: v1.PodQOSBurstable, + }, + { + Key: types.NamespacedName{Name: "prioirty-2"}, + Priority: 2, + PodCPUUsage: 1, + QOSClass: v1.PodQOSBurstable, + }, + { + Key: types.NamespacedName{Name: "prioirty-2-2"}, + Priority: 2, + PodCPUUsage: 2, + QOSClass: v1.PodQOSBurstable, + }, + { + Key: types.NamespacedName{Name: "priority-1"}, + Priority: 1, + QOSClass: v1.PodQOSBurstable, + }, + { + Key: types.NamespacedName{Name: "time-1"}, + StartTime: &now, + QOSClass: v1.PodQOSGuaranteed, + }, + { + Key: types.NamespacedName{Name: "time-2"}, + StartTime: &later, + QOSClass: v1.PodQOSGuaranteed, + }, + } + CpuUsageSort(pods) + t.Logf("sorted pods:") + for _, p := range pods { + t.Logf("key %s, useElasticCPU %v, qosClass %s, priority %d, usage %f, elasticCPUUsage %d, startTime %v", p.Key, (p.ElasticCPU != 0), p.QOSClass, p.Priority, p.PodCPUUsage, p.ElasticCPU, p.StartTime) + } +} diff --git a/pkg/ensurance/executor/sort/general_sort.go b/pkg/ensurance/executor/sort/general_sort.go index 1a509ce92..32dd0ae94 100644 --- a/pkg/ensurance/executor/sort/general_sort.go +++ b/pkg/ensurance/executor/sort/general_sort.go @@ -1,7 +1,7 @@ package sort -import podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" +import "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" func GeneralSorter(pods []podinfo.PodContext) { - orderedBy(classAndPriority, runningTime).Sort(pods) + orderedBy(ComparePriority, ComparePodQOSClass, CompareRunningTime).Sort(pods) } diff --git a/pkg/ensurance/executor/sort/mem_metrics_sort.go 
b/pkg/ensurance/executor/sort/mem_metrics_sort.go index 9fa01e742..9d1ae74eb 100644 --- a/pkg/ensurance/executor/sort/mem_metrics_sort.go +++ b/pkg/ensurance/executor/sort/mem_metrics_sort.go @@ -1,9 +1,9 @@ package sort -import podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" +import "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" // Todo: Memory metrics related sort func need to be filled func MemMetricsSorter(pods []podinfo.PodContext) { - orderedBy(classAndPriority, runningTime).Sort(pods) + orderedBy(ComparePriority, ComparePodQOSClass, CompareRunningTime).Sort(pods) } diff --git a/pkg/ensurance/executor/sort/sort.go b/pkg/ensurance/executor/sort/sort.go index 3c5d1a0a8..a2392aec7 100644 --- a/pkg/ensurance/executor/sort/sort.go +++ b/pkg/ensurance/executor/sort/sort.go @@ -3,22 +3,23 @@ package sort import ( "sort" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" ) -// RankFunc sorts the pods type RankFunc func(pods []podinfo.PodContext) var sortFunc = map[string]func(p1, p2 podinfo.PodContext) int32{ - "ExtCpuBeUsed": extCpuBeUsed, - "ClassAndPriority": classAndPriority, - "ExtCpuUsage": extCpuUsage, - "CpuUsage": cpuUsage, - "RunningTime": runningTime, + "UseElasticResource": UseElasticCPU, + "PodQOSClass": ComparePodQOSClass, + "ExtCpuUsage": CompareElasticCPU, + "CpuUsage": CompareCPUUsage, + "RunningTime": CompareRunningTime, } +// RankFuncConstruct is a sample for future extends, keep it even it is not called func RankFuncConstruct(customize []string) RankFunc { if len(customize) == 0 { klog.Fatal("If customize sort func is defined, it can't be empty.") @@ -33,14 +34,14 @@ func RankFuncConstruct(customize []string) RankFunc { rankFunc = orderedBy(cmp...).Sort } } else { - rankFunc = CpuUsageSorter + rankFunc = CpuUsageSort } return rankFunc } -// runningTime compares pods by pod's start time -func 
runningTime(p1, p2 podinfo.PodContext) int32 { +// CompareRunningTime compares pods by pod's start time +func CompareRunningTime(p1, p2 podinfo.PodContext) int32 { t1 := p1.StartTime t2 := p2.StartTime @@ -56,9 +57,53 @@ func runningTime(p1, p2 podinfo.PodContext) int32 { return -1 } -// classAndPriority compares pods by pod's ClassAndPriority -func classAndPriority(p1, p2 podinfo.PodContext) int32 { - return podinfo.CompareClassAndPriority(p1.ClassAndPriority, p2.ClassAndPriority) +// ComparePodQOSClass compares pods by pod's QOSClass +func ComparePodQOSClass(p1, p2 podinfo.PodContext) int32 { + return ComparePodQosClass(p1.QOSClass, p2.QOSClass) +} + +func ComparePriority(p1, p2 podinfo.PodContext) int32 { + if p1.Priority == p2.Priority { + return 0 + } else if p1.Priority < p2.Priority { + return -1 + } + return 1 +} + +// ComparePodQosClass compares Pod QOSClass +// Guaranteed > Burstable > BestEffort +func ComparePodQosClass(a v1.PodQOSClass, b v1.PodQOSClass) int32 { + switch b { + case v1.PodQOSGuaranteed: + if a == v1.PodQOSGuaranteed { + return 0 + } else { + return -1 + } + case v1.PodQOSBurstable: + if a == v1.PodQOSGuaranteed { + return 1 + } else if a == v1.PodQOSBurstable { + return 0 + } else { + return -1 + } + case v1.PodQOSBestEffort: + if (a == v1.PodQOSGuaranteed) || (a == v1.PodQOSBurstable) { + return 1 + } else if a == v1.PodQOSBestEffort { + return 0 + } else { + return -1 + } + default: + if (a == v1.PodQOSGuaranteed) || (a == v1.PodQOSBurstable) || (a == v1.PodQOSBestEffort) { + return 1 + } else { + return 0 + } + } } // Cmp compares p1 and p2 and returns: diff --git a/pkg/ensurance/executor/throttle.go b/pkg/ensurance/executor/throttle.go index fd55248fd..51b1e993e 100644 --- a/pkg/ensurance/executor/throttle.go +++ b/pkg/ensurance/executor/throttle.go @@ -8,7 +8,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + podinfo 
"github.com/gocrane/crane/pkg/ensurance/executor/podinfo" execsort "github.com/gocrane/crane/pkg/ensurance/executor/sort" "github.com/gocrane/crane/pkg/known" "github.com/gocrane/crane/pkg/metrics" @@ -32,7 +32,7 @@ type ThrottlePods []podinfo.PodContext func (t ThrottlePods) Find(podTypes types.NamespacedName) int { for i, v := range t { - if v.PodKey == podTypes { + if v.Key == podTypes { return i } } @@ -99,7 +99,7 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { var released ReleaseResource for _, m := range metricsThrottleQuantified { klog.V(6).Infof("ThrottleDown precisely on metric %s", m) - if metricMap[m].SortAble { + if metricMap[m].Sortable { metricMap[m].SortFunc(t.ThrottleDownPods) } else { execsort.GeneralSorter(t.ThrottleDownPods) @@ -107,14 +107,14 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { klog.V(6).Info("After sort, the sequence to throttle is ") for _, pc := range t.ThrottleDownPods { - klog.V(6).Info(pc.PodKey.String(), pc.ContainerCPUUsages) + klog.V(6).Info(pc.Key.String(), pc.ContainerCPUUsages) } for index := 0; !ctx.ThrottoleDownGapToWaterLines.TargetGapsRemoved(m) && index < len(t.ThrottleDownPods); index++ { klog.V(6).Infof("For metric %s, there is still gap to waterlines: %f", m, ctx.ThrottoleDownGapToWaterLines[m]) errKeys, released = metricMap[m].ThrottleFunc(ctx, index, t.ThrottleDownPods, &totalReleased) - klog.V(6).Infof("ThrottleDown pods %s, released %f resource", t.ThrottleDownPods[index].PodKey, released[m]) + klog.V(6).Infof("ThrottleDown pods %s, released %f resource", t.ThrottleDownPods[index].Key, released[m]) errPodKeys = append(errPodKeys, errKeys...) 
ctx.ThrottoleDownGapToWaterLines[m] -= released[m] @@ -189,7 +189,7 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { var released ReleaseResource for _, m := range metricsThrottleQuantified { klog.V(6).Infof("ThrottleUp precisely on metric %s", m) - if metricMap[m].SortAble { + if metricMap[m].Sortable { metricMap[m].SortFunc(t.ThrottleUpPods) } else { execsort.GeneralSorter(t.ThrottleUpPods) @@ -198,14 +198,14 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { klog.V(6).Info("After sort, the sequence to throttle is ") for _, pc := range t.ThrottleUpPods { - klog.V(6).Info(pc.PodKey.String()) + klog.V(6).Info(pc.Key.String()) } for index := 0; !ctx.ThrottoleUpGapToWaterLines.TargetGapsRemoved(m) && index < len(t.ThrottleUpPods); index++ { klog.V(6).Infof("For metric %s, there is still gap to waterlines: %f", m, ctx.ThrottoleUpGapToWaterLines[m]) errKeys, released = metricMap[m].RestoreFunc(ctx, index, t.ThrottleUpPods, &totalReleased) - klog.V(6).Infof("ThrottleUp pods %s, released %f resource", t.ThrottleUpPods[index].PodKey, released[m]) + klog.V(6).Infof("ThrottleUp pods %s, released %f resource", t.ThrottleUpPods[index].Key, released[m]) errPodKeys = append(errPodKeys, errKeys...) 
ctx.ThrottoleUpGapToWaterLines[m] -= released[m] diff --git a/pkg/ensurance/executor/waterline.go b/pkg/ensurance/executor/watermark.go similarity index 98% rename from pkg/ensurance/executor/waterline.go rename to pkg/ensurance/executor/watermark.go index 56dfe29db..0c7c219d2 100644 --- a/pkg/ensurance/executor/waterline.go +++ b/pkg/ensurance/executor/watermark.go @@ -102,7 +102,7 @@ func (e WaterLines) DivideMetricsByEvictQuantified() (metricsEvictQuantified []W func (e WaterLines) GetHighestPriorityThrottleAbleMetric() (highestPrioriyMetric WaterLineMetric) { highestActionPriority := 0 for m := range e { - if metricMap[m].ThrottleAble == true { + if metricMap[m].Throttleable == true { if metricMap[m].ActionPriority >= highestActionPriority { highestPrioriyMetric = m highestActionPriority = metricMap[m].ActionPriority @@ -112,11 +112,11 @@ func (e WaterLines) GetHighestPriorityThrottleAbleMetric() (highestPrioriyMetric return } -// GetHighestPriorityEvictAbleMetric get the highest priority in metrics that can be EvictAble +// GetHighestPriorityEvictAbleMetric get the highest priority in metrics that can be Evictable func (e WaterLines) GetHighestPriorityEvictAbleMetric() (highestPrioriyMetric WaterLineMetric) { highestActionPriority := 0 for m := range e { - if metricMap[m].EvictAble == true { + if metricMap[m].Evictable == true { if metricMap[m].ActionPriority >= highestActionPriority { highestPrioriyMetric = m highestActionPriority = metricMap[m].ActionPriority diff --git a/pkg/ensurance/executor/waterline_test.go b/pkg/ensurance/executor/watermark_test.go similarity index 100% rename from pkg/ensurance/executor/waterline_test.go rename to pkg/ensurance/executor/watermark_test.go diff --git a/pkg/resource/pod_resource_manger.go b/pkg/resource/pod_resource_manger.go index 03dd07d00..d694b8b85 100644 --- a/pkg/resource/pod_resource_manger.go +++ b/pkg/resource/pod_resource_manger.go @@ -20,7 +20,7 @@ import ( 
"github.com/gocrane/crane/pkg/ensurance/collector/cadvisor" stypes "github.com/gocrane/crane/pkg/ensurance/collector/types" "github.com/gocrane/crane/pkg/ensurance/executor" - podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info" + podinfo "github.com/gocrane/crane/pkg/ensurance/executor/podinfo" cgrpc "github.com/gocrane/crane/pkg/ensurance/grpc" cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" "github.com/gocrane/crane/pkg/known" diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index 71574d91c..395ff5232 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -213,21 +213,15 @@ func GetContainerIdFromPod(pod *v1.Pod, containerName string) string { return "" } -// Whether pod uses ext resource ext-resource.node.gocrane.io, and value it uses -func ExtResourceAllocated(pod *v1.Pod, resName v1.ResourceName) (hasExtResource bool, extCpuLimit, extCpuRequest int64) { +// GetElasticResourceLimit sum all containers resources limit for gocrane.io/resource +// As extended resource is not over committable resource, so request = limit +func GetElasticResourceLimit(pod *v1.Pod, resName v1.ResourceName) (amount int64) { resPrefix := fmt.Sprintf(ExtResourcePrefixFormat, resName) for i := range pod.Spec.Containers { container := pod.Spec.Containers[i] for res, val := range container.Resources.Limits { if strings.HasPrefix(res.String(), resPrefix) { - hasExtResource = true - extCpuLimit += val.MilliValue() - } - } - for res, val := range container.Resources.Requests { - if strings.HasPrefix(res.String(), resPrefix) { - hasExtResource = true - extCpuRequest += val.MilliValue() + amount += val.MilliValue() } } }