From 3765ac69ac713e26c679f23f8ead7ec3b5327465 Mon Sep 17 00:00:00 2001 From: zhangzujian Date: Tue, 17 Dec 2024 09:37:40 +0000 Subject: [PATCH] use JSON merge patch to update labels/annotations Signed-off-by: zhangzujian --- charts/kube-ovn/templates/ovn-CR.yaml | 1 + cmd/daemon/cniserver.go | 6 +- dist/images/install.sh | 1 + .../kubeovn/v1/zz_generated.deepcopy_test.go | 2 +- pkg/controller/endpoint.go | 9 +- pkg/controller/external_gw.go | 10 +- pkg/controller/init.go | 19 +- pkg/controller/inspection.go | 15 +- pkg/controller/namespace.go | 27 +-- pkg/controller/node.go | 16 +- pkg/controller/pod.go | 118 +++++------ pkg/controller/vpc_nat_gateway.go | 32 +-- pkg/daemon/controller.go | 54 ++--- pkg/daemon/ovs_linux.go | 31 +-- pkg/ovn_ic_controller/controller.go | 2 +- pkg/ovn_ic_controller/ovn_ic_controller.go | 8 +- pkg/ovn_leader_checker/ovn.go | 64 ++---- pkg/ovn_leader_checker/ovn_test.go | 117 ---------- pkg/speaker/controller.go | 2 +- pkg/util/k8s.go | 35 --- pkg/util/k8s_test.go | 200 +----------------- pkg/util/patch.go | 34 +++ pkg/util/patch_test.go | 102 ++++++++- 23 files changed, 293 insertions(+), 612 deletions(-) delete mode 100644 pkg/ovn_leader_checker/ovn_test.go diff --git a/charts/kube-ovn/templates/ovn-CR.yaml b/charts/kube-ovn/templates/ovn-CR.yaml index e8587001104..7162c29600b 100644 --- a/charts/kube-ovn/templates/ovn-CR.yaml +++ b/charts/kube-ovn/templates/ovn-CR.yaml @@ -121,6 +121,7 @@ rules: - get - list - update + - patch - create - delete - watch diff --git a/cmd/daemon/cniserver.go b/cmd/daemon/cniserver.go index 76ec3e5a831..6b732da24cb 100644 --- a/cmd/daemon/cniserver.go +++ b/cmd/daemon/cniserver.go @@ -196,9 +196,9 @@ func initChassisAnno(cfg *daemon.Configuration) error { klog.Error(err) return err } - annotations := map[string]any{util.ChassisAnnotation: chassesName} - if err = util.UpdateNodeAnnotations(cfg.KubeClient.CoreV1().Nodes(), cfg.NodeName, annotations); err != nil { - klog.Errorf("failed to update chassis annotation of node %s: %v", cfg.NodeName, err) + patch := util.KVPatch{util.ChassisAnnotation: chassesName} + if err = util.PatchAnnotations(cfg.KubeClient.CoreV1().Nodes(), cfg.NodeName, patch); err != nil { + klog.Errorf("failed to patch chassis annotation of node %s: %v", cfg.NodeName, err) return err } diff --git a/dist/images/install.sh b/dist/images/install.sh index 99eebc0fbb8..527cf1d9954 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -3491,6 +3491,7 @@ rules: - get - list - update + - patch - create - delete - watch diff --git a/pkg/apis/kubeovn/v1/zz_generated.deepcopy_test.go b/pkg/apis/kubeovn/v1/zz_generated.deepcopy_test.go index fd911b60351..5321fc084c3 100644 --- a/pkg/apis/kubeovn/v1/zz_generated.deepcopy_test.go +++ b/pkg/apis/kubeovn/v1/zz_generated.deepcopy_test.go @@ -3,7 +3,7 @@ package v1 import ( "testing" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" "github.com/brianvoe/gofakeit/v7" "github.com/stretchr/testify/require" diff --git a/pkg/controller/endpoint.go b/pkg/controller/endpoint.go index 6a1ee045d09..c9ab816e04d 100644 --- a/pkg/controller/endpoint.go +++ b/pkg/controller/endpoint.go @@ -218,12 +218,9 @@ func (c *Controller) handleUpdateEndpoint(key string) error { } if svcVpc = svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName { - if svc.Annotations == nil { - svc.Annotations = make(map[string]string, 1) - } - svc.Annotations[util.VpcAnnotation] = vpcName - if _, err = c.config.KubeClient.CoreV1().Services(namespace).Update(context.Background(), svc, metav1.UpdateOptions{}); err != nil { - klog.Errorf("failed to update service %s: %v", key, err) + patch := util.KVPatch{util.VpcAnnotation: vpcName} + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Services(namespace), svc.Name, patch); err != nil { + klog.Errorf("failed to patch service %s: %v", key, err) return err } } diff --git a/pkg/controller/external_gw.go b/pkg/controller/external_gw.go index e09d9cdd2bd..8f26dccd412 100644 --- a/pkg/controller/external_gw.go +++ b/pkg/controller/external_gw.go @@ -83,8 +83,8 @@ func (c *Controller) removeExternalGateway() error { return err } for _, node := range nodes { - labels := map[string]any{util.ExGatewayLabel: "false"} - if err = util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err != nil { + patch := util.KVPatch{util.ExGatewayLabel: "false"} + if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { klog.Errorf("failed to patch external gw node %s: %v", node.Name, err) return err } @@ -234,9 +234,9 @@ func (c *Controller) getGatewayChassis(config map[string]string) ([]string, erro klog.Errorf("failed to get gw node %s, %v", gw, err) return nil, err } - labels := map[string]any{util.ExGatewayLabel: "true"} - if err = util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err != nil { - klog.Errorf("failed to update annotations of node %s: %v", node.Name, err) + patch := util.KVPatch{util.ExGatewayLabel: "true"} + if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { + klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err) return nil, err } diff --git a/pkg/controller/init.go b/pkg/controller/init.go index 582ee59298f..4a410a116ce 100644 --- a/pkg/controller/init.go +++ b/pkg/controller/init.go @@ -521,16 +521,15 @@ func (c *Controller) initDefaultProviderNetwork() error { excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, c.config.DefaultProviderName) interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, c.config.DefaultProviderName) - newNodes := make([]*v1.Node, 0, len(nodes)) + patchNodes := make([]string, 0, len(nodes)) for _, node := range nodes { if len(node.Annotations) == 0 { continue } - var newNode *v1.Node if node.Annotations[excludeAnno] == "true" { pn.Spec.ExcludeNodes = append(pn.Spec.ExcludeNodes, node.Name) - newNode = node.DeepCopy() + patchNodes = append(patchNodes, node.Name) } else if s := node.Annotations[interfaceAnno]; s != "" { var index *int for i := range pn.Spec.CustomInterfaces { @@ -545,12 +544,7 @@ func (c *Controller) initDefaultProviderNetwork() error { ci := kubeovnv1.CustomInterface{Interface: s, Nodes: []string{node.Name}} pn.Spec.CustomInterfaces = append(pn.Spec.CustomInterfaces, ci) } - newNode = node.DeepCopy() - } - if newNode != nil { - delete(newNode.Annotations, excludeAnno) - delete(newNode.Annotations, interfaceAnno) - newNodes = append(newNodes, newNode) + patchNodes = append(patchNodes, node.Name) } } @@ -560,9 +554,10 @@ func (c *Controller) initDefaultProviderNetwork() error { } // update nodes only when provider network has been created successfully - for _, node := range newNodes { - if _, err := c.config.KubeClient.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{}); err != nil { - klog.Errorf("failed to update node %s: %v", node.Name, err) + patch := util.KVPatch{excludeAnno: nil, interfaceAnno: nil} + for _, node := range patchNodes { + if err := util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node, patch); err != nil { + klog.Errorf("failed to patch node %s: %v", node, err) } } }() diff --git a/pkg/controller/inspection.go b/pkg/controller/inspection.go index 20ebba0926a..40fdfcf9662 100644 --- a/pkg/controller/inspection.go +++ b/pkg/controller/inspection.go @@ -1,13 +1,10 @@ package controller import ( - "context" "fmt" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "github.com/kubeovn/kube-ovn/pkg/ovs" @@ -48,15 +45,11 @@ func (c *Controller) inspectPod() error { } if !exists { // pod exists but not lsp - delete(pod.Annotations, fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)) - delete(pod.Annotations, fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)) - patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod) - if err != nil { - klog.Errorf("failed to generate patch payload, %v", err) - return err + patch := util.KVPatch{ + fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName): nil, + fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName): nil, } - if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, - types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil { klog.Errorf("patch pod %s/%s failed %v during inspection", pod.Name, pod.Namespace, err) return err } diff --git a/pkg/controller/namespace.go b/pkg/controller/namespace.go index 1b3e9282019..7711d0edfd3 100644 --- a/pkg/controller/namespace.go +++ b/pkg/controller/namespace.go @@ -1,7 +1,6 @@ package controller import ( - "context" "reflect" "slices" "strings" @@ -11,7 +10,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -197,32 +195,25 @@ func (c *Controller) handleAddNamespace(key string) error { excludeIps = append(excludeIps, strings.Join(subnet.Spec.ExcludeIps, ",")) } - if len(namespace.Annotations) == 0 { - namespace.Annotations = map[string]string{} - } else if namespace.Annotations[util.LogicalSwitchAnnotation] == strings.Join(lss, ",") && + if namespace.Annotations[util.LogicalSwitchAnnotation] == strings.Join(lss, ",") && namespace.Annotations[util.CidrAnnotation] == strings.Join(cidrs, ";") && namespace.Annotations[util.ExcludeIpsAnnotation] == strings.Join(excludeIps, ";") && namespace.Annotations[util.IPPoolAnnotation] == ippool { return nil } - namespace.Annotations[util.LogicalSwitchAnnotation] = strings.Join(lss, ",") - namespace.Annotations[util.CidrAnnotation] = strings.Join(cidrs, ";") - namespace.Annotations[util.ExcludeIpsAnnotation] = strings.Join(excludeIps, ";") - + patch := util.KVPatch{ + util.LogicalSwitchAnnotation: strings.Join(lss, ","), + util.CidrAnnotation: strings.Join(cidrs, ";"), + util.ExcludeIpsAnnotation: strings.Join(excludeIps, ";"), + } if ippool == "" { - delete(namespace.Annotations, util.IPPoolAnnotation) + patch[util.IPPoolAnnotation] = nil } else { - namespace.Annotations[util.IPPoolAnnotation] = ippool + patch[util.IPPoolAnnotation] = ippool } - patch, err := util.GenerateStrategicMergePatchPayload(cachedNs, namespace) - if err != nil { - klog.Error(err) - return err - } - if _, err = c.config.KubeClient.CoreV1().Namespaces().Patch(context.Background(), key, - types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Namespaces(), key, patch); err != nil { klog.Errorf("patch namespace %s failed %v", key, err) } return err diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 0a3f55f7192..502c2133c88 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -219,7 +219,7 @@ func (c *Controller) handleAddNode(key string) error { return err } - annotations := map[string]any{ + patch := util.KVPatch{ util.IPAddressAnnotation: ipStr, util.MacAddressAnnotation: mac, util.CidrAnnotation: subnet.Spec.CIDRBlock, @@ -228,7 +228,7 @@ func (c *Controller) handleAddNode(key string) error { util.AllocatedAnnotation: "true", util.PortNameAnnotation: portName, } - if err = util.UpdateNodeAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, annotations); err != nil { + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { klog.Errorf("failed to update annotations of node %s: %v", node.Name, err) return err } @@ -329,14 +329,10 @@ func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) err } if len(node.Annotations) != 0 { - newNode := node.DeepCopy() - delete(newNode.Annotations, excludeAnno) - delete(newNode.Annotations, interfaceAnno) - if len(newNode.Annotations) != len(node.Annotations) { - if _, err = c.config.KubeClient.CoreV1().Nodes().Update(context.Background(), newNode, metav1.UpdateOptions{}); err != nil { - klog.Errorf("failed to update node %s: %v", node.Name, err) - return err - } + patch := util.KVPatch{excludeAnno: nil, interfaceAnno: nil} + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { + klog.Errorf("failed to patch node %s: %v", node.Name, err) + return err } } diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 542bdfa34d5..cd13e9813ef 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -451,7 +451,7 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) { defer func() { _ = c.podKeyMutex.UnlockKey(key) }() klog.Infof("handle add/update pod %s", key) - cachedPod, err := c.podsLister.Pods(namespace).Get(name) + pod, err := c.podsLister.Pods(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { return nil @@ -459,7 +459,6 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) { klog.Error(err) return err } - pod := cachedPod.DeepCopy() if err := util.ValidatePodNetwork(pod.Annotations); err != nil { klog.Errorf("validate pod %s/%s failed: %v", namespace, name, err) c.recorder.Eventf(pod, v1.EventTypeWarning, "ValidatePodNetworkFailed", err.Error()) @@ -471,39 +470,34 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) { klog.Errorf("failed to get pod nets %v", err) return err } - if len(pod.Annotations) == 0 { - pod.Annotations = map[string]string{} - } // check and do hotnoplug nic - if cachedPod, err = c.syncKubeOvnNet(cachedPod, pod, podNets); err != nil { + if pod, err = c.syncKubeOvnNet(pod, podNets); err != nil { klog.Errorf("failed to sync pod nets %v", err) return err } - if cachedPod == nil { + if pod == nil { // pod has been deleted return nil } - pod = cachedPod.DeepCopy() needAllocatePodNets := needAllocateSubnets(pod, podNets) if len(needAllocatePodNets) != 0 { - if cachedPod, err = c.reconcileAllocateSubnets(cachedPod, pod, needAllocatePodNets); err != nil { + if pod, err = c.reconcileAllocateSubnets(pod, needAllocatePodNets); err != nil { klog.Error(err) return err } - if cachedPod == nil { + if pod == nil { // pod has been deleted return nil } } // check if route subnet is need. - pod = cachedPod.DeepCopy() - return c.reconcileRouteSubnets(cachedPod, pod, needRouteSubnets(pod, podNets)) + return c.reconcileRouteSubnets(pod, needRouteSubnets(pod, podNets)) } // do the same thing as add pod -func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAllocatePodNets []*kubeovnNet) (*v1.Pod, error) { +func (c *Controller) reconcileAllocateSubnets(pod *v1.Pod, needAllocatePodNets []*kubeovnNet) (*v1.Pod, error) { namespace := pod.Namespace name := pod.Name klog.Infof("sync pod %s/%s allocated", namespace, name) @@ -520,6 +514,7 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca vmKey = fmt.Sprintf("%s/%s", namespace, vmName) } // Avoid create lsp for already running pod in ovn-nb when controller restart + patch := util.KVPatch{} for _, podNet := range needAllocatePodNets { // the subnet may changed when alloc static ip from the latter subnet after ns supports multi subnets v4IP, v6IP, mac, subnet, err := c.acquireAddress(pod, podNet) @@ -529,26 +524,26 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca return nil, err } ipStr := util.GetStringIP(v4IP, v6IP) - pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, podNet.ProviderName)] = ipStr + patch[fmt.Sprintf(util.IPAddressAnnotationTemplate, podNet.ProviderName)] = ipStr if mac == "" { - delete(pod.Annotations, fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)) + patch[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)] = nil } else { - pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)] = mac + patch[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)] = mac } - pod.Annotations[fmt.Sprintf(util.CidrAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.CIDRBlock - pod.Annotations[fmt.Sprintf(util.GatewayAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.Gateway + patch[fmt.Sprintf(util.CidrAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.CIDRBlock + patch[fmt.Sprintf(util.GatewayAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.Gateway if isOvnSubnet(podNet.Subnet) { - pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)] = subnet.Name + patch[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)] = subnet.Name if pod.Annotations[fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)] == "" { - pod.Annotations[fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)] = c.config.PodNicType + patch[fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)] = c.config.PodNicType } } else { - delete(pod.Annotations, fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)) - delete(pod.Annotations, fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)) + patch[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)] = nil + patch[fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)] = nil } - pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] = "true" + patch[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] = "true" if vmKey != "" { - pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, podNet.ProviderName)] = vmName + patch[fmt.Sprintf(util.VMAnnotationTemplate, podNet.ProviderName)] = vmName } if err := util.ValidateNetworkBroadcast(podNet.Subnet.Spec.CIDRBlock, ipStr); err != nil { klog.Errorf("validate pod %s/%s failed: %v", namespace, name, err) @@ -558,7 +553,7 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca if podNet.Type != providerTypeIPAM { if (subnet.Spec.Vlan == "" || subnet.Spec.LogicalGateway || subnet.Spec.U2OInterconnection) && subnet.Spec.Vpc != "" { - pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.Vpc + patch[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.Vpc } if subnet.Spec.Vlan != "" { @@ -568,8 +563,8 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca c.recorder.Eventf(pod, v1.EventTypeWarning, "GetVlanInfoFailed", err.Error()) return nil, err } - pod.Annotations[fmt.Sprintf(util.VlanIDAnnotationTemplate, podNet.ProviderName)] = strconv.Itoa(vlan.Spec.ID) - pod.Annotations[fmt.Sprintf(util.ProviderNetworkTemplate, podNet.ProviderName)] = vlan.Spec.Provider + patch[fmt.Sprintf(util.VlanIDAnnotationTemplate, podNet.ProviderName)] = strconv.Itoa(vlan.Spec.ID) + patch[fmt.Sprintf(util.ProviderNetworkTemplate, podNet.ProviderName)] = vlan.Spec.Provider } portSecurity := false @@ -630,14 +625,7 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca return nil, err } } - patch, err := util.GenerateMergePatchPayload(cachedPod, pod) - if err != nil { - klog.Errorf("failed to generate patch for pod %s/%s: %v", name, namespace, err) - return nil, err - } - patchedPod, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name, - types.MergePatchType, patch, metav1.PatchOptions{}, "") - if err != nil { + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(namespace), name, patch); err != nil { if k8serrors.IsNotFound(err) { // Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod. // Then we need to recycle the resource again. @@ -646,18 +634,29 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca c.deletePodQueue.AddRateLimited(key) return nil, nil } - klog.Errorf("patch pod %s/%s failed: %v", name, namespace, err) + klog.Errorf("failed to patch pod %s/%s: %v", namespace, name, err) + return nil, err + } + + if pod, err = c.config.KubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}); err != nil { + if k8serrors.IsNotFound(err) { + key := strings.Join([]string{namespace, name}, "/") + c.deletingPodObjMap.Store(key, pod) + c.deletePodQueue.AddRateLimited(key) + return nil, nil + } + klog.Errorf("failed to get pod %s/%s: %v", namespace, name, err) return nil, err } if vpcGwName, isVpcNatGw := pod.Annotations[util.VpcNatGatewayAnnotation]; isVpcNatGw { c.initVpcNatGatewayQueue.Add(vpcGwName) } - return patchedPod.DeepCopy(), nil + return pod, nil } // do the same thing as update pod -func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodNets []*kubeovnNet) error { +func (c *Controller) reconcileRouteSubnets(pod *v1.Pod, needRoutePodNets []*kubeovnNet) error { // the lb-svc pod has dependencies on Running state, check it when pod state get updated if err := c.checkAndReInitLbSvcPod(pod); err != nil { klog.Errorf("failed to init iptable rules for load-balancer pod %s/%s: %v", pod.Namespace, pod.Name, err) @@ -675,7 +674,7 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN var podIP string var subnet *kubeovnv1.Subnet - + patch := util.KVPatch{} for _, podNet := range needRoutePodNets { // in case update handler overlap the annotation when cache is not in sync if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "" { @@ -860,15 +859,9 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN } } - pod.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] = "true" + patch[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] = "true" } - patch, err := util.GenerateMergePatchPayload(cachedPod, pod) - if err != nil { - klog.Errorf("failed to generate patch for pod %s/%s: %v", name, namespace, err) - return err - } - if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name, - types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { + if err := util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(namespace), name, patch); err != nil { if k8serrors.IsNotFound(err) { // Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod. // Then we need to recycle the resource again. @@ -877,7 +870,7 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN c.deletePodQueue.AddRateLimited(key) return nil } - klog.Errorf("patch pod %s/%s failed %v", name, namespace, err) + klog.Errorf("failed to patch pod %s/%s: %v", namespace, name, err) return err } return nil @@ -1152,7 +1145,7 @@ func (c *Controller) handleUpdatePodSecurity(key string) error { return nil } -func (c *Controller) syncKubeOvnNet(cachedPod, pod *v1.Pod, podNets []*kubeovnNet) (*v1.Pod, error) { +func (c *Controller) syncKubeOvnNet(pod *v1.Pod, podNets []*kubeovnNet) (*v1.Pod, error) { podName := c.getNameByPod(pod) key := fmt.Sprintf("%s/%s", pod.Namespace, podName) targetPortNameList := strset.NewWithSize(len(podNets)) @@ -1205,32 +1198,35 @@ func (c *Controller) syncKubeOvnNet(cachedPod, pod *v1.Pod, podNets []*kubeovnNe } } + patch := util.KVPatch{} for _, providerName := range annotationsNeedToDel { - for annotationKey := range pod.Annotations { - if strings.HasPrefix(annotationKey, providerName) { - delete(pod.Annotations, annotationKey) + for key := range pod.Annotations { + if strings.HasPrefix(key, providerName) { + patch[key] = nil } } } - if len(cachedPod.Annotations) == len(pod.Annotations) { + if len(patch) == 0 { return pod, nil } - patch, err := util.GenerateMergePatchPayload(cachedPod, pod) - if err != nil { - klog.Errorf("failed to generate patch payload for pod '%s', %v", pod.Name, err) + + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil { + if k8serrors.IsNotFound(err) { + return nil, nil + } + klog.Errorf("failed to clean annotations for pod %s/%s: %v", pod.Namespace, pod.Name, err) return nil, err } - patchedPod, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, - types.MergePatchType, patch, metav1.PatchOptions{}, "") - if err != nil { + + if pod, err = c.config.KubeClient.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { if k8serrors.IsNotFound(err) { return nil, nil } - klog.Errorf("failed to delete useless annotations for pod %s: %v", pod.Name, err) + klog.Errorf("failed to get pod %s/%s: %v", pod.Namespace, pod.Name, err) return nil, err } - return patchedPod.DeepCopy(), nil + return pod, nil } func isStatefulSetPod(pod *v1.Pod) (bool, string, types.UID) { diff --git a/pkg/controller/vpc_nat_gateway.go b/pkg/controller/vpc_nat_gateway.go index ea83614b7f0..d238e94c709 100644 --- a/pkg/controller/vpc_nat_gateway.go +++ b/pkg/controller/vpc_nat_gateway.go @@ -292,13 +292,12 @@ func (c *Controller) handleInitVpcNatGw(key string) error { // subnet for vpc-nat-gw has been checked when create vpc-nat-gw - oriPod, err := c.getNatGwPod(key) + pod, err := c.getNatGwPod(key) if err != nil { err := fmt.Errorf("failed to get nat gw %s pod: %w", gw.Name, err) klog.Error(err) return err } - pod := oriPod.DeepCopy() if pod.Status.Phase != corev1.PodRunning { time.Sleep(10 * time.Second) @@ -343,15 +342,10 @@ func (c *Controller) handleInitVpcNatGw(key string) error { c.updateVpcSnatQueue.Add(key) c.updateVpcSubnetQueue.Add(key) c.updateVpcEipQueue.Add(key) - pod.Annotations[util.VpcNatGatewayInitAnnotation] = "true" - patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod) - if err != nil { - klog.Error(err) - return err - } - if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, - types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { - err := fmt.Errorf("patch pod %s/%s failed %w", pod.Name, pod.Namespace, err) + + patch := util.KVPatch{util.VpcNatGatewayInitAnnotation: "true"} + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil { + err := fmt.Errorf("failed to patch pod %s/%s: %w", pod.Namespace, pod.Name, err) klog.Error(err) return err } @@ -546,13 +540,12 @@ func (c *Controller) handleUpdateNatGwSubnetRoute(natGwKey string) error { defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }() klog.Infof("handle update subnet route for nat gateway %s", natGwKey) - cachedPod, err := c.getNatGwPod(natGwKey) + pod, err := c.getNatGwPod(natGwKey) if err != nil { err = fmt.Errorf("failed to get nat gw '%s' pod, %w", natGwKey, err) klog.Error(err) return err } - pod := cachedPod.DeepCopy() v4InternalGw, _, err := c.GetGwBySubnet(gw.Spec.Subnet) if err != nil { @@ -631,15 +624,10 @@ func (c *Controller) handleUpdateNatGwSubnetRoute(natGwKey string) error { klog.Errorf("marshal eip annotation failed %v", err) return err } - pod.Annotations[util.VpcCIDRsAnnotation] = string(cidrBytes) - patch, err := util.GenerateStrategicMergePatchPayload(cachedPod, pod) - if err != nil { - klog.Error(err) - return err - } - if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, - types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { - err = fmt.Errorf("patch pod %s/%s failed %w", pod.Name, pod.Namespace, err) + + patch := util.KVPatch{util.VpcCIDRsAnnotation: string(cidrBytes)} + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil { + err = fmt.Errorf("failed to patch pod %s/%s: %w", pod.Namespace, pod.Name, err) klog.Error(err) return err } diff --git a/pkg/daemon/controller.go b/pkg/daemon/controller.go index aade017e0da..0d39099ff6e 100644 --- a/pkg/daemon/controller.go +++ b/pkg/daemon/controller.go @@ -13,7 +13,6 @@ import ( v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -81,7 +80,7 @@ func newTypedRateLimitingQueue[T comparable](name string, rateLimiter workqueue. func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFactory, nodeInformerFactory informers.SharedInformerFactory, kubeovnInformerFactory kubeovninformer.SharedInformerFactory) (*Controller, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events("")}) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(v1.NamespaceAll)}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName}) providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks() @@ -282,7 +281,7 @@ func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1 } } - labels := map[string]any{ + patch := util.KVPatch{ fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name): nil, fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil, fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name): nil, @@ -309,19 +308,19 @@ func (c *Controller) initProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1 var err error klog.V(3).Infof("ovs init provider network %s", pn.Name) if mtu, err = c.ovsInitProviderNetwork(pn.Name, nic, vlans.List(), pn.Spec.ExchangeLinkName, c.config.MacLearningFallback); err != nil { - delete(labels, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name)) - if err1 := util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err1 != nil { - klog.Errorf("failed to update annotations of node %s: %v", node.Name, err1) + delete(patch, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name)) + if err1 := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err1 != nil { + klog.Errorf("failed to patch annotations of node %s: %v", node.Name, err1) } c.recordProviderNetworkErr(pn.Name, err.Error()) return err } - labels[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true" - labels[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic - labels[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu) - if err = util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err != nil { - klog.Errorf("failed to update labels of node %s: %v", node.Name, err) + patch[fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name)] = "true" + patch[fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)] = nic + patch[fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name)] = strconv.Itoa(mtu) + if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { + klog.Errorf("failed to patch labels of node %s: %v", node.Name, err) return err } c.recordProviderNetworkErr(pn.Name, "") @@ -332,7 +331,7 @@ func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) { var currentPod *v1.Pod var err error if c.localPodName == "" { - pods, err := c.config.KubeClient.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{ + pods, err := c.config.KubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{ LabelSelector: "app=kube-ovn-cni", FieldSelector: fmt.Sprintf("spec.nodeName=%s", c.config.NodeName), }) @@ -359,23 +358,14 @@ func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) { } } - newPod := currentPod.DeepCopy() - if newPod.Annotations == nil { - newPod.Annotations = make(map[string]string) - } - if newPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg { + patch := util.KVPatch{} + if currentPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] != errMsg { if errMsg == "" { - delete(newPod.Annotations, fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)) + patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = nil } else { - newPod.Annotations[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg - } - patch, err := util.GenerateStrategicMergePatchPayload(currentPod, newPod) - if err != nil { - klog.Errorf("failed to gen patch payload pod %s: %v", c.localPodName, err) - return + patch[fmt.Sprintf(util.ProviderNetworkErrMessageTemplate, providerNetwork)] = errMsg } - if _, err = c.config.KubeClient.CoreV1().Pods(c.localNamespace).Patch(context.Background(), c.localPodName, - types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { + if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(c.localNamespace), c.localPodName, patch); err != nil { klog.Errorf("failed to patch pod %s: %v", c.localPodName, err) return } @@ -383,14 +373,14 @@ func (c *Controller) recordProviderNetworkErr(providerNetwork, errMsg string) { } func (c *Controller) cleanProviderNetwork(pn *kubeovnv1.ProviderNetwork, node *v1.Node) error { - labels := map[string]any{ + patch := util.KVPatch{ fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name): nil, fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil, fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name): nil, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name): "true", } - if err := util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err != nil { - klog.Errorf("failed to update labels of node %s: %v", node.Name, err) + if err := util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { + klog.Errorf("failed to patch labels of node %s: %v", node.Name, err) return err } @@ -412,14 +402,14 @@ func (c *Controller) handleDeleteProviderNetwork(pn *kubeovnv1.ProviderNetwork) return nil } - labels := map[string]any{ + patch := util.KVPatch{ fmt.Sprintf(util.ProviderNetworkReadyTemplate, pn.Name): nil, fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name): nil, fmt.Sprintf(util.ProviderNetworkMtuTemplate, pn.Name): nil, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name): nil, } - if err = util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err != nil { - klog.Errorf("failed to update labels of node %s: %v", node.Name, err) + if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { + klog.Errorf("failed to patch labels of node %s: %v", node.Name, err) return err } diff --git a/pkg/daemon/ovs_linux.go b/pkg/daemon/ovs_linux.go index b6f339c2f1c..cac85de9201 100644 --- a/pkg/daemon/ovs_linux.go +++ b/pkg/daemon/ovs_linux.go @@ -22,7 +22,6 @@ import ( sriovutilfs "github.com/k8snetworkplumbingwg/sriovnet/pkg/utils/filesystem" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" - v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -142,25 +141,13 @@ func (csh cniServerHandler) configureNic(podName, podNamespace, provider, netns, } else { podNameNew = podName } - var pod *v1.Pod - pod, err = csh.Controller.podsLister.Pods(podNamespace).Get(podNameNew) - if err != nil { - klog.Errorf("failed to generate patch for pod %s/%s: %v", podNameNew, podNamespace, err) - return nil, err - } - oriPod := pod.DeepCopy() - pod.Annotations[fmt.Sprintf(util.VfRepresentorNameTemplate, provider)] = hostNicName - pod.Annotations[fmt.Sprintf(util.VfNameTemplate, provider)] = containerNicName - pod.Annotations[fmt.Sprintf(util.PodNicAnnotationTemplate, provider)] = util.SriovNicType - var patch []byte - patch, err = util.GenerateMergePatchPayload(oriPod, pod) - if err != nil { - klog.Errorf("failed to generate patch for pod %s/%s: %v", podNameNew, podNamespace, err) - return nil, err + patch := util.KVPatch{ + fmt.Sprintf(util.VfRepresentorNameTemplate, provider): hostNicName, + fmt.Sprintf(util.VfNameTemplate, provider): containerNicName, + fmt.Sprintf(util.PodNicAnnotationTemplate, provider): util.SriovNicType, } - if _, err = csh.Config.KubeClient.CoreV1().Pods(podNamespace).Patch(context.Background(), podNameNew, - types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { - klog.Errorf("patch pod %s/%s failed: %v", podNameNew, podNamespace, err) + if err = util.PatchAnnotations(csh.Config.KubeClient.CoreV1().Pods(podNamespace), podNameNew, patch); err != nil { + klog.Errorf("failed to patch pod %s/%s: %v", podNamespace, podNameNew, err) return nil, err } } @@ -1051,9 +1038,9 @@ func (c *Controller) patchNodeExternalGwLabel(enabled bool) error { return err } - labels := map[string]any{util.NodeExtGwLabel: strconv.FormatBool(enabled)} - if err = util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err != nil { - klog.Errorf("failed to update labels of node %s: %v", node.Name, err) + patch := util.KVPatch{util.NodeExtGwLabel: strconv.FormatBool(enabled)} + if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { + klog.Errorf("failed to patch labels of node %s: %v", node.Name, err) return err } diff --git a/pkg/ovn_ic_controller/controller.go b/pkg/ovn_ic_controller/controller.go index 46cccd309a9..51eb19d9e46 100644 --- a/pkg/ovn_ic_controller/controller.go +++ b/pkg/ovn_ic_controller/controller.go @@ -50,7 +50,7 @@ func NewController(config *Configuration) *Controller { klog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events("")}) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(corev1.NamespaceAll)}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0, diff --git a/pkg/ovn_ic_controller/ovn_ic_controller.go b/pkg/ovn_ic_controller/ovn_ic_controller.go index d83784e7224..eb7aa9aadb2 100644 --- a/pkg/ovn_ic_controller/ovn_ic_controller.go +++ b/pkg/ovn_ic_controller/ovn_ic_controller.go @@ -242,8 +242,8 @@ func (c *Controller) removeInterConnection(azName string) error { return err } for _, node := range nodes { - labels := map[string]any{util.ICGatewayLabel: "false"} - if err = util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err != nil { + patch := util.KVPatch{util.ICGatewayLabel: "false"} + if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { klog.Errorf("failed to patch ic gw node %s: %v", node.Name, err) return err } @@ -305,8 +305,8 @@ func (c *Controller) establishInterConnection(config map[string]string) error { klog.Errorf("failed to get gw node %q: %v", gw, err) return err } - labels := map[string]any{util.ICGatewayLabel: "true"} - if err = util.UpdateNodeLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, labels); err != nil { + patch := util.KVPatch{util.ICGatewayLabel: "true"} + if err = util.PatchLabels(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil { klog.Errorf("failed to patch ic gw node %s: %v", node.Name, err) return err } diff --git a/pkg/ovn_leader_checker/ovn.go b/pkg/ovn_leader_checker/ovn.go index feeb6dd989c..ccbacf6c498 100755 --- a/pkg/ovn_leader_checker/ovn.go +++ b/pkg/ovn_leader_checker/ovn.go @@ -8,16 +8,13 @@ import ( "net" "os" "os/exec" - "reflect" "strconv" "strings" "syscall" "time" "github.com/spf13/pflag" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -251,23 +248,6 @@ func stealLock() { } } -func patchPodLabels(cfg *Configuration, cachedPod *corev1.Pod, labels map[string]string) error { - if reflect.DeepEqual(cachedPod.Labels, labels) { - return nil - } - - pod := cachedPod.DeepCopy() - pod.Labels = labels - patch, err := util.GenerateStrategicMergePatchPayload(cachedPod, pod) - if err != nil { - klog.Errorf("failed to generate patch payload, %v", err) - return err - } - _, err = cfg.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, - types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "") - return err -} - func checkNorthdSvcExist(cfg *Configuration, namespace, svcName string) bool { _, err := cfg.KubeClient.CoreV1().Services(namespace).Get(context.Background(), svcName, metav1.GetOptions{}) if err != nil { @@ -315,14 +295,6 @@ func checkNorthdEpAlive(cfg *Configuration, namespace, epName string) bool { return checkNorthdEpAvailable(eps.Subsets[0].Addresses[0].IP) } -func updatePodLabels(labels map[string]string, key string, isLeader bool) { - if isLeader { - labels[key] = "true" - } else { - delete(labels, key) - } -} - func compactOvnDatabase(db string) { command := []string{ "-t", @@ -356,26 +328,15 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) { return } - cachedPod, err := cfg.KubeClient.CoreV1().Pods(podNamespace).Get(context.Background(), podName, metav1.GetOptions{}) - if err != nil { - klog.Errorf("get pod %v namespace %v error %v", podName, podNamespace, err) - return - } - - labels := make(map[string]string, len(cachedPod.Labels)) - for k, v := range cachedPod.Labels { - labels[k] = v - } - if !cfg.ISICDBServer { - nbLeader := isDBLeader("OVN_Northbound", 6641) sbLeader := isDBLeader("OVN_Southbound", 6642) - northdLeader := checkNorthdActive() - updatePodLabels(labels, "ovn-nb-leader", nbLeader) - updatePodLabels(labels, "ovn-sb-leader", sbLeader) - updatePodLabels(labels, "ovn-northd-leader", northdLeader) - if err = patchPodLabels(cfg, cachedPod, labels); err != nil { - klog.Errorf("patch label error %v", err) + patch := util.KVPatch{ + "ovn-nb-leader": strconv.FormatBool(isDBLeader("OVN_Northbound", 6641)), + "ovn-sb-leader": strconv.FormatBool(sbLeader), + "ovn-northd-leader": strconv.FormatBool(checkNorthdActive()), + } + if err := util.PatchLabels(cfg.KubeClient.CoreV1().Pods(podNamespace), podName, patch); err != nil { + klog.Errorf("failed to patch labels for pod %s/%s: %v", podNamespace, podName, err) return } if sbLeader && checkNorthdSvcExist(cfg, podNamespace, "ovn-northd") { @@ -391,11 +352,12 @@ func doOvnLeaderCheck(cfg *Configuration, podName, podNamespace string) { } } else { icNbLeader := isDBLeader("OVN_IC_Northbound", 6645) - icSbLeader := isDBLeader("OVN_IC_Southbound", 6646) - updatePodLabels(labels, "ovn-ic-nb-leader", icNbLeader) - updatePodLabels(labels, "ovn-ic-sb-leader", icSbLeader) - if err = patchPodLabels(cfg, cachedPod, labels); err != nil { - klog.Errorf("patch label error %v", err) + patch := util.KVPatch{ + "ovn-ic-nb-leader": strconv.FormatBool(icNbLeader), + "ovn-ic-sb-leader": strconv.FormatBool(isDBLeader("OVN_IC_Southbound", 6646)), + } + if err := util.PatchLabels(cfg.KubeClient.CoreV1().Pods(podNamespace), podName, patch); err != nil { + klog.Errorf("failed to patch labels for pod %s/%s: %v", podNamespace, podName, err) return } diff --git a/pkg/ovn_leader_checker/ovn_test.go b/pkg/ovn_leader_checker/ovn_test.go deleted file mode 100644 index 75c90c1d583..00000000000 --- a/pkg/ovn_leader_checker/ovn_test.go +++ /dev/null @@ -1,117 +0,0 @@ -package ovn_leader_checker - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" -) - -func mockPod(namespace, name string, labels map[string]string) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - Labels: labels, - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - } -} - -func Test_patchPodLabels(t *testing.T) { - t.Parallel() - t.Run("patch new labels", func(t *testing.T) { - t.Parallel() - podName := "ovn-central-123" - podNamespace := "default" - pod := mockPod(podName, podNamespace, map[string]string{ - "app": "nginx", - }) - clientset := fake.NewSimpleClientset(pod) - - cfg := &Configuration{ - KubeClient: clientset, - } - - err := patchPodLabels(cfg, pod, map[string]string{ - "app": "nginx", - "ovn-nb-leader": "true", - "ovn-sb-leader": "true", - "ovn-northd-leader": "true", - }) - - require.NoError(t, err) - - newPod, err := clientset.CoreV1().Pods(podName).Get(context.Background(), podNamespace, metav1.GetOptions{}) - require.NoError(t, err) - require.Equal(t, map[string]string{ - "ovn-nb-leader": "true", - "ovn-sb-leader": "true", - "ovn-northd-leader": "true", - "app": "nginx", - }, newPod.Labels) - }) - - t.Run("delete some labels", func(t *testing.T) { - t.Parallel() - podName := "ovn-central-123" - podNamespace := "default" - pod := mockPod(podName, podNamespace, map[string]string{ - "app": "nginx", - "ovn-nb-leader": "true", - "ovn-sb-leader": "true", - "ovn-northd-leader": "true", - }) - - clientset := fake.NewSimpleClientset(pod) - - cfg := &Configuration{ - KubeClient: clientset, - } - - err := patchPodLabels(cfg, pod, map[string]string{ - "ovn-northd-leader": "true", - "app": "nginx", - }) - - require.NoError(t, err) - - newPod, err := clientset.CoreV1().Pods(podName).Get(context.Background(), podNamespace, metav1.GetOptions{}) - require.NoError(t, err) - require.Equal(t, map[string]string{ - "ovn-northd-leader": "true", - "app": "nginx", - }, newPod.Labels) - }) - - t.Run("pod's labels is empty", func(t *testing.T) { - t.Parallel() - podName := "ovn-central-123" - podNamespace := "default" - pod := mockPod(podName, podNamespace, nil) - - clientset := fake.NewSimpleClientset(pod) - - cfg := &Configuration{ - KubeClient: clientset, - } - - err := patchPodLabels(cfg, pod, map[string]string{ - "ovn-northd-leader": "true", - "app": "nginx", - }) - require.NoError(t, err) - - newPod, err := clientset.CoreV1().Pods(podName).Get(context.Background(), podNamespace, metav1.GetOptions{}) - require.NoError(t, err) - require.Equal(t, map[string]string{ - "ovn-northd-leader": "true", - "app": "nginx", - }, newPod.Labels) - }) -} diff --git a/pkg/speaker/controller.go b/pkg/speaker/controller.go index 1fcbda7e66e..294e124b34e 100644 --- a/pkg/speaker/controller.go +++ b/pkg/speaker/controller.go @@ -51,7 +51,7 @@ func NewController(config *Configuration) *Controller { klog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events("")}) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events(corev1.NamespaceAll)}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0, diff --git a/pkg/util/k8s.go b/pkg/util/k8s.go index f54aac89a50..72b9538c834 100644 --- a/pkg/util/k8s.go +++ b/pkg/util/k8s.go @@ -1,9 +1,7 @@ package util import ( - "context" "crypto/tls" - "encoding/json" "errors" "fmt" "net" @@ -17,8 +15,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" - "k8s.io/apimachinery/pkg/types" - clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -148,37 +144,6 @@ func GetTruncatedUID(uid string) string { return uid[len(uid)-12:] } -func UpdateNodeLabels(cs clientv1.NodeInterface, node string, labels map[string]any) error { - buf, err := json.Marshal(labels) - if err != nil { - klog.Errorf("failed to marshal labels: %v", err) - return err - } - patch := fmt.Sprintf(`{"metadata":{"labels":%s}}`, string(buf)) - return nodeMergePatch(cs, node, patch) -} - -func UpdateNodeAnnotations(cs clientv1.NodeInterface, node string, annotations map[string]any) error { - buf, err := json.Marshal(annotations) - if err != nil { - klog.Errorf("failed to marshal annotations: %v", err) - return err - } - patch := fmt.Sprintf(`{"metadata":{"annotations":%s}}`, string(buf)) - return nodeMergePatch(cs, node, patch) -} - -// we do not use GenerateMergePatchPayload/GenerateStrategicMergePatchPayload, -// because we use a `null` value to delete a label/annotation -func nodeMergePatch(cs clientv1.NodeInterface, node, patch string) error { - _, err := cs.Patch(context.Background(), node, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) - if err != nil { - klog.Errorf("failed to patch node %s with json merge patch %q: %v", node, patch, err) - return err - } - return nil -} - func SetOwnerReference(owner, object metav1.Object) error { return controllerutil.SetOwnerReference(owner, object, scheme.Scheme) } diff --git a/pkg/util/k8s_test.go b/pkg/util/k8s_test.go index 5f5ba293155..de3a00c035d 100644 --- a/pkg/util/k8s_test.go +++ b/pkg/util/k8s_test.go @@ -1,7 +1,6 @@ package util import ( - "context" "errors" "fmt" "math/rand/v2" @@ -14,16 +13,15 @@ import ( "github.com/google/uuid" nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" - "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/fake" - clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/klog/v2" "k8s.io/utils/ptr" + "github.com/stretchr/testify/require" + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" ) @@ -355,162 +353,6 @@ func TestServiceClusterIPs(t *testing.T) { } } -func TestUpdateNodeLabels(t *testing.T) { - client := fake.NewSimpleClientset() - nodeClient := client.CoreV1().Nodes() - tests := []struct { - name string - cs clientv1.NodeInterface - node string - labels map[string]any - exp error - }{ - { - name: "node_with_labels", - cs: nodeClient, - node: "node1", - labels: map[string]any{ - "key1": "value1", - }, - exp: nil, - }, - { - name: "node_with_nil_labels", - cs: nodeClient, - node: "node2", - labels: map[string]any{}, - exp: nil, - }, - { - name: "node_with_unsupported_type", - cs: nodeClient, - node: "node3", - labels: map[string]any{ - "callback": func() {}, - }, - exp: errors.New("unsupported type"), - }, - } - for _, tt := range tests { - // create a node - node, err := client.CoreV1().Nodes().Create(context.Background(), &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: tt.node, - }, - }, metav1.CreateOptions{}) - require.NoError(t, err) - require.NotNil(t, node) - t.Run(tt.name, func(t *testing.T) { - err := UpdateNodeLabels(tt.cs, tt.node, tt.labels) - if tt.exp == nil { - require.NoError(t, err) - return - } - if errors.Is(err, tt.exp) { - t.Errorf("got %v, want %v", err, tt.exp) - } - }) - } -} - -func TestUpdateNodeAnnotations(t *testing.T) { - client := fake.NewSimpleClientset() - nodeClient := client.CoreV1().Nodes() - tests := []struct { - name string - cs clientv1.NodeInterface - node string - annotations map[string]any - exp error - }{ - { - name: "node_with_annotations", - cs: nodeClient, - node: "node1", - annotations: map[string]any{ - "key1": "value1", - }, - exp: nil, - }, - { - name: "node_with_nil_annotations", - cs: nodeClient, - node: "node2", - annotations: map[string]any{}, - exp: nil, - }, - { - name: "node_with_unsupported_type", - cs: nodeClient, - node: "node3", - annotations: map[string]any{ - "callback": func() {}, - }, - exp: errors.New("unsupported type"), - }, - } - for _, tt := range tests { - // create a node - node, err := client.CoreV1().Nodes().Create(context.Background(), &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: tt.node, - }, - }, metav1.CreateOptions{}) - require.NoError(t, err) - require.NotNil(t, node) - t.Run(tt.name, func(t *testing.T) { - err := UpdateNodeAnnotations(tt.cs, tt.node, tt.annotations) - if tt.exp == nil { - require.NoError(t, err) - return - } - if errors.Is(err, tt.exp) { - t.Errorf("got %v, want %v", err, tt.exp) - } - }) - } -} - -func TestNodeMergePatch(t *testing.T) { - client := fake.NewSimpleClientset() - nodeClient := client.CoreV1().Nodes() - tests := []struct { - name string - cs clientv1.NodeInterface - node string - patch string - exp error - }{ - { - name: "node_with_patch", - cs: nodeClient, - node: "node", - patch: `{"metadata":{"labels":{"key1":"value1"}}}`, - exp: nil, - }, - } - for _, tt := range tests { - // create a node - node, err := client.CoreV1().Nodes().Create(context.Background(), &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: tt.node, - }, - }, metav1.CreateOptions{}) - require.NoError(t, err) - require.NotNil(t, node) - t.Run(tt.name, func(t *testing.T) { - err := nodeMergePatch(tt.cs, tt.node, tt.patch) - if tt.exp == nil { - require.NoError(t, err) - return - } - if errors.Is(err, tt.exp) { - t.Errorf("got %v, want %v", err, tt.exp) - } - }) - } -} - func TestLabelSelectorNotEquals(t *testing.T) { selector, err := LabelSelectorNotEquals("key", "value") require.NoError(t, err) @@ -830,41 +672,3 @@ func TestDeploymentIsReady(t *testing.T) { }) } } - -func Test_nodeMergePatch(t *testing.T) { - tests := []struct { - name string - patch string - wantErr bool - }{ - { - name: "valid_merge_patch", - patch: `{"metadata":{"labels":{"key1":"value1"}}}`, - wantErr: false, - }, - { - name: "invalid_merge_patch", - patch: "invalid_merge_patch", - wantErr: true, - }, - } - - client := fake.NewClientset(&v1.Node{}).CoreV1().Nodes() - _, err := client.Create(context.TODO(), &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node1", - }, - }, metav1.CreateOptions{}) - require.NoError(t, err) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err = nodeMergePatch(client, "node1", tt.patch) - if tt.wantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - }) - } -} diff --git a/pkg/util/patch.go b/pkg/util/patch.go index d42e97157af..cbe05382f55 100644 --- a/pkg/util/patch.go +++ b/pkg/util/patch.go @@ -1,13 +1,47 @@ package util import ( + "context" + jsonpatch "github.com/evanphx/json-patch/v5" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/klog/v2" ) +type KVPatch map[string]any + +type patchClient[T metav1.Object] interface { + Patch(ctx context.Context, name string, patchType types.PatchType, patch []byte, opt metav1.PatchOptions, subresources ...string) (T, error) +} + +func patchMetaKVs[T metav1.Object](cs patchClient[T], name, field string, patch KVPatch) error { + obj := map[string]map[string]KVPatch{"metadata": {field: patch}} + patchData, err := json.Marshal(obj) + if err != nil { + klog.Errorf("failed to marshal patch %#v for field .metadata.%s: %v", patch, field, err) + return err + } + + _, err = cs.Patch(context.Background(), name, types.MergePatchType, patchData, metav1.PatchOptions{}) + if err != nil { + klog.Errorf("failed to patch resource %s with json merge patch %q: %v", name, string(patchData), err) + return err + } + return nil +} + +func PatchLabels[T metav1.Object](cs patchClient[T], name string, patch KVPatch) error { + return patchMetaKVs(cs, name, "labels", patch) +} + +func PatchAnnotations[T metav1.Object](cs patchClient[T], name string, patch KVPatch) error { + return patchMetaKVs(cs, name, "annotations", patch) +} + func GenerateStrategicMergePatchPayload(original, modified runtime.Object) ([]byte, error) { originalJSON, err := json.Marshal(original) if err != nil { diff --git a/pkg/util/patch_test.go b/pkg/util/patch_test.go index 6c3453e4adc..b78d9f9f1be 100644 --- a/pkg/util/patch_test.go +++ b/pkg/util/patch_test.go @@ -1,17 +1,115 @@ package util import ( + "context" "encoding/json" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/kubernetes/fake" + clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +func TestPatchAnnotations(t *testing.T) { + client := fake.NewSimpleClientset() + nodeClient := client.CoreV1().Nodes() + tests := []struct { + name string + cs clientv1.NodeInterface + node string + patch KVPatch + wantErr bool + }{ + { + name: "normal", + cs: nodeClient, + node: "node1", + patch: KVPatch{"key1": "value1"}, + }, + { + name: "nil patch", + cs: nodeClient, + node: "node2", + patch: KVPatch{}, + }, + { + name: "patch with unsupported value type", + cs: nodeClient, + node: "node3", + patch: KVPatch{"callback": func() {}}, + wantErr: true, + }, + } + for _, tt := range tests { + // create a node + node, err := client.CoreV1().Nodes().Create(context.Background(), &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: tt.node, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + require.NotNil(t, node) + t.Run(tt.name, func(t *testing.T) { + if err = PatchAnnotations(tt.cs, tt.node, tt.patch); (err != nil) != tt.wantErr { + t.Errorf("PatchAnnotations() error = %v, wantErr = %v", err, tt.wantErr) + } + }) + } +} + +func TestPatchLabels(t *testing.T) { + client := fake.NewSimpleClientset() + nsClient := client.CoreV1().Namespaces() + tests := []struct { + name string + cs clientv1.NamespaceInterface + namespace string + patch KVPatch + wantErr bool + }{ + { + name: "normal", + cs: nsClient, + namespace: "ns1", + patch: KVPatch{"key1": "value1"}, + }, + { + name: "nil patch", + cs: nsClient, + namespace: "ns2", + patch: nil, + }, + { + name: "patch with unsupported value type", + cs: nsClient, + namespace: "ns3", + patch: KVPatch{"callback": func() {}}, + wantErr: true, + }, + } + for _, tt := range tests { + // create a node + node, err := client.CoreV1().Namespaces().Create(context.Background(), &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: tt.namespace, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + require.NotNil(t, node) + t.Run(tt.name, func(t *testing.T) { + if err = PatchLabels(tt.cs, tt.namespace, tt.patch); (err != nil) != tt.wantErr { + t.Errorf("PatchLabels() error = %v, wantErr = %v", err, tt.wantErr) + } + }) + } +} + func TestGenerateStrategicMergePatchPayload(t *testing.T) { type C chan struct{} type unsupportedType struct {