diff --git a/pkg/flink/resources.go b/pkg/flink/resources.go index 7c97d28..3736b78 100644 --- a/pkg/flink/resources.go +++ b/pkg/flink/resources.go @@ -117,6 +117,39 @@ func getPersistentVolumeClaim(name string, pv *flinkIdl.Resource_PersistentVolum } } +func addPersistentVolumeClaim( + claims []corev1.PersistentVolumeClaim, + volumeMounts []corev1.VolumeMount, + claim corev1.PersistentVolumeClaim, + mountPath string) ([]corev1.PersistentVolumeClaim, []corev1.VolumeMount) { + + claimsByName := make(map[string]corev1.PersistentVolumeClaim) + for _, c := range claims { + claimsByName[c.Name] = c + } + + mounts := []corev1.VolumeMount{} + for _, volumeMount := range volumeMounts { + if volumeMount.MountPath != mountPath { + mounts = append(mounts, volumeMount) + } else { + delete(claimsByName, volumeMount.Name) + } + } + mounts = append(mounts, corev1.VolumeMount{ + Name: claim.Name, + ReadOnly: false, + MountPath: volumeClaimMountPath, + }) + + templates := []corev1.PersistentVolumeClaim{claim} + for _, c := range claimsByName { + templates = append(templates, c) + } + + return templates, mounts +} + func (fc *FlinkCluster) updateJobManagerSpec(taskCtx FlinkTaskContext) { out := &fc.Spec.JobManager @@ -139,14 +172,12 @@ func (fc *FlinkCluster) updateJobManagerSpec(taskCtx FlinkTaskContext) { if pv := jm.GetResource().GetPersistentVolume(); pv != nil { claim := getPersistentVolumeClaim(jobManagerVolumeClaim, pv) - - out.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{claim} - out.VolumeMounts = append(out.VolumeMounts, corev1.VolumeMount{ - Name: claim.Name, - ReadOnly: false, - MountPath: volumeClaimMountPath, - }) - + out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim( + out.VolumeClaimTemplates, + out.VolumeMounts, + claim, + volumeClaimMountPath, + ) fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath } } @@ -177,14 +208,12 @@ func (fc *FlinkCluster) updateTaskManagerSpec(taskCtx FlinkTaskContext) { if pv := tm.GetResource().GetPersistentVolume(); pv != nil { claim := getPersistentVolumeClaim(taskManagerVolumeClaim, pv) - - out.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{claim} - out.VolumeMounts = append(out.VolumeMounts, corev1.VolumeMount{ - Name: claim.Name, - ReadOnly: false, - MountPath: volumeClaimMountPath, - }) - + out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim( + out.VolumeClaimTemplates, + out.VolumeMounts, + claim, + volumeClaimMountPath, + ) fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath } } diff --git a/pkg/flink/testdata/config.yaml b/pkg/flink/testdata/config.yaml index 0beef7b..c48fd46 100644 --- a/pkg/flink/testdata/config.yaml +++ b/pkg/flink/testdata/config.yaml @@ -28,6 +28,17 @@ plugins: volumeMounts: - mountPath: /cache name: cache-volume + - name: pvc-jm-default + mountPath: /flink-tmp + volumeClaimTemplates: + - metadata: + name: pvc-jm-default + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: "pd-standard" + resources: + requests: + storage: 250Gi nodeSelector: - gke-nodepool: "nodepool-1" sidecars: @@ -43,6 +54,17 @@ plugins: volumeMounts: - mountPath: /cache name: cache-volume + - name: pvc-tm-default + mountPath: /flink-tmp + volumeClaimTemplates: + - metadata: + name: pvc-tm-default + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: "pd-standard" + resources: + requests: + storage: 250Gi nodeSelector: - gke-nodepool: "nodepool-2" flinkProperties: