From d8657548c3a28ff01f714df4599712f6309cf761 Mon Sep 17 00:00:00 2001 From: zwwhdls Date: Thu, 2 Jan 2025 13:14:41 +0800 Subject: [PATCH] fix parse mount pod when a lot of mount pods & remove debug log in client Signed-off-by: zwwhdls --- pkg/config/setting.go | 3 +- pkg/controller/reconciler.go | 2 +- pkg/fuse/passfd/passfd.go | 57 ++++++++++-------- pkg/k8sclient/client.go | 110 +---------------------------------- 4 files changed, 39 insertions(+), 133 deletions(-) diff --git a/pkg/config/setting.go b/pkg/config/setting.go index e591e12cb5..ff109cf533 100644 --- a/pkg/config/setting.go +++ b/pkg/config/setting.go @@ -565,7 +565,7 @@ func GenSettingAttrWithMountPod(ctx context.Context, client *k8sclient.K8sClient if err != nil { log.Error(err, "Get pv error", "pv", pvName) } - if pv != nil { + if pv != nil && pv.Spec.ClaimRef != nil { pvc, err = client.GetPersistentVolumeClaim(ctx, pv.Spec.ClaimRef.Name, pv.Spec.ClaimRef.Namespace) if err != nil { log.Error(err, "Get pvc error", "namespace", pv.Spec.ClaimRef.Namespace, "name", pv.Spec.ClaimRef.Name) @@ -1116,6 +1116,5 @@ func GenHashOfSetting(log klog.Logger, setting JfsSetting) string { h := sha256.New() h.Write(settingStr) val := hex.EncodeToString(h.Sum(nil))[:63] - log.V(1).Info("get jfsSetting hash", "hashVal", val, "setting", setting.SafeString()) return val } diff --git a/pkg/controller/reconciler.go b/pkg/controller/reconciler.go index 62a195d20f..acdefcbc78 100644 --- a/pkg/controller/reconciler.go +++ b/pkg/controller/reconciler.go @@ -144,7 +144,7 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) { lastStatus.syncAt = time.Now() if err != nil { reconcilerLog.Error(err, "Driver check pod error, will retry", "name", pod.Name) - if strings.Contains(err.Error(), "client rate limiter Wait returned an error: context canceled") { + if strings.Contains(err.Error(), "client rate limiter Wait returned an error") { reconcilerLog.V(1).Info("client rate limit") backOff.Next(backOffID, time.Now()) } else { diff --git a/pkg/fuse/passfd/passfd.go b/pkg/fuse/passfd/passfd.go index 0959a24b1c..3ed8acd7c9 100644 --- a/pkg/fuse/passfd/passfd.go +++ b/pkg/fuse/passfd/passfd.go @@ -81,6 +81,24 @@ func (fs *Fds) ParseFuseFds(ctx context.Context) error { fdLog.Error(err, "read dir error", "basePath", fs.basePath) return err } + labelSelector := &metav1.LabelSelector{MatchLabels: map[string]string{ + common.PodTypeKey: common.PodTypeValue, + }} + fieldSelector := &fields.Set{"spec.nodeName": config.NodeName} + pods, err := fs.client.ListPod(ctx, config.Namespace, labelSelector, fieldSelector) + if err != nil { + fdLog.Error(err, "list pods error") + return err + } + podMaps := make(map[string]*corev1.Pod) + for _, pod := range pods { + podMaps[resource.GetUpgradeUUID(&pod)] = &pod + } + + wg := sync.WaitGroup{} + limitCh := make(chan struct{}, 20) + defer close(limitCh) + for _, entry := range entries { if !entry.IsDir() { continue @@ -97,11 +115,24 @@ func (fs *Fds) ParseFuseFds(ctx context.Context) error { for _, subEntry := range subEntries { if strings.HasPrefix(subEntry.Name(), "fuse_fd_comm.") { subdir := path.Join(fs.basePath, entry.Name(), subEntry.Name()) + if po, ok := podMaps[entry.Name()]; !ok || po.DeletionTimestamp != nil { + // make sure the pod is still running + continue + } fdLog.V(1).Info("parse fuse fd", "path", subdir) - fs.parseFuse(ctx, entry.Name(), subdir) + wg.Add(1) + go func() { + defer func() { + wg.Done() + <-limitCh + }() + limitCh <- struct{}{} + fs.parseFuse(ctx, entry.Name(), subdir) + }() } } } + wg.Wait() return nil } @@ -193,28 +224,8 @@ func (fs *Fds) CloseFd(pod *corev1.Pod) { func (fs *Fds) parseFuse(ctx context.Context, upgradeUUID, fusePath string) { fuseFd, fuseSetting := GetFuseFd(fusePath, false) if fuseFd <= 0 { - // get fuse fd error, try to get mount pod - labelSelector := &metav1.LabelSelector{MatchLabels: map[string]string{ - common.PodTypeKey: common.PodTypeValue, - }} - fieldSelector := &fields.Set{"spec.nodeName": config.NodeName} - pods, err := fs.client.ListPod(ctx, config.Namespace, labelSelector, fieldSelector) - if err != nil { - fdLog.Error(err, "list pods error") - return - } - var mountPod *corev1.Pod - for _, pod := range pods { - if resource.GetUpgradeUUID(&pod) == upgradeUUID && pod.DeletionTimestamp == nil { - mountPod = &pod - break - } - } - if mountPod == nil { - fdLog.V(1).Info("get fuse fd error and mount pod not found, ignore it", "upgradeUUID", upgradeUUID, "fusePath", fusePath) - // if can not get fuse fd, do not serve for it - return - } + // if can not get fuse fd, do not serve for it + return } serverPath := path.Join(fs.basePath, upgradeUUID, "fuse_fd_csi_comm.sock") diff --git a/pkg/k8sclient/client.go b/pkg/k8sclient/client.go index b6e9e22334..1cb2e5abc5 100644 --- a/pkg/k8sclient/client.go +++ b/pkg/k8sclient/client.go @@ -43,8 +43,6 @@ import ( restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/klog/v2" - - "github.com/juicedata/juicefs-csi-driver/pkg/util" ) const ( @@ -126,33 +124,25 @@ func newClient(config rest.Config) (*K8sClient, error) { } func (k *K8sClient) CreatePod(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) { - log := util.GenLog(ctx, clientLog, "") if pod == nil { - log.Info("Create pod: pod is nil") return nil, nil } - log.V(1).Info("Create pod", "name", pod.Name) mntPod, err := k.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}) if err != nil { - log.V(1).Info("Can't create pod", "name", pod.Name, "error", err) return nil, err } return mntPod, nil } func (k *K8sClient) GetPod(ctx context.Context, podName, namespace string) (*corev1.Pod, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get pod", "name", podName) mntPod, err := k.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get pod", "name", podName, "namespace", namespace, "error", err) return nil, err } return mntPod, nil } func (k *K8sClient) ListPod(ctx context.Context, namespace string, labelSelector *metav1.LabelSelector, filedSelector *fields.Set) ([]corev1.Pod, error) { - log := util.GenLog(ctx, clientLog, "") listOptions := metav1.ListOptions{} if k.enableAPIServerListCache { // set ResourceVersion="0" means the list response is returned from apiserver cache instead of etcd @@ -164,24 +154,19 @@ func (k *K8sClient) ListPod(ctx context.Context, namespace string, labelSelector return nil, err } listOptions.LabelSelector = labelMap.String() - log.V(1).Info("List pod", "labelSelector", listOptions.LabelSelector) } if filedSelector != nil { listOptions.FieldSelector = fields.SelectorFromSet(*filedSelector).String() - log.V(1).Info("List pod", "fieldSelector", listOptions.FieldSelector) } podList, err := k.CoreV1().Pods(namespace).List(ctx, listOptions) if err != nil { - log.V(1).Info("Can't list pod", "namespace", namespace, "error", err) return nil, err } return podList.Items, nil } func (k *K8sClient) ListNode(ctx context.Context, labelSelector *metav1.LabelSelector) ([]corev1.Node, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("List node by labelSelector", "labelSelector", labelSelector.String()) listOptions := metav1.ListOptions{} if labelSelector != nil { labelMap, err := metav1.LabelSelectorAsSelector(labelSelector) @@ -193,15 +178,12 @@ func (k *K8sClient) ListNode(ctx context.Context, labelSelector *metav1.LabelSel nodeList, err := k.CoreV1().Nodes().List(ctx, listOptions) if err != nil { - log.V(1).Info("Can't list node", "labelSelector", labelSelector.String(), "error", err) return nil, err } return nodeList.Items, nil } func (k *K8sClient) GetPodLog(ctx context.Context, podName, namespace, containerName string) (string, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get pod log", "name", podName) tailLines := int64(20) req := k.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ Container: containerName, @@ -223,131 +205,95 @@ func (k *K8sClient) GetPodLog(ctx context.Context, podName, namespace, container } func (k *K8sClient) PatchPod(ctx context.Context, pod *corev1.Pod, data []byte, pt types.PatchType) error { - log := util.GenLog(ctx, clientLog, "") if pod == nil { - log.Info("Patch pod: pod is nil") return nil } - log.V(1).Info("Patch pod", "name", pod.Name) _, err := k.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, pt, data, metav1.PatchOptions{}) return err } func (k *K8sClient) UpdatePod(ctx context.Context, pod *corev1.Pod) error { - log := util.GenLog(ctx, clientLog, "") if pod == nil { - log.Info("Update pod: pod is nil") return nil } - log.V(1).Info("Update pod", "name", pod.Name) _, err := k.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{}) return err } func (k *K8sClient) DeletePod(ctx context.Context, pod *corev1.Pod) error { - log := util.GenLog(ctx, clientLog, "") if pod == nil { - log.Info("Delete pod: pod is nil") return nil } - log.V(1).Info("Delete pod", "name", pod.Name) return k.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) } func (k *K8sClient) GetSecret(ctx context.Context, secretName, namespace string) (*corev1.Secret, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get secret", "name", secretName) secret, err := k.CoreV1().Secrets(namespace).Get(ctx, secretName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get secret", "name", secretName, "namespace", namespace, "error", err) return nil, err } return secret, nil } func (k *K8sClient) CreateSecret(ctx context.Context, secret *corev1.Secret) (*corev1.Secret, error) { - log := util.GenLog(ctx, clientLog, "") if secret == nil { - log.Info("Create secret: secret is nil") return nil, nil } - log.V(1).Info("Create secret", "name", secret.Name) s, err := k.CoreV1().Secrets(secret.Namespace).Create(ctx, secret, metav1.CreateOptions{}) if err != nil { - log.Info("Can't create secret", "name", secret.Name, "error", err) return nil, err } return s, nil } func (k *K8sClient) UpdateSecret(ctx context.Context, secret *corev1.Secret) error { - log := util.GenLog(ctx, clientLog, "") if secret == nil { - log.Info("Update secret: secret is nil") return nil } - log.V(1).Info("Update secret", "name", secret.Name) _, err := k.CoreV1().Secrets(secret.Namespace).Update(ctx, secret, metav1.UpdateOptions{}) return err } func (k *K8sClient) DeleteSecret(ctx context.Context, secretName string, namespace string) error { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Delete secret", "name", secretName) return k.CoreV1().Secrets(namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) } func (k *K8sClient) PatchSecret(ctx context.Context, secret *corev1.Secret, data []byte, pt types.PatchType) error { - log := util.GenLog(ctx, clientLog, "") if secret == nil { - log.Info("Patch secret: secret is nil") return nil } - log.V(1).Info("Patch secret", "name", secret.Name) _, err := k.CoreV1().Secrets(secret.Namespace).Patch(ctx, secret.Name, pt, data, metav1.PatchOptions{}) return err } func (k *K8sClient) GetJob(ctx context.Context, jobName, namespace string) (*batchv1.Job, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get job", "name", jobName) job, err := k.BatchV1().Jobs(namespace).Get(ctx, jobName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get job", "name", jobName, "namespace", namespace, "error", err) return nil, err } return job, nil } func (k *K8sClient) CreateJob(ctx context.Context, job *batchv1.Job) (*batchv1.Job, error) { - log := util.GenLog(ctx, clientLog, "") if job == nil { - log.Info("Create job: job is nil") return nil, nil } - log.V(1).Info("Create job", "name", job.Name) created, err := k.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { - log.Info("Can't create job", "name", job.Name, "error", err) return nil, err } return created, nil } func (k *K8sClient) UpdateJob(ctx context.Context, job *batchv1.Job) error { - log := util.GenLog(ctx, clientLog, "") if job == nil { - log.Info("Update job: job is nil") return nil } - log.V(1).Info("Update job", "name", job.Name) _, err := k.BatchV1().Jobs(job.Namespace).Update(ctx, job, metav1.UpdateOptions{}) return err } func (k *K8sClient) DeleteJob(ctx context.Context, jobName string, namespace string) error { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Delete job", "name", jobName) policy := metav1.DeletePropagationBackground return k.BatchV1().Jobs(namespace).Delete(ctx, jobName, metav1.DeleteOptions{ PropagationPolicy: &policy, @@ -355,19 +301,14 @@ func (k *K8sClient) DeleteJob(ctx context.Context, jobName string, namespace str } func (k *K8sClient) GetPersistentVolume(ctx context.Context, pvName string) (*corev1.PersistentVolume, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get pv", "name", pvName) pv, err := k.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get pv", "name", pvName, "error", err) return nil, err } return pv, nil } func (k *K8sClient) ListPersistentVolumes(ctx context.Context, labelSelector *metav1.LabelSelector, filedSelector *fields.Set) ([]corev1.PersistentVolume, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("List pvs", "labelSelector", labelSelector.String(), "fieldSelector", filedSelector) listOptions := metav1.ListOptions{} if labelSelector != nil { labelMap, err := metav1.LabelSelectorAsMap(labelSelector) @@ -381,7 +322,6 @@ func (k *K8sClient) ListPersistentVolumes(ctx context.Context, labelSelector *me } pvList, err := k.CoreV1().PersistentVolumes().List(ctx, listOptions) if err != nil { - log.V(1).Info("Can't list pv", "error", err) return nil, err } return pvList.Items, nil @@ -403,74 +343,54 @@ func (k *K8sClient) ListPersistentVolumesByVolumeHandle(ctx context.Context, vol } func (k *K8sClient) ListStorageClasses(ctx context.Context) ([]storagev1.StorageClass, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("List storageclass") scList, err := k.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{}) if err != nil { - log.V(1).Info("Can't list pv", "error", err) return nil, err } return scList.Items, nil } func (k *K8sClient) GetPersistentVolumeClaim(ctx context.Context, pvcName, namespace string) (*corev1.PersistentVolumeClaim, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get pvc", "name", pvcName, "namespace", namespace) mntPod, err := k.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get pvc", "name", pvcName, "namespace", namespace, "error", err) return nil, err } return mntPod, nil } func (k *K8sClient) GetReplicaSet(ctx context.Context, rsName, namespace string) (*appsv1.ReplicaSet, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get replicaset", "name", rsName, "namespace", namespace) rs, err := k.AppsV1().ReplicaSets(namespace).Get(ctx, rsName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get replicaset", "name", rsName, "namespace", namespace, "error", err) return nil, err } return rs, nil } func (k *K8sClient) GetStatefulSet(ctx context.Context, stsName, namespace string) (*appsv1.StatefulSet, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get statefulset", "name", stsName, "namespace", namespace) sts, err := k.AppsV1().StatefulSets(namespace).Get(ctx, stsName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get statefulset", "name", stsName, "namespace", namespace, "error", err) return nil, err } return sts, nil } func (k *K8sClient) GetStorageClass(ctx context.Context, scName string) (*storagev1.StorageClass, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get sc", "name", scName) mntPod, err := k.StorageV1().StorageClasses().Get(ctx, scName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get sc", "name", scName, "error", err) return nil, err } return mntPod, nil } func (k *K8sClient) GetDaemonSet(ctx context.Context, dsName, namespace string) (*appsv1.DaemonSet, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get ds", "name", dsName) ds, err := k.AppsV1().DaemonSets(namespace).Get(ctx, dsName, metav1.GetOptions{}) if err != nil { - log.Info("Can't get DaemonSet", "name", dsName, "error", err) return nil, err } return ds, nil } func (k *K8sClient) ExecuteInContainer(ctx context.Context, podName, namespace, containerName string, cmd []string) (stdout string, stderr string, err error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Execute command pod", "command", cmd, "container", containerName, "pod", podName, "namespace", namespace) const tty = false req := k.CoreV1().RESTClient().Post(). @@ -513,41 +433,24 @@ func execute(ctx context.Context, method string, url *url.URL, config *restclien } func (k *K8sClient) GetConfigMap(ctx context.Context, cmName, namespace string) (*corev1.ConfigMap, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get configmap", "name", cmName) cm, err := k.CoreV1().ConfigMaps(namespace).Get(ctx, cmName, metav1.GetOptions{}) if err != nil { - log.V(1).Info("Can't get configMap", "name", cmName, "namespace", namespace, "error", err) return nil, err } return cm, nil } func (k *K8sClient) CreateConfigMap(ctx context.Context, cfg *corev1.ConfigMap) error { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Create configmap", "name", cfg.Name) _, err := k.CoreV1().ConfigMaps(cfg.Namespace).Create(ctx, cfg, metav1.CreateOptions{}) - if err != nil { - log.V(1).Info("Can't create configMap", "name", cfg.Name, "error", err) - return err - } - return nil + return err } func (k *K8sClient) UpdateConfigMap(ctx context.Context, cfg *corev1.ConfigMap) error { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Update configmap", "name", cfg.Name) _, err := k.CoreV1().ConfigMaps(cfg.Namespace).Update(ctx, cfg, metav1.UpdateOptions{}) - if err != nil { - log.V(1).Info("Can't update configMap", "name", cfg.Name, "error", err) - return err - } - return nil + return err } func (k *K8sClient) CreateEvent(ctx context.Context, pod corev1.Pod, evtType, reason, message string) error { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Create event", "name", pod.Name) now := time.Now() _, err := k.CoreV1().Events(pod.Namespace).Create(ctx, &corev1.Event{ ObjectMeta: metav1.ObjectMeta{ @@ -573,19 +476,12 @@ func (k *K8sClient) CreateEvent(ctx context.Context, pod corev1.Pod, evtType, re ReportingController: "juicefs-csi-node", ReportingInstance: pod.Spec.NodeName, }, metav1.CreateOptions{}) - if err != nil { - log.V(1).Info("Can't create event", "podName", pod.Name, "error", err) - return err - } - return nil + return err } func (k *K8sClient) GetEvents(ctx context.Context, pod *corev1.Pod) ([]corev1.Event, error) { - log := util.GenLog(ctx, clientLog, "") - log.V(1).Info("Get event", "name", pod.Name) events, err := k.CoreV1().Events(pod.Namespace).List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s", pod.Name), TypeMeta: metav1.TypeMeta{Kind: "Pod"}}) if err != nil { - log.V(1).Info("Can't get event", "podName", pod.Name, "error", err) return nil, err } return events.Items, nil