diff --git a/pkg/kubelet/cadvisor/BUILD b/pkg/kubelet/cadvisor/BUILD new file mode 100644 index 0000000000000..e69b6f8f58e6e --- /dev/null +++ b/pkg/kubelet/cadvisor/BUILD @@ -0,0 +1,103 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = [ + "cadvisor_linux.go", + "cadvisor_unsupported.go", + "cadvisor_windows.go", + "doc.go", + "helpers_linux.go", + "helpers_unsupported.go", + "types.go", + "util.go", + ], + importpath = "k8s.io/kubernetes/pkg/kubelet/cadvisor", + deps = [ + "//pkg/apis/core/v1/helper:go_default_library", + "//pkg/kubelet/types:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/github.com/google/cadvisor/events:go_default_library", + "//vendor/github.com/google/cadvisor/info/v1:go_default_library", + "//vendor/github.com/google/cadvisor/info/v2:go_default_library", + ] + select({ + "@io_bazel_rules_go//go/platform:android": [ + "//vendor/github.com/google/cadvisor/cache/memory:go_default_library", + "//vendor/github.com/google/cadvisor/container:go_default_library", + "//vendor/github.com/google/cadvisor/container/containerd/install:go_default_library", + "//vendor/github.com/google/cadvisor/container/crio/install:go_default_library", + "//vendor/github.com/google/cadvisor/container/docker/install:go_default_library", + "//vendor/github.com/google/cadvisor/container/systemd/install:go_default_library", + "//vendor/github.com/google/cadvisor/fs:go_default_library", + "//vendor/github.com/google/cadvisor/manager:go_default_library", + "//vendor/github.com/google/cadvisor/utils/cloudinfo/aws:go_default_library", + "//vendor/github.com/google/cadvisor/utils/cloudinfo/azure:go_default_library", + "//vendor/github.com/google/cadvisor/utils/cloudinfo/gce:go_default_library", + "//vendor/github.com/google/cadvisor/utils/sysfs:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], + "@io_bazel_rules_go//go/platform:linux": [ + "//vendor/github.com/google/cadvisor/cache/memory:go_default_library", + "//vendor/github.com/google/cadvisor/container:go_default_library", + "//vendor/github.com/google/cadvisor/container/containerd/install:go_default_library", + "//vendor/github.com/google/cadvisor/container/crio/install:go_default_library", + "//vendor/github.com/google/cadvisor/container/docker/install:go_default_library", + "//vendor/github.com/google/cadvisor/container/systemd/install:go_default_library", + "//vendor/github.com/google/cadvisor/fs:go_default_library", + "//vendor/github.com/google/cadvisor/manager:go_default_library", + "//vendor/github.com/google/cadvisor/utils/cloudinfo/aws:go_default_library", + "//vendor/github.com/google/cadvisor/utils/cloudinfo/azure:go_default_library", + "//vendor/github.com/google/cadvisor/utils/cloudinfo/gce:go_default_library", + "//vendor/github.com/google/cadvisor/utils/sysfs:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], + "@io_bazel_rules_go//go/platform:windows": [ + "//pkg/kubelet/winstats:go_default_library", + ], + "//conditions:default": [], + }), +) + +go_test( + name = "go_default_test", + srcs = ["util_test.go"], + embed = [":go_default_library"], + deps = select({ + "@io_bazel_rules_go//go/platform:android": [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + 
"//vendor/github.com/google/cadvisor/container/crio:go_default_library", + "//vendor/github.com/google/cadvisor/info/v1:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + ], + "@io_bazel_rules_go//go/platform:linux": [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/github.com/google/cadvisor/info/v1:go_default_library", + ], + "//conditions:default": [], + }), +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/kubelet/cadvisor/testing:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/cadvisor/helpers_linux.go b/pkg/kubelet/cadvisor/helpers_linux.go index 13b0c235b481d..d61d82af56842 100644 --- a/pkg/kubelet/cadvisor/helpers_linux.go +++ b/pkg/kubelet/cadvisor/helpers_linux.go @@ -38,13 +38,6 @@ func (i *imageFsInfoProvider) ImageFsInfoLabel() (string, error) { switch i.runtime { case types.DockerContainerRuntime: return cadvisorfs.LabelDockerImages, nil - case types.RemoteContainerRuntime: - // This is a temporary workaround to get stats for cri-o from cadvisor - // and should be removed. - // Related to https://github.com/kubernetes/kubernetes/issues/51798 - if i.runtimeEndpoint == CrioSocket || i.runtimeEndpoint == "unix://"+CrioSocket { - return cadvisorfs.LabelCrioImages, nil - } } return "", fmt.Errorf("no imagefs label for configured runtime") } diff --git a/pkg/kubelet/cadvisor/util.go b/pkg/kubelet/cadvisor/util.go index 6020abd4dee1a..d53a2cb71b354 100644 --- a/pkg/kubelet/cadvisor/util.go +++ b/pkg/kubelet/cadvisor/util.go @@ -21,20 +21,12 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi2 "github.com/google/cadvisor/info/v2" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) -const ( - // CrioSocket is the path to the CRI-O socket. - // Please keep this in sync with the one in: - // github.com/google/cadvisor/tree/master/container/crio/client.go - CrioSocket = "/var/run/crio/crio.sock" -) - -// CapacityFromMachineInfo returns the capacity of the resources from the machine info. func CapacityFromMachineInfo(info *cadvisorapi.MachineInfo) v1.ResourceList { c := v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity( @@ -74,6 +66,5 @@ func EphemeralStorageCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceLis // be removed. 
Related issue: // https://github.com/kubernetes/kubernetes/issues/51798 func UsingLegacyCadvisorStats(runtime, runtimeEndpoint string) bool { - return (runtime == kubetypes.DockerContainerRuntime && goruntime.GOOS == "linux") || - runtimeEndpoint == CrioSocket || runtimeEndpoint == "unix://"+CrioSocket + return (runtime == kubetypes.DockerContainerRuntime && goruntime.GOOS == "linux") } diff --git a/pkg/kubelet/cadvisor/util_test.go b/pkg/kubelet/cadvisor/util_test.go index 9d0e7895e3412..a5d94c2828102 100644 --- a/pkg/kubelet/cadvisor/util_test.go +++ b/pkg/kubelet/cadvisor/util_test.go @@ -22,9 +22,7 @@ import ( "reflect" "testing" - "github.com/google/cadvisor/container/crio" info "github.com/google/cadvisor/info/v1" - "github.com/stretchr/testify/assert" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" ) @@ -51,7 +49,3 @@ func TestCapacityFromMachineInfoWithHugePagesEnable(t *testing.T) { t.Errorf("when set hugepages true, got resource list %v, want %v", actual, expected) } } - -func TestCrioSocket(t *testing.T) { - assert.EqualValues(t, CrioSocket, crio.CrioSocket, "CrioSocket in this package must equal the one in github.com/google/cadvisor/container/crio/client.go") -} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e4a3b1817bf6d..01204fa514908 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -59,7 +59,7 @@ import ( cloudprovider "k8s.io/cloud-provider" "k8s.io/component-helpers/apimachinery/lease" internalapi "k8s.io/cri-api/pkg/apis" - "k8s.io/klog/v2" + "k8s.io/klog" pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" api "k8s.io/kubernetes/pkg/apis/core" @@ -2318,10 +2318,19 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { // to the pod manager. kl.podManager.UpdatePod(pod) + sidecarsStatus := status.GetSidecarsStatus(pod) + // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation. if status.NeedToReconcilePodReadiness(pod) { mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + } else if sidecarsStatus.ContainersWaiting { + // if the containers aren't running and the sidecars are all ready, trigger a sync so that the containers get started + if sidecarsStatus.SidecarsPresent && sidecarsStatus.SidecarsReady { + klog.Infof("Pod: %s: sidecars: sidecars are ready, dispatching work", format.Pod(pod)) + mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) + kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + } } // After an evicted pod is synced, all dead containers in the pod can be removed. diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 3cf0c4a7c9151..7947f9481ec70 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -39,7 +39,7 @@ import ( grpcstatus "google.golang.org/grpc/status" "github.com/armon/circbuf" - "k8s.io/klog/v2" + "k8s.io/klog" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -284,9 +284,9 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String()) m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg) - if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil { - klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod), - "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String()) + if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, 0); err != nil { + klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v", + container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err) } return msg, ErrPostStartHook } @@ -631,6 +631,12 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID l := getContainerInfoFromLabels(s.Labels) a := getContainerInfoFromAnnotations(s.Annotations) + + annotations := make(map[string]string) + if a.Sidecar { + annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", l.ContainerName)] = "Sidecar" + } + // Notice that the followings are not full spec. The container killing code should not use // un-restored fields. pod = &v1.Pod{ @@ -639,6 +645,7 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID Name: l.PodName, Namespace: l.PodNamespace, DeletionGracePeriodSeconds: a.PodDeletionGracePeriod, + Annotations: annotations, }, Spec: v1.PodSpec{ TerminationGracePeriodSeconds: a.PodTerminationGracePeriod, @@ -660,12 +667,19 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID // killContainer kills a container through the following steps: // * Run the pre-stop lifecycle hooks (if applicable). // * Stop the container. -func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64) error { +func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodDuration time.Duration) error { var containerSpec *v1.Container if pod != nil { if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil { - return fmt.Errorf("failed to get containerSpec %q (id=%q) in pod %q when killing container for reason %q", - containerName, containerID.String(), format.Pod(pod), message) + // after a kubelet restart, it's not 100% certain that the + // pod we're given has the container we need in the spec + // -- we try to recover that here. 
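+ // restoreSpecsFromContainerLabels rebuilds a minimal pod and container
+ // spec from the labels and annotations that were stamped onto the
+ // container at creation time; only the fields needed on the kill path
+ // (names, grace periods, the pre-stop hook, and the sidecar annotation)
+ // are recovered.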
+ restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID) + if err != nil { + return fmt.Errorf("failed to get containerSpec %q(id=%q) in pod %q when killing container for reason %q. error: %v", + containerName, containerID.String(), format.Pod(pod), message, err) + } + pod, containerSpec = restoredPod, restoredContainer } } else { // Restore necessary information if one of the specs is nil. @@ -714,10 +728,9 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec if gracePeriod < minimumGracePeriodInSeconds { gracePeriod = minimumGracePeriodInSeconds } - if gracePeriodOverride != nil { - gracePeriod = *gracePeriodOverride - klog.V(3).InfoS("Killing container with a grace period override", "pod", klog.KObj(pod), "podUID", pod.UID, - "containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod) + if gracePeriodDuration > 0 { + gracePeriod = int64(gracePeriodDuration.Seconds()) + klog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod) } klog.V(2).InfoS("Killing container with a grace period", "pod", klog.KObj(pod), "podUID", pod.UID, @@ -736,18 +749,54 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec } // killContainersWithSyncResult kills all pod's containers with sync results. -func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) { +func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodDuration time.Duration) (syncResults []*kubecontainer.SyncResult) { + // split out sidecars and non-sidecars + var ( + sidecars []*kubecontainer.Container + nonSidecars []*kubecontainer.Container + ) + for _, container := range runningPod.Containers { + if isSidecar(pod, container.Name) { + sidecars = append(sidecars, container) + } else { + nonSidecars = append(nonSidecars, container) + } + } + containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers)) - wg := sync.WaitGroup{} + // non-sidecars first + start := time.Now() + klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d non-sidecars, %s termination period", runningPod.Name, len(nonSidecars), gracePeriodDuration) + nonSidecarsWg := sync.WaitGroup{} + nonSidecarsWg.Add(len(nonSidecars)) + for _, container := range nonSidecars { + go func(container *kubecontainer.Container) { + defer utilruntime.HandleCrash() + defer nonSidecarsWg.Done() + killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name) + if err := m.killContainer(pod, container.ID, container.Name, "Need to kill Pod", reasonUnknown, gracePeriodDuration); err != nil { + killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) + } + containerResults <- killContainerResult + }(container) + } + nonSidecarsWg.Wait() - wg.Add(len(runningPod.Containers)) - for _, container := range runningPod.Containers { + gracePeriodDuration = gracePeriodDuration - time.Since(start) + if gracePeriodDuration < 0 { + gracePeriodDuration = 0 + } + + // then sidecars + klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d sidecars, %s left", runningPod.Name, len(sidecars), gracePeriodDuration) + wg := sync.WaitGroup{} + wg.Add(len(sidecars)) + for _, container := range sidecars { go func(container *kubecontainer.Container) { defer utilruntime.HandleCrash() defer wg.Done() -
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name) - if err := m.killContainer(pod, container.ID, container.Name, "", reasonUnknown, gracePeriodOverride); err != nil { + if err := m.killContainer(pod, container.ID, container.Name, "Need to kill Pod", reasonUnknown, gracePeriodDuration); err != nil { killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) // Use runningPod for logging as the pod passed in could be *nil*. klog.ErrorS(err, "Kill container failed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, @@ -757,6 +806,7 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, ru }(container) } wg.Wait() + close(containerResults) for containerResult := range containerResults { diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go index 1a1ae2b65e4a0..407fdbecbd367 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -122,7 +122,7 @@ func TestKillContainer(t *testing.T) { } for _, test := range tests { - err := m.killContainer(test.pod, test.containerID, test.containerName, test.reason, "", &test.gracePeriodOverride) + err := m.killContainer(test.pod, test.containerID, test.containerName, test.reason, "", time.Duration(test.gracePeriodOverride)*time.Second) if test.succeed != (err == nil) { t.Errorf("%s: expected %v, got %v (%v)", test.caseName, test.succeed, (err == nil), err) } @@ -292,7 +292,7 @@ func TestLifeCycleHook(t *testing.T) { // Configured and works as expected t.Run("PreStop-CMDExec", func(t *testing.T) { testPod.Spec.Containers[0].Lifecycle = cmdLifeCycle - m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod) + m.killContainer(testPod, cID, "foo", "testKill", "", time.Duration(gracePeriod)*time.Second) if fakeRunner.Cmd[0] != cmdLifeCycle.PreStop.Exec.Command[0] { t.Errorf("CMD Prestop hook was not invoked") } @@ -302,7 +302,7 @@ func TestLifeCycleHook(t *testing.T) { t.Run("PreStop-HTTPGet", func(t *testing.T) { defer func() { fakeHTTP.url = "" }() testPod.Spec.Containers[0].Lifecycle = httpLifeCycle - m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriod) + m.killContainer(testPod, cID, "foo", "testKill", "", time.Duration(gracePeriod)*time.Second) if !strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) { t.Errorf("HTTP Prestop hook was not invoked") @@ -316,7 +316,7 @@ func TestLifeCycleHook(t *testing.T) { testPod.DeletionGracePeriodSeconds = &gracePeriodLocal testPod.Spec.TerminationGracePeriodSeconds = &gracePeriodLocal - m.killContainer(testPod, cID, "foo", "testKill", "", &gracePeriodLocal) + m.killContainer(testPod, cID, "foo", "testKill", "", time.Duration(gracePeriodLocal)*time.Second) if strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) { t.Errorf("HTTP Should not execute when gracePeriod is 0") diff --git a/pkg/kubelet/kuberuntime/kuberuntime_gc.go b/pkg/kubelet/kuberuntime/kuberuntime_gc.go index 1d522a2faa260..a259b80997f28 100644 --- 
a/pkg/kubelet/kuberuntime/kuberuntime_gc.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_gc.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -137,8 +137,8 @@ func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int ID: containers[i].id, } message := "Container is in unknown state, try killing it before removal" - if err := cgc.manager.killContainer(nil, id, containers[i].name, message, reasonUnknown, nil); err != nil { - klog.ErrorS(err, "Failed to stop container", "containerID", containers[i].id) + if err := cgc.manager.killContainer(nil, id, containers[i].name, message, reasonUnknown, 0); err != nil { + klog.Errorf("Failed to stop container %q: %v", containers[i].id, err) continue } } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 0dcb31fec5da0..062a42e61f7d5 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -25,7 +25,7 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" crierror "k8s.io/cri-api/pkg/errors" - "k8s.io/klog/v2" + "k8s.io/klog" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,6 +52,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/runtimeclass" + "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/kubelet/util/format" @@ -536,6 +537,14 @@ func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) boo return cStatus.ExitCode == 0 } +func isSidecar(pod *v1.Pod, containerName string) bool { + if pod == nil { + klog.V(5).Infof("isSidecar: pod is nil, so returning false") + return false + } + return pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", containerName)] == "Sidecar" +} + // computePodActions checks whether the pod spec has changed and returns the changes if true. 
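+// With the sidecar patch, it also gates container restarts: when sidecars
+// are present and the non-sidecars have never started, the non-sidecars are
+// held back until every sidecar reports ready (see status.GetSidecarsStatus).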
func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod)) @@ -550,6 +559,17 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo), } + var sidecarNames []string + for _, container := range pod.Spec.Containers { + if isSidecar(pod, container.Name) { + sidecarNames = append(sidecarNames, container.Name) + } + } + + // determine sidecar status + sidecarStatus := status.GetSidecarsStatus(pod) + klog.Infof("Pod: %s, sidecars: %s, status: Present=%v,Ready=%v,ContainersWaiting=%v", format.Pod(pod), sidecarNames, sidecarStatus.SidecarsPresent, sidecarStatus.SidecarsReady, sidecarStatus.ContainersWaiting) + // If we need to (re-)create the pod sandbox, everything will need to be // killed and recreated, and init containers should be purged. if createPodSandbox { @@ -588,7 +608,22 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku changes.NextInitContainerToStart = &pod.Spec.InitContainers[0] return changes } - changes.ContainersToStart = containersToStart + if len(sidecarNames) > 0 { + for idx, c := range pod.Spec.Containers { + if isSidecar(pod, c.Name) { + changes.ContainersToStart = append(changes.ContainersToStart, idx) + } + } + return changes + } + // Start all containers by default but exclude the ones that + // succeeded if RestartPolicy is OnFailure + for idx, c := range pod.Spec.Containers { + if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure { + continue + } + changes.ContainersToStart = append(changes.ContainersToStart, idx) + } return changes } @@ -634,6 +669,21 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku keepCount := 0 // check the status of containers. for idx, container := range pod.Spec.Containers { + // this works because in other cases, if it was a sidecar, we + // are always allowed to handle the container. + // + // if it is a non-sidecar, and there are no sidecars, then + // we're also always allowed to restart the container. + // + // if there are sidecars, then we can only restart non-sidecars under + // the following conditions: + // - the non-sidecars have run before (i.e. they are not in a Waiting state) OR + // - the sidecars are ready (we're starting them for the first time) + if !isSidecar(pod, container.Name) && sidecarStatus.SidecarsPresent && sidecarStatus.ContainersWaiting && !sidecarStatus.SidecarsReady { + klog.Infof("Pod: %s, Container: %s, sidecar=%v skipped: Present=%v,Ready=%v,ContainersWaiting=%v", format.Pod(pod), container.Name, isSidecar(pod, container.Name), sidecarStatus.SidecarsPresent, sidecarStatus.SidecarsReady, sidecarStatus.ContainersWaiting) + continue + } + containerStatus := podStatus.FindContainerStatusByName(container.Name) // Call internal container post-stop lifecycle hook for any non-running container so that any @@ -650,7 +700,8 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku // need to restart it.
if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning { if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) { - klog.V(3).InfoS("Container of pod is not in the desired state and shall be started", "containerName", container.Name, "pod", klog.KObj(pod)) + message := fmt.Sprintf("%s: Container %+v is dead, but RestartPolicy says that we should restart it.", pod.Name, container) + klog.V(3).Info(message) changes.ContainersToStart = append(changes.ContainersToStart, idx) if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown { // If container is in unknown state, we don't know whether it @@ -708,6 +759,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku } if keepCount == 0 && len(changes.ContainersToStart) == 0 { + klog.Infof("Pod: %s: KillPod=true", format.Pod(pod)) changes.KillPod = true } @@ -763,7 +815,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontaine klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod)) killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name) result.AddSyncResult(killContainerResult) - if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil { + if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, 0); err != nil { killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod)) return @@ -971,6 +1023,35 @@ func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Contain // only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data. // it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios. func (m *kubeGenericRuntimeManager) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error { + // if the pod is nil, we need to recover it, so we can get the + // grace period and also the sidecar status. + if pod == nil { + for _, container := range runningPod.Containers { + klog.Infof("Pod: %s, KillPod: pod nil, trying to restore from container %s", runningPod.Name, container.ID) + podSpec, _, err := m.restoreSpecsFromContainerLabels(container.ID) + if err != nil { + klog.Errorf("Pod: %s, KillPod: couldn't restore: %s", runningPod.Name, container.ID) + continue + } + pod = podSpec + break + } + } + + if gracePeriodOverride == nil && pod != nil { + switch { + case pod.DeletionGracePeriodSeconds != nil: + gracePeriodOverride = pod.DeletionGracePeriodSeconds + case pod.Spec.TerminationGracePeriodSeconds != nil: + gracePeriodOverride = pod.Spec.TerminationGracePeriodSeconds + } + } + + if gracePeriodOverride == nil || *gracePeriodOverride < minimumGracePeriodInSeconds { + min := int64(minimumGracePeriodInSeconds) + gracePeriodOverride = &min + } + err := m.killPodWithSyncResult(pod, runningPod, gracePeriodOverride) return err.Error() } @@ -978,7 +1059,12 @@ func (m *kubeGenericRuntimeManager) KillPod(pod *v1.Pod, runningPod kubecontaine // killPodWithSyncResult kills a runningPod and returns SyncResult. // Note: The pod passed in could be *nil* when kubelet restarted.
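+// The grace period override is resolved to a concrete time.Duration here so
+// that killContainersWithSyncResult can spend part of the budget on the
+// non-sidecars and hand whatever remains to the sidecars.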
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) { - killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride) + gracePeriodDuration := 0 * time.Second + if gracePeriodOverride != nil { + gracePeriodDuration = time.Duration(*gracePeriodOverride) * time.Second + } + + killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodDuration) for _, containerResult := range killContainerResults { result.AddSyncResult(containerResult) } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go index 38ba3001c7fa6..70cadae62dd5a 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -17,6 +17,7 @@ limitations under the License. package kuberuntime import ( + "fmt" "path/filepath" "reflect" "sort" @@ -1313,6 +1314,13 @@ func makeBasePodAndStatusWithInitContainers() (*v1.Pod, *kubecontainer.PodStatus return pod, status } +func makeBasePodAndStatusWithSidecar() (*v1.Pod, *kubecontainer.PodStatus) { + pod, status := makeBasePodAndStatus() + pod.Annotations = map[string]string{fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", pod.Spec.Containers[1].Name): "Sidecar"} + status.ContainerStatuses[1].Hash = kubecontainer.HashContainer(&pod.Spec.Containers[1]) + return pod, status +} + func TestComputePodActionsWithInitAndEphemeralContainers(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EphemeralContainers, true)() // Make sure existing test cases pass with feature enabled @@ -1494,6 +1502,181 @@ func TestSyncPodWithSandboxAndDeletedPod(t *testing.T) { assert.NoError(t, result.Error()) } +func TestComputePodActionsWithSidecar(t *testing.T) { + _, _, m, err := createTestRuntimeManager() + require.NoError(t, err) + + // Creating a reference pod and status pair for the test cases to refer + // to specific fields. + basePod, baseStatus := makeBasePodAndStatusWithSidecar() + for desc, test := range map[string]struct { + mutatePodFn func(*v1.Pod) + mutateStatusFn func(*kubecontainer.PodStatus) + actions podActions + }{ + "Start sidecar containers before non-sidecars when creating a new pod": { + mutateStatusFn: func(status *kubecontainer.PodStatus) { + // No container or sandbox exists.
+ status.SandboxStatuses = []*runtimeapi.PodSandboxStatus{} + status.ContainerStatuses = []*kubecontainer.ContainerStatus{} + }, + actions: podActions{ + KillPod: true, + CreateSandbox: true, + Attempt: uint32(0), + ContainersToStart: []int{1}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Don't start non-sidecars until sidecars are ready": { + mutatePodFn: func(pod *v1.Pod) { + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + { + Name: "foo2", + Ready: false, + }, + { + Name: "foo3", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + continue + } + status.ContainerStatuses[i].State = "" + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Start non-sidecars when sidecars are ready": { + mutatePodFn: func(pod *v1.Pod) { + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + { + Name: "foo2", + Ready: true, + }, + { + Name: "foo3", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + continue + } + status.ContainerStatuses[i].State = "" + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{0, 2}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Restart only sidecars while non-sidecars are waiting": { + mutatePodFn: func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyAlways + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + { + Name: "foo2", + Ready: false, + }, + { + Name: "foo3", + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{}, + }, + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + status.ContainerStatuses[i].State = kubecontainer.ContainerStateExited + continue + } + status.ContainerStatuses[i].State = "" + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{1}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + "Restart running non-sidecars despite sidecar becoming not ready": { + mutatePodFn: func(pod *v1.Pod) { + pod.Spec.RestartPolicy = v1.RestartPolicyAlways + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + Name: "foo1", + }, + { + Name: "foo2", + Ready: false, + }, + { + Name: "foo3", + }, + } + }, + mutateStatusFn: func(status *kubecontainer.PodStatus) { + for i := range status.ContainerStatuses { + if i == 1 { + continue + } + status.ContainerStatuses[i].State = kubecontainer.ContainerStateExited + } + }, + actions: podActions{ + SandboxID: baseStatus.SandboxStatuses[0].Id, + ContainersToStart: []int{0, 2}, + ContainersToKill: getKillMap(basePod, baseStatus, []int{}), + }, + }, + } { + pod, status := makeBasePodAndStatusWithSidecar() + if test.mutatePodFn != nil { + test.mutatePodFn(pod) + } + if test.mutateStatusFn != nil { + test.mutateStatusFn(status) + } + 
actions := m.computePodActions(pod, status) + verifyActions(t, &test.actions, &actions, desc) + } +} + func makeBasePodAndStatusWithInitAndEphemeralContainers() (*v1.Pod, *kubecontainer.PodStatus) { pod, status := makeBasePodAndStatus() pod.Spec.InitContainers = []v1.Container{ diff --git a/pkg/kubelet/kuberuntime/labels.go b/pkg/kubelet/kuberuntime/labels.go index 4ee13e7337d7d..8c8a57c5577a7 100644 --- a/pkg/kubelet/kuberuntime/labels.go +++ b/pkg/kubelet/kuberuntime/labels.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -48,6 +48,7 @@ const ( // Currently, ContainerD on Windows does not yet fully support HostProcess containers // but will pass annotations to hcsshim which does have support. windowsHostProcessContainer = "microsoft.com/hostprocess-container" + containerSidecarLabel = "com.lyft.sidecars.container-lifecycle" ) type labeledPodSandboxInfo struct { @@ -73,6 +74,7 @@ type labeledContainerInfo struct { type annotatedContainerInfo struct { Hash uint64 RestartCount int + Sidecar bool PodDeletionGracePeriod *int64 PodTerminationGracePeriod *int64 TerminationMessagePath string @@ -143,6 +145,11 @@ func newContainerAnnotations(container *v1.Container, pod *v1.Pod, restartCount annotations[containerTerminationMessagePathLabel] = container.TerminationMessagePath annotations[containerTerminationMessagePolicyLabel] = string(container.TerminationMessagePolicy) + annotations[containerSidecarLabel] = "Default" + if isSidecar(pod, container.Name) { + annotations[containerSidecarLabel] = "Sidecar" + } + if pod.DeletionGracePeriodSeconds != nil { annotations[podDeletionGracePeriodLabel] = strconv.FormatInt(*pod.DeletionGracePeriodSeconds, 10) } @@ -237,6 +244,9 @@ func getContainerInfoFromAnnotations(annotations map[string]string) *annotatedCo if containerInfo.PodTerminationGracePeriod, err = getInt64PointerFromLabel(annotations, podTerminationGracePeriodLabel); err != nil { klog.ErrorS(err, "Unable to get label value from annotations", "label", podTerminationGracePeriodLabel, "annotations", annotations) } + if getStringValueFromLabel(annotations, containerSidecarLabel) == "Sidecar" { + containerInfo.Sidecar = true + } preStopHandler := &v1.Handler{} if found, err := getJSONObjectFromLabel(annotations, containerPreStopHandlerLabel, preStopHandler); err != nil { diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go index 9c959c3c26d63..d87da51e60175 100644 --- a/pkg/kubelet/stats/cri_stats_provider.go +++ b/pkg/kubelet/stats/cri_stats_provider.go @@ -25,7 +25,6 @@ import ( "sync" "time" - cadvisorfs "github.com/google/cadvisor/fs" cadvisorapiv2 "github.com/google/cadvisor/info/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -365,12 +364,13 @@ func (p *criStatsProvider) getFsInfo(fsID *runtimeapi.FilesystemIdentifier) *cad mountpoint := fsID.GetMountpoint() fsInfo, err := p.cadvisor.GetDirFsInfo(mountpoint) if err != nil { - msg := "Failed to get the info of the filesystem with mountpoint" - if err == cadvisorfs.ErrNoSuchDevice { - klog.V(2).InfoS(msg, "mountpoint", mountpoint, "err", err) - } else { - klog.ErrorS(err, msg, "mountpoint", mountpoint) - } + // 
commented out per upstream bug https://github.com/kubernetes/kubernetes/issues/94825 + // msg := fmt.Sprintf("Failed to get the info of the filesystem with mountpoint %q: %v.", mountpoint, err) + // if err == cadvisorfs.ErrNoSuchDevice { + // klog.V(2).Info(msg) + // } else { + // klog.Error(msg) + // } return nil } return &fsInfo diff --git a/pkg/kubelet/stats/cri_stats_provider_test.go b/pkg/kubelet/stats/cri_stats_provider_test.go index 3320c8ab3fa0e..c1eb98f959ca5 100644 --- a/pkg/kubelet/stats/cri_stats_provider_test.go +++ b/pkg/kubelet/stats/cri_stats_provider_test.go @@ -905,6 +905,7 @@ func makeFakeLogStats(seed int) *volume.Metrics { func TestGetContainerUsageNanoCores(t *testing.T) { var value0 uint64 var value1 uint64 = 10000000000 + var value2 uint64 = 188427786383 // Test with a large container of 100+ CPUs diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index c9a1c5cf6fe82..5b59d9453e838 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -885,3 +885,45 @@ func NeedToReconcilePodReadiness(pod *v1.Pod) bool { } return false } + +// SidecarsStatus contains three bools: whether the pod has sidecars, +// whether all the sidecars are ready, and whether the non-sidecars are +// in a waiting state. +type SidecarsStatus struct { + SidecarsPresent bool + SidecarsReady bool + ContainersWaiting bool +} + +// GetSidecarsStatus returns the SidecarsStatus for the given pod +func GetSidecarsStatus(pod *v1.Pod) SidecarsStatus { + if pod == nil { + klog.Infof("Pod was nil, returning empty sidecar status") + return SidecarsStatus{} + } + if pod.Spec.Containers == nil || pod.Status.ContainerStatuses == nil { + klog.Infof("Pod Containers or Container status was nil, returning empty sidecar status") + return SidecarsStatus{} + } + sidecarsStatus := SidecarsStatus{SidecarsPresent: false, SidecarsReady: true, ContainersWaiting: false} + for _, container := range pod.Spec.Containers { + for _, status := range pod.Status.ContainerStatuses { + if status.Name == container.Name { + if pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", container.Name)] == "Sidecar" { + sidecarsStatus.SidecarsPresent = true + if !status.Ready { + klog.Infof("Pod %s: %s: sidecar not ready", format.Pod(pod), container.Name) + sidecarsStatus.SidecarsReady = false + } else { + klog.Infof("Pod %s: %s: sidecar is ready", format.Pod(pod), container.Name) + } + } else if status.State.Waiting != nil { + // check if non-sidecars have started + klog.Infof("Pod: %s: %s: non-sidecar waiting", format.Pod(pod), container.Name) + sidecarsStatus.ContainersWaiting = true + } + } + } + } + return sidecarsStatus +}
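Illustrative usage (not part of the patch): a minimal sketch of how a pod opts a container into the sidecar lifecycle and what status.GetSidecarsStatus reports for it. The pod and container names are invented for the example; the annotation key and the SidecarsStatus fields come from the patch above.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/pkg/kubelet/status"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "web-pod",
			// Marks "envoy" as a sidecar: the kubelet starts it before the
			// other containers and stops it after them.
			Annotations: map[string]string{
				"sidecars.lyft.net/container-lifecycle-envoy": "Sidecar",
			},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{Name: "envoy"}, {Name: "web"}},
		},
		Status: v1.PodStatus{
			ContainerStatuses: []v1.ContainerStatus{
				{Name: "envoy", Ready: false},
				{Name: "web", State: v1.ContainerState{
					Waiting: &v1.ContainerStateWaiting{},
				}},
			},
		},
	}

	s := status.GetSidecarsStatus(pod)
	// Prints Present=true Ready=false Waiting=true: computePodActions would
	// keep holding "web" back until "envoy" reports ready.
	fmt.Printf("Present=%v Ready=%v Waiting=%v\n",
		s.SidecarsPresent, s.SidecarsReady, s.ContainersWaiting)
}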