Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent dataloss due to the concurrent RPC calls (occurrence is very low) #4970

Merged
merged 4 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId())

if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

volOptions := &store.VolumeOptions{}
defer volOptions.Destroy()

Expand All @@ -506,9 +513,6 @@ func (ns *NodeServer) NodePublishVolume(
return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
}

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish

if err = util.CreateMountPoint(targetPath); err != nil {
log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)

Expand Down Expand Up @@ -599,12 +603,17 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, err
}

// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
ns.healthChecker.StopChecker(volID, targetPath)

isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil {
Expand All @@ -627,7 +636,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
isMnt = true
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
22 changes: 16 additions & 6 deletions internal/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,12 @@ func (ns *NodeServer) NodePublishVolume(
volID := req.GetVolumeId()
stagingPath += "/" + volID

// Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

// Check if that target path exists properly
notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I see it correctly, createTargetMountPath still uses the (possibly unreliable?) IsLikelyNotMountPoint under the hood; should this be adjusted to the same IsMountPoint that the cephfs implementation uses? Or can this remain as-is, as discussed in #4966 (comment)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes thats correct, it checks the sources, The main problem is with lock and the os.RemoveAll, if os.Remove fails we will get another RPC for unpublish which will trigger umount if its actually a mount. For now lets keep it simple with what we have today.

Expand Down Expand Up @@ -914,8 +918,14 @@ func (ns *NodeServer) NodeUnpublishVolume(
}

targetPath := req.GetTargetPath()
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.

if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)

return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)

isMnt, err := ns.Mounter.IsMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -928,7 +938,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, status.Error(codes.NotFound, err.Error())
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand All @@ -939,7 +949,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, status.Error(codes.Internal, err.Error())
}

if err = os.RemoveAll(targetPath); err != nil {
if err = os.Remove(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
3 changes: 3 additions & 0 deletions internal/util/idlocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (

// SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation.
SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"

// TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path.
TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists"
)

// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
Expand Down