Commit
Add new ConcurrencyPolicy setting to the MedusaBackupSchedule (#1204)
* Add new ConcurrencyPolicy setting to the MedusaBackupSchedule to prevent multiple active backups working on the same Datacenter
burmanm authored Feb 12, 2024
1 parent 98d439e commit c03280e
Showing 5 changed files with 103 additions and 11 deletions.
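Before the file diffs, here is a minimal sketch of a schedule using the new field. The apiVersion group, metadata values, and cron expression are assumptions for illustration, not taken from this commit; only the spec field names come from the diff below:

apiVersion: medusa.k8ssandra.io/v1alpha1   # assumed API group/version for the Medusa CRDs
kind: MedusaBackupSchedule
metadata:
  name: nightly-backup           # placeholder name
  namespace: k8ssandra-operator  # placeholder namespace
spec:
  cronSchedule: "0 1 * * *"      # illustrative schedule
  backupSpec:
    cassandraDatacenter: dc1     # must name an existing CassandraDatacenter in the same namespace
  concurrencyPolicy: Forbid      # new in this commit; "Forbid" (default) postpones a run while another backup is active, "Allow" permits overlap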
2 changes: 2 additions & 0 deletions CHANGELOG/CHANGELOG-1.13.md
@@ -14,3 +14,5 @@ Changelog for the K8ssandra Operator, new PRs should update the `unreleased` sec
When cutting a new release, update the `unreleased` heading to the tag being generated and its date, like `## vX.Y.Z - YYYY-MM-DD`, and create a new placeholder section for `unreleased` entries.

## unreleased

* [ENHANCEMENT] [#1203](https://github.com/k8ssandra/k8ssandra-operator/issues/1203) Add new setting ConcurrencyPolicy to MedusaBackupSchedules
8 changes: 8 additions & 0 deletions apis/medusa/v1alpha1/medusaschedule_types.go
@@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -31,6 +32,13 @@ type MedusaBackupScheduleSpec struct {

// BackupSpec defines the MedusaBackup to be created for this job
BackupSpec MedusaBackupJobSpec `json:"backupSpec"`

// Specifies whether this backup task can run concurrently with other active backup tasks. Valid values are:
// - "Allow": allows multiple Tasks to run concurrently on the Cassandra cluster
// - "Forbid" (default): only a single task is executed at once
// The "Allow" property is only valid if all the other active Tasks have "Allow" as well.
// +optional
ConcurrencyPolicy batchv1.ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`
}

// MedusaBackupScheduleStatus defines the observed state of MedusaBackupSchedule
@@ -70,6 +70,13 @@ spec:
required:
- cassandraDatacenter
type: object
concurrencyPolicy:
description: 'Specifies whether this backup task can run concurrently
with other active backup tasks. Valid values are: - "Allow": allows
multiple Tasks to run concurrently on the Cassandra cluster - "Forbid"
(default): only a single task is executed at once The "Allow" property
is only valid if all the other active Tasks have "Allow" as well.'
type: string
cronSchedule:
description: CronSchedule is a cronjob format schedule for backups.
Overrides any easier methods of defining the schedule
46 changes: 44 additions & 2 deletions controllers/medusa/medusabackupschedule_controller.go
@@ -21,14 +21,17 @@ import (
"fmt"
"time"

batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
medusav1alpha1 "github.com/k8ssandra/k8ssandra-operator/apis/medusa/v1alpha1"
cron "github.com/robfig/cron/v3"
)
@@ -70,6 +73,15 @@ func (r *MedusaBackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{}, err
}

dcKey := types.NamespacedName{Namespace: backupSchedule.Namespace, Name: backupSchedule.Spec.BackupSpec.CassandraDatacenter}
dc := &cassdcapi.CassandraDatacenter{}
if err := r.Get(ctx, dcKey, dc); err != nil {
logger.Error(err, "failed to get cassandradatacenter", "CassandraDatacenter", dcKey)
return ctrl.Result{}, err
}

defaults(backupSchedule)

previousExecution, err := getPreviousExecutionTime(ctx, backupSchedule)
if err != nil {
return ctrl.Result{}, err
@@ -83,6 +95,16 @@ func (r *MedusaBackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl
createBackup := false

if nextExecution.Before(now) {
if backupSchedule.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent {
if activeTasks, err := r.activeTasks(backupSchedule, dc); err != nil {
return ctrl.Result{}, err
} else {
if len(activeTasks) > 0 {
logger.V(1).Info("Postponing backup schedule due to existing active backups", "MedusaBackupSchedule", req.NamespacedName)
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
}
}
nextExecution = sched.Next(now)
previousExecution = now
createBackup = true && !backupSchedule.Spec.Disabled
@@ -100,14 +122,13 @@ func (r *MedusaBackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl
}

if createBackup {
// TODO Verify here no previous jobs is still executing?
logger.V(1).Info("Scheduled time has been reached, creating a backup job", "MedusaBackupSchedule", req.NamespacedName)
generatedName := fmt.Sprintf("%s-%d", backupSchedule.Name, now.Unix())
backupJob := &medusav1alpha1.MedusaBackupJob{
ObjectMeta: metav1.ObjectMeta{
Name: generatedName,
Namespace: backupSchedule.Namespace,
// TODO Labels?
Labels: dc.GetDatacenterLabels(),
},
Spec: backupSchedule.Spec.BackupSpec,
}
@@ -134,6 +155,27 @@ func getPreviousExecutionTime(ctx context.Context, backupSchedule *medusav1alpha
return previousExecution.Time.UTC(), nil
}

func defaults(backupSchedule *medusav1alpha1.MedusaBackupSchedule) {
if backupSchedule.Spec.ConcurrencyPolicy == "" {
backupSchedule.Spec.ConcurrencyPolicy = batchv1.ForbidConcurrent
}
}

func (r *MedusaBackupScheduleReconciler) activeTasks(backupSchedule *medusav1alpha1.MedusaBackupSchedule, dc *cassdcapi.CassandraDatacenter) ([]medusav1alpha1.MedusaBackupJob, error) {
backupJobs := &medusav1alpha1.MedusaBackupJobList{}
if err := r.Client.List(context.Background(), backupJobs, client.InNamespace(backupSchedule.Namespace), client.MatchingLabels(dc.GetDatacenterLabels())); err != nil {
return nil, err
}
activeJobs := make([]medusav1alpha1.MedusaBackupJob, 0)
for _, job := range backupJobs.Items {
if job.Status.FinishTime.IsZero() {
activeJobs = append(activeJobs, job)
}
}

return activeJobs, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *MedusaBackupScheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
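For reference, the concurrency check relies on the datacenter labels the controller now attaches to each created MedusaBackupJob (Labels: dc.GetDatacenterLabels()): activeTasks lists jobs in the schedule's namespace matching those labels and treats any job with an empty finishTime as still active. A rough sketch of such a job object, assuming cass-operator's usual label keys; the keys and values shown are assumptions, not part of this diff:

apiVersion: medusa.k8ssandra.io/v1alpha1      # assumed API group/version
kind: MedusaBackupJob
metadata:
  name: medusa-backup-schedule-1707732000     # "<schedule name>-<unix timestamp>", per the controller
  namespace: k8ssandra-operator               # placeholder namespace
  labels:
    cassandra.datastax.com/cluster: my-cluster    # assumed cass-operator datacenter labels
    cassandra.datastax.com/datacenter: dc1
spec:
  cassandraDatacenter: dc1
status: {}                                    # no finishTime yet, so this job counts as an active task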
51 changes: 42 additions & 9 deletions controllers/medusa/medusabackupschedule_controller_test.go
@@ -5,6 +5,7 @@ import (
"testing"
"time"

cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
medusav1alpha1 "github.com/k8ssandra/k8ssandra-operator/apis/medusa/v1alpha1"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,6 +27,18 @@ var _ Clock = &FakeClock{}

func TestScheduler(t *testing.T) {
require := require.New(t)
require.NoError(medusav1alpha1.AddToScheme(scheme.Scheme))
require.NoError(cassdcapi.AddToScheme(scheme.Scheme))

fClock := &FakeClock{}

dc := cassdcapi.CassandraDatacenter{
ObjectMeta: metav1.ObjectMeta{
Name: "dc1",
Namespace: "test-ns",
},
Spec: cassdcapi.CassandraDatacenterSpec{},
}

// To manipulate time and requeue, we use fakeclient here instead of envtest
backupSchedule := &medusav1alpha1.MedusaBackupSchedule{
@@ -41,27 +54,23 @@ func TestScheduler(t *testing.T) {
},
},
}
err := medusav1alpha1.AddToScheme(scheme.Scheme)
require.NoError(err)

fakeClient := fake.NewClientBuilder().
WithRuntimeObjects(backupSchedule).
WithRuntimeObjects(backupSchedule, &dc).
WithScheme(scheme.Scheme).
Build()

fClock := &FakeClock{}
nsName := types.NamespacedName{
Name: backupSchedule.Name,
Namespace: backupSchedule.Namespace,
}

r := &MedusaBackupScheduleReconciler{
Client: fakeClient,
Scheme: scheme.Scheme,
Clock: fClock,
}

nsName := types.NamespacedName{
Name: backupSchedule.Name,
Namespace: backupSchedule.Namespace,
}

res, err := r.Reconcile(context.TODO(), reconcile.Request{NamespacedName: nsName})
require.NoError(err)
require.True(res.RequeueAfter > 0)
@@ -98,11 +107,35 @@ func TestScheduler(t *testing.T) {
require.NoError(err)
require.True(res.RequeueAfter > 0)

// We should not have more than 1, since we never set the previous one as finished
backupRequests = medusav1alpha1.MedusaBackupJobList{}
err = fakeClient.List(context.TODO(), &backupRequests)
require.NoError(err)
require.Equal(1, len(backupRequests.Items))

// Mark the first one as finished and try again
backup.Status.FinishTime = metav1.NewTime(fClock.currentTime)
require.NoError(fakeClient.Update(context.TODO(), &backup))

backupRequests = medusav1alpha1.MedusaBackupJobList{}
err = fakeClient.List(context.TODO(), &backupRequests)
require.NoError(err)
require.Equal(1, len(backupRequests.Items))

_, err = r.Reconcile(context.TODO(), reconcile.Request{NamespacedName: nsName})
require.NoError(err)
require.True(res.RequeueAfter > 0)

backupRequests = medusav1alpha1.MedusaBackupJobList{}
err = fakeClient.List(context.TODO(), &backupRequests)
require.NoError(err)
require.Equal(2, len(backupRequests.Items))

for _, backup := range backupRequests.Items {
backup.Status.FinishTime = metav1.NewTime(fClock.currentTime)
require.NoError(fakeClient.Update(context.TODO(), &backup))
}

// Verify that invoking again without reaching the next time does not generate another backup
// or modify the Status
backupScheduleLive = &medusav1alpha1.MedusaBackupSchedule{}
