Skip to content

Commit

Permalink
fix parse mount pod when a lot of mount pods & remove debug log in client
Browse files Browse the repository at this point in the history

Signed-off-by: zwwhdls <[email protected]>
  • Loading branch information
zwwhdls committed Jan 2, 2025
1 parent ed9ea81 commit d865754
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 133 deletions.
3 changes: 1 addition & 2 deletions pkg/config/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func GenSettingAttrWithMountPod(ctx context.Context, client *k8sclient.K8sClient
if err != nil {
log.Error(err, "Get pv error", "pv", pvName)
}
if pv != nil {
if pv != nil && pv.Spec.ClaimRef != nil {
pvc, err = client.GetPersistentVolumeClaim(ctx, pv.Spec.ClaimRef.Name, pv.Spec.ClaimRef.Namespace)
if err != nil {
log.Error(err, "Get pvc error", "namespace", pv.Spec.ClaimRef.Namespace, "name", pv.Spec.ClaimRef.Name)
Expand Down Expand Up @@ -1116,6 +1116,5 @@ func GenHashOfSetting(log klog.Logger, setting JfsSetting) string {
h := sha256.New()
h.Write(settingStr)
val := hex.EncodeToString(h.Sum(nil))[:63]
log.V(1).Info("get jfsSetting hash", "hashVal", val, "setting", setting.SafeString())
return val
}
2 changes: 1 addition & 1 deletion pkg/controller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
lastStatus.syncAt = time.Now()
if err != nil {
reconcilerLog.Error(err, "Driver check pod error, will retry", "name", pod.Name)
if strings.Contains(err.Error(), "client rate limiter Wait returned an error: context canceled") {
if strings.Contains(err.Error(), "client rate limiter Wait returned an error") {
reconcilerLog.V(1).Info("client rate limit")
backOff.Next(backOffID, time.Now())
} else {
Expand Down
57 changes: 34 additions & 23 deletions pkg/fuse/passfd/passfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,24 @@ func (fs *Fds) ParseFuseFds(ctx context.Context) error {
fdLog.Error(err, "read dir error", "basePath", fs.basePath)
return err
}
labelSelector := &metav1.LabelSelector{MatchLabels: map[string]string{
common.PodTypeKey: common.PodTypeValue,
}}
fieldSelector := &fields.Set{"spec.nodeName": config.NodeName}
pods, err := fs.client.ListPod(ctx, config.Namespace, labelSelector, fieldSelector)
if err != nil {
fdLog.Error(err, "list pods error")
return err
}
podMaps := make(map[string]*corev1.Pod)
for _, pod := range pods {
podMaps[resource.GetUpgradeUUID(&pod)] = &pod
}

wg := sync.WaitGroup{}
limitCh := make(chan struct{}, 20)
defer close(limitCh)

for _, entry := range entries {
if !entry.IsDir() {
continue
Expand All @@ -97,11 +115,24 @@ func (fs *Fds) ParseFuseFds(ctx context.Context) error {
for _, subEntry := range subEntries {
if strings.HasPrefix(subEntry.Name(), "fuse_fd_comm.") {
subdir := path.Join(fs.basePath, entry.Name(), subEntry.Name())
if po, ok := podMaps[entry.Name()]; !ok || po.DeletionTimestamp != nil {
// make sure the pod is still running
continue
}
fdLog.V(1).Info("parse fuse fd", "path", subdir)
fs.parseFuse(ctx, entry.Name(), subdir)
wg.Add(1)
go func() {
defer func() {
wg.Done()
<-limitCh
}()
limitCh <- struct{}{}
fs.parseFuse(ctx, entry.Name(), subdir)
}()
}
}
}
wg.Wait()
return nil
}

Expand Down Expand Up @@ -193,28 +224,8 @@ func (fs *Fds) CloseFd(pod *corev1.Pod) {
func (fs *Fds) parseFuse(ctx context.Context, upgradeUUID, fusePath string) {
fuseFd, fuseSetting := GetFuseFd(fusePath, false)
if fuseFd <= 0 {
// get fuse fd error, try to get mount pod
labelSelector := &metav1.LabelSelector{MatchLabels: map[string]string{
common.PodTypeKey: common.PodTypeValue,
}}
fieldSelector := &fields.Set{"spec.nodeName": config.NodeName}
pods, err := fs.client.ListPod(ctx, config.Namespace, labelSelector, fieldSelector)
if err != nil {
fdLog.Error(err, "list pods error")
return
}
var mountPod *corev1.Pod
for _, pod := range pods {
if resource.GetUpgradeUUID(&pod) == upgradeUUID && pod.DeletionTimestamp == nil {
mountPod = &pod
break
}
}
if mountPod == nil {
fdLog.V(1).Info("get fuse fd error and mount pod not found, ignore it", "upgradeUUID", upgradeUUID, "fusePath", fusePath)
// if can not get fuse fd, do not serve for it
return
}
// if can not get fuse fd, do not serve for it
return
}

serverPath := path.Join(fs.basePath, upgradeUUID, "fuse_fd_csi_comm.sock")
Expand Down
Loading

0 comments on commit d865754

Please sign in to comment.