Skip to content

Commit

Permalink
Merge pull request #69 from cybozu-go/support-discard-job
Browse files Browse the repository at this point in the history
implement reconcileDiscardJob
  • Loading branch information
satoru-takeuchi authored Nov 25, 2024
2 parents e15ee7a + cec8e55 commit 0b01029
Show file tree
Hide file tree
Showing 2 changed files with 352 additions and 5 deletions.
190 changes: 185 additions & 5 deletions internal/controller/mantlebackup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
labelComponentExportJob = "export-job"
labelComponentUploadJob = "upload-job"
labelComponentImportJob = "import-job"
labelComponentDiscardJob = "discard-job"
labelComponentDiscardVolume = "discard-volume"
annotRemoteUID = "mantle.cybozu.io/remote-uid"
annotDiffFrom = "mantle.cybozu.io/diff-from"
annotDiffTo = "mantle.cybozu.io/diff-to"
Expand Down Expand Up @@ -1522,17 +1524,195 @@ func (r *MantleBackupReconciler) updateStatusManifests(
}

func (r *MantleBackupReconciler) reconcileDiscardJob(
_ context.Context,
ctx context.Context,
backup *mantlev1.MantleBackup,
_ *snapshotTarget,
) (ctrl.Result, error) { //nolint:unparam
snapshotTarget *snapshotTarget,
) (ctrl.Result, error) {
if backup.GetAnnotations()[annotSyncMode] != syncModeFull {
return ctrl.Result{}, nil
}

// FIXME: implement here later
if err := r.createOrUpdateDiscardPV(ctx, backup, snapshotTarget.pv); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
if err := r.createOrUpdateDiscardPVC(ctx, backup, snapshotTarget.pvc); err != nil {
return ctrl.Result{}, err
}

if err := r.createOrUpdateDiscardJob(ctx, backup); err != nil {
return ctrl.Result{}, err
}

completed, err := r.hasDiscardJobCompleted(ctx, backup)
if err != nil {
return ctrl.Result{}, err
}
if completed {
return ctrl.Result{}, nil
}
return ctrl.Result{Requeue: true}, nil
}

// createOrUpdateDiscardPV creates or updates the PersistentVolume named
// makeDiscardPVName(backup), deriving its spec from targetPV.
//
// The resulting PV is a raw-block, ReadWriteOnce volume with the Retain
// reclaim policy and an empty storage class name. Its CSI source copies the
// driver, the controller-expand and node-stage secret references, and the
// clusterID/imageFeatures/imageFormat/pool attributes from targetPV; the
// volume handle is targetPV's "imageName" attribute and staticVolume is set
// to "true".
//
// targetPV.Spec.CSI is dereferenced without a nil check, so callers must pass
// a CSI-backed PV. The returned error is whatever ctrl.CreateOrUpdate reports.
func (r *MantleBackupReconciler) createOrUpdateDiscardPV(
	ctx context.Context,
	backup *mantlev1.MantleBackup,
	targetPV *corev1.PersistentVolume,
) error {
	discardPV := &corev1.PersistentVolume{}
	discardPV.SetName(makeDiscardPVName(backup))

	mutate := func() error {
		// Preserve any labels already present and overlay ours.
		lbls := discardPV.GetLabels()
		if lbls == nil {
			lbls = make(map[string]string, 2)
		}
		lbls["app.kubernetes.io/name"] = labelAppNameValue
		lbls["app.kubernetes.io/component"] = labelComponentDiscardVolume
		discardPV.SetLabels(lbls)

		spec := &discardPV.Spec
		spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
		spec.Capacity = targetPV.Spec.Capacity
		spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain
		spec.StorageClassName = ""

		blockMode := corev1.PersistentVolumeBlock
		spec.VolumeMode = &blockMode

		if spec.CSI == nil {
			spec.CSI = &corev1.CSIPersistentVolumeSource{}
		}
		dstCSI, srcCSI := spec.CSI, targetPV.Spec.CSI
		dstCSI.Driver = srcCSI.Driver
		dstCSI.ControllerExpandSecretRef = srcCSI.ControllerExpandSecretRef
		dstCSI.NodeStageSecretRef = srcCSI.NodeStageSecretRef
		// The source PV's image name becomes the volume handle.
		dstCSI.VolumeHandle = srcCSI.VolumeAttributes["imageName"]

		if dstCSI.VolumeAttributes == nil {
			dstCSI.VolumeAttributes = make(map[string]string, 5)
		}
		for _, key := range []string{"clusterID", "imageFeatures", "imageFormat", "pool"} {
			dstCSI.VolumeAttributes[key] = srcCSI.VolumeAttributes[key]
		}
		dstCSI.VolumeAttributes["staticVolume"] = "true"

		return nil
	}

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, discardPV, mutate)
	return err
}

// createOrUpdateDiscardPVC creates or updates the PersistentVolumeClaim named
// makeDiscardPVCName(backup) in the r.managedCephClusterID namespace.
//
// The claim is a raw-block, ReadWriteOnce claim with an empty storage class
// name. It copies its resource requirements from targetPVC and is pinned to
// the PV named makeDiscardPVName(backup) through spec.volumeName. The
// returned error is whatever ctrl.CreateOrUpdate reports.
func (r *MantleBackupReconciler) createOrUpdateDiscardPVC(
	ctx context.Context,
	backup *mantlev1.MantleBackup,
	targetPVC *corev1.PersistentVolumeClaim,
) error {
	claim := &corev1.PersistentVolumeClaim{}
	claim.SetNamespace(r.managedCephClusterID)
	claim.SetName(makeDiscardPVCName(backup))

	mutate := func() error {
		// Preserve any labels already present and overlay ours.
		lbls := claim.GetLabels()
		if lbls == nil {
			lbls = make(map[string]string, 2)
		}
		lbls["app.kubernetes.io/name"] = labelAppNameValue
		lbls["app.kubernetes.io/component"] = labelComponentDiscardVolume
		claim.SetLabels(lbls)

		emptyClass := ""
		blockMode := corev1.PersistentVolumeBlock

		claim.Spec.StorageClassName = &emptyClass
		claim.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
		claim.Spec.Resources = targetPVC.Spec.Resources
		claim.Spec.VolumeName = makeDiscardPVName(backup)
		claim.Spec.VolumeMode = &blockMode

		return nil
	}

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, claim, mutate)
	return err
}

// createOrUpdateDiscardJob creates or updates the Job named
// makeDiscardJobName(backup) in the r.managedCephClusterID namespace.
//
// The Job runs a single privileged container (uid/gid 0) from r.podImage
// that executes blkdiscard against the block device /dev/discard-rbd. That
// device is backed by the PVC named makeDiscardPVCName(backup). Pods restart
// on failure and the backoff limit is 65535. The returned error is whatever
// ctrl.CreateOrUpdate reports.
func (r *MantleBackupReconciler) createOrUpdateDiscardJob(
	ctx context.Context,
	backup *mantlev1.MantleBackup,
) error {
	discardJob := &batchv1.Job{}
	discardJob.SetNamespace(r.managedCephClusterID)
	discardJob.SetName(makeDiscardJobName(backup))

	mutate := func() error {
		// Preserve any labels already present and overlay ours.
		lbls := discardJob.GetLabels()
		if lbls == nil {
			lbls = make(map[string]string, 2)
		}
		lbls["app.kubernetes.io/name"] = labelAppNameValue
		lbls["app.kubernetes.io/component"] = labelComponentDiscardJob
		discardJob.SetLabels(lbls)

		retries := int32(65535)
		discardJob.Spec.BackoffLimit = &retries

		podSpec := &discardJob.Spec.Template.Spec
		podSpec.RestartPolicy = corev1.RestartPolicyOnFailure

		privileged := true
		rootID := int64(0)
		discardContainer := corev1.Container{
			Name:  "discard",
			Image: r.podImage,
			Command: []string{
				"/bin/bash",
				"-c",
				`
set -e
blkdiscard /dev/discard-rbd
`,
			},
			SecurityContext: &corev1.SecurityContext{
				Privileged: &privileged,
				RunAsGroup: &rootID,
				RunAsUser:  &rootID,
			},
			VolumeDevices: []corev1.VolumeDevice{
				{
					Name:       "discard-rbd",
					DevicePath: "/dev/discard-rbd",
				},
			},
		}
		podSpec.Containers = []corev1.Container{discardContainer}

		// Expose the discard PVC to the container as a raw block device.
		claimSource := &corev1.PersistentVolumeClaimVolumeSource{
			ClaimName: makeDiscardPVCName(backup),
		}
		podSpec.Volumes = []corev1.Volume{
			{
				Name:         "discard-rbd",
				VolumeSource: corev1.VolumeSource{PersistentVolumeClaim: claimSource},
			},
		}

		return nil
	}

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, discardJob, mutate)
	return err
}

// hasDiscardJobCompleted reports whether the Job named
// makeDiscardJobName(backup) in the r.managedCephClusterID namespace has a
// true JobComplete condition.
//
// A NotFound error from the client is treated as "not completed" rather than
// as a failure, so the caller simply retries. Any other client error is
// returned as-is.
func (r *MantleBackupReconciler) hasDiscardJobCompleted(ctx context.Context, backup *mantlev1.MantleBackup) (bool, error) {
	key := types.NamespacedName{
		Namespace: r.managedCephClusterID,
		Name:      makeDiscardJobName(backup),
	}

	discardJob := &batchv1.Job{}
	err := r.Client.Get(ctx, key, discardJob)
	switch {
	case aerrors.IsNotFound(err):
		// The cache must be stale. Let's just requeue.
		return false, nil
	case err != nil:
		return false, err
	}

	return IsJobConditionTrue(discardJob.Status.Conditions, batchv1.JobComplete), nil
}

func (r *MantleBackupReconciler) reconcileImportJob(
Expand Down
167 changes: 167 additions & 0 deletions internal/controller/mantlebackup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1717,4 +1717,171 @@ var _ = Describe("import", func() {
Expect(res.IsZero()).To(BeTrue())
})
})

// Specs for reconcileDiscardJob: nothing is created for an incremental
// backup, while a full backup gets a discard PV, PVC and Job and is requeued
// until the Job completes.
Context("reconcileDiscardJob", func() {
	It("should NOT create anything in an incremental backup", func(ctx SpecContext) {
		backup, err := createMantleBackupUsingDummyPVC(ctx, "target", ns)
		Expect(err).NotTo(HaveOccurred())
		backup.SetAnnotations(map[string]string{
			annotDiffFrom:  "source",
			annotSyncMode:  syncModeIncremental,
			annotRemoteUID: "uid",
		})
		err = k8sClient.Update(ctx, backup)
		Expect(err).NotTo(HaveOccurred())

		result, err := mbr.reconcileDiscardJob(ctx, backup, &snapshotTarget{})
		Expect(err).NotTo(HaveOccurred())
		Expect(result.Requeue).To(BeFalse())

		var pv corev1.PersistentVolume
		err = k8sClient.Get(ctx, types.NamespacedName{Name: makeDiscardPVName(backup), Namespace: nsController}, &pv)
		Expect(err).To(HaveOccurred())
		Expect(aerrors.IsNotFound(err)).To(BeTrue())

		// Must be a PersistentVolumeClaim so the lookup actually targets the PVC kind.
		var pvc corev1.PersistentVolumeClaim
		err = k8sClient.Get(ctx, types.NamespacedName{Name: makeDiscardPVCName(backup), Namespace: nsController}, &pvc)
		Expect(err).To(HaveOccurred())
		Expect(aerrors.IsNotFound(err)).To(BeTrue())

		var job batchv1.Job
		err = k8sClient.Get(ctx, types.NamespacedName{Name: makeDiscardJobName(backup), Namespace: nsController}, &job)
		Expect(err).To(HaveOccurred())
		Expect(aerrors.IsNotFound(err)).To(BeTrue())
	})

	It("should create a PV, PVC, and Job, requeue, and complete in a full backup", func(ctx SpecContext) {
		backup, err := createMantleBackupUsingDummyPVC(ctx, "target", ns)
		Expect(err).NotTo(HaveOccurred())
		backup.SetAnnotations(map[string]string{
			annotSyncMode: syncModeFull,
		})
		err = k8sClient.Update(ctx, backup)
		Expect(err).NotTo(HaveOccurred())

		pvCapacity := corev1.ResourceList{
			corev1.ResourceStorage: resource.MustParse("1Gi"),
		}
		pvDriver := "test-pv-driver"
		pvControllerExpandSecretRef := corev1.SecretReference{
			Name:      "test-pv-cesr-name",
			Namespace: "test-pv-cesr-ns",
		}
		pvNodeStageSecretRef := corev1.SecretReference{
			Name:      "test-pv-nssr-name",
			Namespace: "test-pv-nssr-ns",
		}
		pvClusterID := "test-pv-cluster-id"
		pvImageFeatures := "test-pv-image-features"
		pvImageFormat := "test-pv-image-format"
		pvPool := "test-pv-pool"
		pvImageName := "test-pv-image-name"
		pvcResources := corev1.VolumeResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceStorage: resource.MustParse("1Gi"),
			},
		}
		// Named "target" so the variable does not shadow the snapshotTarget type.
		target := &snapshotTarget{
			pvc: &corev1.PersistentVolumeClaim{
				Spec: corev1.PersistentVolumeClaimSpec{
					Resources: pvcResources,
				},
			},
			pv: &corev1.PersistentVolume{
				Spec: corev1.PersistentVolumeSpec{
					Capacity: pvCapacity,
					PersistentVolumeSource: corev1.PersistentVolumeSource{
						CSI: &corev1.CSIPersistentVolumeSource{
							Driver:                    pvDriver,
							ControllerExpandSecretRef: &pvControllerExpandSecretRef,
							NodeStageSecretRef:        &pvNodeStageSecretRef,
							VolumeAttributes: map[string]string{
								"clusterID":     pvClusterID,
								"imageFeatures": pvImageFeatures,
								"imageFormat":   pvImageFormat,
								"pool":          pvPool,
								"imageName":     pvImageName,
							},
						},
					},
				},
			},
			imageName: pvImageName,
			poolName:  "poolName",
		}

		// The first call to reconcileDiscardJob should create a PV, PVC, and Job, and requeue.
		result, err := mbr.reconcileDiscardJob(ctx, backup, target)
		Expect(err).NotTo(HaveOccurred())
		Expect(result.Requeue).To(BeTrue())

		var pv corev1.PersistentVolume
		err = k8sClient.Get(ctx, types.NamespacedName{Name: makeDiscardPVName(backup), Namespace: nsController}, &pv)
		Expect(err).NotTo(HaveOccurred())
		Expect(pv.GetLabels()["app.kubernetes.io/name"]).To(Equal(labelAppNameValue))
		Expect(pv.GetLabels()["app.kubernetes.io/component"]).To(Equal(labelComponentDiscardVolume))
		Expect(pv.Spec.AccessModes).To(HaveLen(1))
		Expect(pv.Spec.AccessModes[0]).To(Equal(corev1.ReadWriteOnce))
		Expect(pv.Spec.Capacity).To(Equal(pvCapacity))
		Expect(pv.Spec.CSI.Driver).To(Equal(pvDriver))
		Expect(*pv.Spec.CSI.ControllerExpandSecretRef).To(Equal(pvControllerExpandSecretRef))
		Expect(*pv.Spec.CSI.NodeStageSecretRef).To(Equal(pvNodeStageSecretRef))
		Expect(pv.Spec.CSI.VolumeAttributes["clusterID"]).To(Equal(pvClusterID))
		Expect(pv.Spec.CSI.VolumeAttributes["imageFeatures"]).To(Equal(pvImageFeatures))
		Expect(pv.Spec.CSI.VolumeAttributes["imageFormat"]).To(Equal(pvImageFormat))
		Expect(pv.Spec.CSI.VolumeAttributes["pool"]).To(Equal(pvPool))
		Expect(pv.Spec.CSI.VolumeAttributes["staticVolume"]).To(Equal("true"))
		Expect(pv.Spec.CSI.VolumeHandle).To(Equal(pvImageName))
		Expect(pv.Spec.PersistentVolumeReclaimPolicy).To(Equal(corev1.PersistentVolumeReclaimRetain))
		Expect(*pv.Spec.VolumeMode).To(Equal(corev1.PersistentVolumeBlock))
		Expect(pv.Spec.StorageClassName).To(Equal(""))

		// Look the PVC up under the PVC name, which is what the controller creates it as.
		var pvc corev1.PersistentVolumeClaim
		err = k8sClient.Get(ctx, types.NamespacedName{Name: makeDiscardPVCName(backup), Namespace: nsController}, &pvc)
		Expect(err).NotTo(HaveOccurred())
		Expect(pvc.GetLabels()["app.kubernetes.io/name"]).To(Equal(labelAppNameValue))
		Expect(pvc.GetLabels()["app.kubernetes.io/component"]).To(Equal(labelComponentDiscardVolume))
		Expect(*pvc.Spec.StorageClassName).To(Equal(""))
		Expect(pvc.Spec.AccessModes).To(HaveLen(1))
		Expect(pvc.Spec.AccessModes[0]).To(Equal(corev1.ReadWriteOnce))
		Expect(pvc.Spec.Resources).To(Equal(pvcResources))
		Expect(*pvc.Spec.VolumeMode).To(Equal(corev1.PersistentVolumeBlock))
		Expect(pvc.Spec.VolumeName).To(Equal(makeDiscardPVName(backup)))

		var job batchv1.Job
		err = k8sClient.Get(ctx, types.NamespacedName{Name: makeDiscardJobName(backup), Namespace: nsController}, &job)
		Expect(err).NotTo(HaveOccurred())
		Expect(job.GetLabels()["app.kubernetes.io/name"]).To(Equal(labelAppNameValue))
		Expect(job.GetLabels()["app.kubernetes.io/component"]).To(Equal(labelComponentDiscardJob))
		Expect(*job.Spec.BackoffLimit).To(Equal(int32(65535)))
		Expect(job.Spec.Template.Spec.Containers).To(HaveLen(1))
		Expect(job.Spec.Template.Spec.Containers[0].Name).To(Equal("discard"))
		Expect(*job.Spec.Template.Spec.Containers[0].SecurityContext.Privileged).To(BeTrue())
		Expect(*job.Spec.Template.Spec.Containers[0].SecurityContext.RunAsGroup).To(Equal(int64(0)))
		Expect(*job.Spec.Template.Spec.Containers[0].SecurityContext.RunAsUser).To(Equal(int64(0)))
		Expect(job.Spec.Template.Spec.Containers[0].Image).To(Equal(mbr.podImage))
		Expect(job.Spec.Template.Spec.Containers[0].VolumeDevices).To(HaveLen(1))
		Expect(job.Spec.Template.Spec.Containers[0].VolumeDevices[0].Name).To(Equal("discard-rbd"))
		Expect(job.Spec.Template.Spec.Containers[0].VolumeDevices[0].DevicePath).To(Equal("/dev/discard-rbd"))
		Expect(job.Spec.Template.Spec.RestartPolicy).To(Equal(corev1.RestartPolicyOnFailure))
		Expect(job.Spec.Template.Spec.Volumes).To(HaveLen(1))
		Expect(job.Spec.Template.Spec.Volumes[0]).To(Equal(corev1.Volume{
			Name: "discard-rbd",
			VolumeSource: corev1.VolumeSource{
				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
					ClaimName: makeDiscardPVCName(backup),
				},
			},
		}))

		// Make the Job completed
		err = resMgr.ChangeJobCondition(ctx, &job, batchv1.JobComplete, corev1.ConditionTrue)
		Expect(err).NotTo(HaveOccurred())

		// A call to reconcileDiscardJob should NOT requeue after the Job completed
		result, err = mbr.reconcileDiscardJob(ctx, backup, target)
		Expect(err).NotTo(HaveOccurred())
		Expect(result.Requeue).To(BeFalse())
	})
})
})

0 comments on commit 0b01029

Please sign in to comment.