Skip to content

Commit

Permalink
Remove persistent volume claim for the same mount path (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored May 13, 2021
1 parent 00ae1af commit c9a909c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 16 deletions.
61 changes: 45 additions & 16 deletions pkg/flink/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,39 @@ func getPersistentVolumeClaim(name string, pv *flinkIdl.Resource_PersistentVolum
}
}

func addPersistentVolumeClaim(
claims []corev1.PersistentVolumeClaim,
volumeMounts []corev1.VolumeMount,
claim corev1.PersistentVolumeClaim,
mountPath string) ([]corev1.PersistentVolumeClaim, []corev1.VolumeMount) {

claimsByName := make(map[string]corev1.PersistentVolumeClaim)
for _, c := range claims {
claimsByName[c.Name] = c
}

mounts := []corev1.VolumeMount{}
for _, volumeMount := range volumeMounts {
if volumeMount.MountPath != mountPath {
mounts = append(mounts, volumeMount)
} else {
delete(claimsByName, volumeMount.Name)
}
}
mounts = append(mounts, corev1.VolumeMount{
Name: claim.Name,
ReadOnly: false,
MountPath: volumeClaimMountPath,
})

templates := []corev1.PersistentVolumeClaim{claim}
for _, c := range claimsByName {
templates = append(templates, c)
}

return templates, mounts
}

func (fc *FlinkCluster) updateJobManagerSpec(taskCtx FlinkTaskContext) {
out := &fc.Spec.JobManager

Expand All @@ -139,14 +172,12 @@ func (fc *FlinkCluster) updateJobManagerSpec(taskCtx FlinkTaskContext) {

if pv := jm.GetResource().GetPersistentVolume(); pv != nil {
claim := getPersistentVolumeClaim(jobManagerVolumeClaim, pv)

out.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{claim}
out.VolumeMounts = append(out.VolumeMounts, corev1.VolumeMount{
Name: claim.Name,
ReadOnly: false,
MountPath: volumeClaimMountPath,
})

out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim(
out.VolumeClaimTemplates,
out.VolumeMounts,
claim,
volumeClaimMountPath,
)
fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath
}
}
Expand Down Expand Up @@ -177,14 +208,12 @@ func (fc *FlinkCluster) updateTaskManagerSpec(taskCtx FlinkTaskContext) {

if pv := tm.GetResource().GetPersistentVolume(); pv != nil {
claim := getPersistentVolumeClaim(taskManagerVolumeClaim, pv)

out.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{claim}
out.VolumeMounts = append(out.VolumeMounts, corev1.VolumeMount{
Name: claim.Name,
ReadOnly: false,
MountPath: volumeClaimMountPath,
})

out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim(
out.VolumeClaimTemplates,
out.VolumeMounts,
claim,
volumeClaimMountPath,
)
fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath
}
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/flink/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ plugins:
volumeMounts:
- mountPath: /cache
name: cache-volume
- name: pvc-jm-default
mountPath: /flink-tmp
volumeClaimTemplates:
- metadata:
name: pvc-jm-default
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: "pd-standard"
resources:
requests:
storage: 250Gi
nodeSelector:
- gke-nodepool: "nodepool-1"
sidecars:
Expand All @@ -43,6 +54,17 @@ plugins:
volumeMounts:
- mountPath: /cache
name: cache-volume
- name: pvc-tm-default
mountPath: /flink-tmp
volumeClaimTemplates:
- metadata:
name: pvc-tm-default
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: "pd-standard"
resources:
requests:
storage: 250Gi
nodeSelector:
- gke-nodepool: "nodepool-2"
flinkProperties:
Expand Down

0 comments on commit c9a909c

Please sign in to comment.