diff --git a/pkg/cmd/scylla-manager/server.go b/pkg/cmd/scylla-manager/server.go index 8c9a40c22..8542e72e2 100644 --- a/pkg/cmd/scylla-manager/server.go +++ b/pkg/cmd/scylla-manager/server.go @@ -144,11 +144,13 @@ func (s *server) makeServices(ctx context.Context) error { return errors.Wrapf(err, "scheduler service") } + restoreExclusiveLock := scheduler.NewTaskExclusiveLockPolicy(scheduler.RestoreTask) + // Register the runners - s.schedSvc.SetRunner(scheduler.BackupTask, scheduler.PolicyRunner{Policy: scheduler.NewLockClusterPolicy(), Runner: s.backupSvc.Runner()}) - s.schedSvc.SetRunner(scheduler.RestoreTask, scheduler.PolicyRunner{Policy: scheduler.NewLockClusterPolicy(), Runner: s.restoreSvc.Runner()}) + s.schedSvc.SetRunner(scheduler.BackupTask, scheduler.PolicyRunner{Policy: restoreExclusiveLock, Runner: s.backupSvc.Runner(), TaskType: scheduler.BackupTask}) + s.schedSvc.SetRunner(scheduler.RestoreTask, scheduler.PolicyRunner{Policy: restoreExclusiveLock, Runner: s.restoreSvc.Runner(), TaskType: scheduler.RestoreTask}) s.schedSvc.SetRunner(scheduler.HealthCheckTask, s.healthSvc.Runner()) - s.schedSvc.SetRunner(scheduler.RepairTask, scheduler.PolicyRunner{Policy: scheduler.NewLockClusterPolicy(), Runner: s.repairSvc.Runner()}) + s.schedSvc.SetRunner(scheduler.RepairTask, scheduler.PolicyRunner{Policy: restoreExclusiveLock, Runner: s.repairSvc.Runner(), TaskType: scheduler.RepairTask}) s.schedSvc.SetRunner(scheduler.ValidateBackupTask, s.backupSvc.ValidationRunner()) // Add additional properties on task run. diff --git a/pkg/service/scheduler/mock_policy_test.go b/pkg/service/scheduler/mock_policy_test.go index 16929f683..319a279eb 100644 --- a/pkg/service/scheduler/mock_policy_test.go +++ b/pkg/service/scheduler/mock_policy_test.go @@ -35,27 +35,27 @@ func (m *mockPolicy) EXPECT() *mockPolicyMockRecorder { } // PostRun mocks base method. 
-func (m *mockPolicy) PostRun(arg0, arg1, arg2 uuid.UUID) { +func (m *mockPolicy) PostRun(arg0, arg1, arg2 uuid.UUID, arg3 TaskType) { m.ctrl.T.Helper() - m.ctrl.Call(m, "PostRun", arg0, arg1, arg2) + m.ctrl.Call(m, "PostRun", arg0, arg1, arg2, arg3) } // PostRun indicates an expected call of PostRun. -func (mr *mockPolicyMockRecorder) PostRun(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *mockPolicyMockRecorder) PostRun(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostRun", reflect.TypeOf((*mockPolicy)(nil).PostRun), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostRun", reflect.TypeOf((*mockPolicy)(nil).PostRun), arg0, arg1, arg2, arg3) } // PreRun mocks base method. -func (m *mockPolicy) PreRun(arg0, arg1, arg2 uuid.UUID) error { +func (m *mockPolicy) PreRun(arg0, arg1, arg2 uuid.UUID, arg3 TaskType) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PreRun", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "PreRun", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // PreRun indicates an expected call of PreRun. 
-func (mr *mockPolicyMockRecorder) PreRun(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *mockPolicyMockRecorder) PreRun(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PreRun", reflect.TypeOf((*mockPolicy)(nil).PreRun), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PreRun", reflect.TypeOf((*mockPolicy)(nil).PreRun), arg0, arg1, arg2, arg3) } diff --git a/pkg/service/scheduler/policy.go b/pkg/service/scheduler/policy.go index 4dca9b5b9..8e3174791 100644 --- a/pkg/service/scheduler/policy.go +++ b/pkg/service/scheduler/policy.go @@ -5,6 +5,7 @@ package scheduler import ( "context" "encoding/json" + "fmt" "sync" "github.com/pkg/errors" @@ -13,55 +14,102 @@ import ( // Policy decides if given task can be run. type Policy interface { - PreRun(clusterID, taskID, runID uuid.UUID) error - PostRun(clusterID, taskID, runID uuid.UUID) + PreRun(clusterID, taskID, runID uuid.UUID, taskType TaskType) error + PostRun(clusterID, taskID, runID uuid.UUID, taskType TaskType) } // PolicyRunner is a runner that uses policy to check if a task can be run. type PolicyRunner struct { Policy Policy Runner Runner + + // TaskType of a task that this runner is executing + TaskType TaskType } // Run implements Runner. func (pr PolicyRunner) Run(ctx context.Context, clusterID, taskID, runID uuid.UUID, properties json.RawMessage) error { - if err := pr.Policy.PreRun(clusterID, taskID, runID); err != nil { + if err := pr.Policy.PreRun(clusterID, taskID, runID, pr.TaskType); err != nil { return err } - defer pr.Policy.PostRun(clusterID, taskID, runID) + defer pr.Policy.PostRun(clusterID, taskID, runID, pr.TaskType) return pr.Runner.Run(ctx, clusterID, taskID, runID, properties) } var errClusterBusy = errors.New("another task is running") -// LockClusterPolicy is a policy that can execute only one task at a time. 
-type LockClusterPolicy struct { - mu sync.Mutex - busy map[uuid.UUID]struct{} +// TaskExclusiveLockPolicy is a policy that executes the exclusiveTask only if there are no other tasks in the cluster. +// Conversely, other tasks can run only if the exclusiveTask is not running. +// Additionally, this policy ensures that only one task of a task type can be executed at a time in a cluster. +type TaskExclusiveLockPolicy struct { + mu sync.Mutex + running map[uuid.UUID]map[TaskType]struct{} + + exclusiveTask TaskType } -func NewLockClusterPolicy() *LockClusterPolicy { - return &LockClusterPolicy{ - busy: make(map[uuid.UUID]struct{}), +func NewTaskExclusiveLockPolicy(exclusiveTask TaskType) *TaskExclusiveLockPolicy { + return &TaskExclusiveLockPolicy{ + running: map[uuid.UUID]map[TaskType]struct{}{}, + + exclusiveTask: exclusiveTask, + } } -// PreRun implements Policy. -func (p *LockClusterPolicy) PreRun(clusterID, _, _ uuid.UUID) error { - p.mu.Lock() - defer p.mu.Unlock() +// PreRun acquires exclusive lock on a cluster for a provided taskType. +func (t *TaskExclusiveLockPolicy) PreRun(clusterID, _, _ uuid.UUID, taskType TaskType) error { + t.mu.Lock() + defer t.mu.Unlock() + + cluster, ok := t.running[clusterID] + if !ok { + cluster = map[TaskType]struct{}{} + t.running[clusterID] = cluster + } + + if err := t.canRunTaskExclusively(cluster, taskType); err != nil { + // cluster is busy + return err + } - if _, ok := p.busy[clusterID]; ok { + cluster[taskType] = struct{}{} + + return nil +} + +// canRunTaskExclusively returns nil if taskType can be run in the cluster, otherwise err is returned. +func (t *TaskExclusiveLockPolicy) canRunTaskExclusively(cluster map[TaskType]struct{}, taskType TaskType) error { + // No tasks are running, so we can start the task. + if len(cluster) == 0 { + return nil + } + + // Exclusive task can be run only when no other task is running. 
+ if taskType == t.exclusiveTask { + return fmt.Errorf("run exclusive task %s: %w", taskType, errClusterBusy) + } + + // Any other task can't be run when exclusive task is running. + if _, ok := cluster[t.exclusiveTask]; ok { + return fmt.Errorf("exclusive task (%s) is running: %w", t.exclusiveTask, errClusterBusy) + } + + // Only one task of a taskType can run in a cluster at a time. + if _, ok := cluster[taskType]; ok { + return errClusterBusy } - p.busy[clusterID] = struct{}{} return nil } -// PostRun implements Policy. -func (p *LockClusterPolicy) PostRun(clusterID, _, _ uuid.UUID) { - p.mu.Lock() - defer p.mu.Unlock() - delete(p.busy, clusterID) +// PostRun releases a lock on a cluster for a provided taskType. +func (t *TaskExclusiveLockPolicy) PostRun(clusterID, _, _ uuid.UUID, taskType TaskType) { + t.mu.Lock() + defer t.mu.Unlock() + // No need to check if t.running[clusterID] exists, because built-in delete will no-op in case of nil map. + delete(t.running[clusterID], taskType) + // Cleaning up the map if no more tasks are left in the cluster. 
+ if len(t.running[clusterID]) == 0 { + delete(t.running, clusterID) + } } diff --git a/pkg/service/scheduler/policy_test.go b/pkg/service/scheduler/policy_test.go index d98aba95c..aff943f0b 100644 --- a/pkg/service/scheduler/policy_test.go +++ b/pkg/service/scheduler/policy_test.go @@ -22,15 +22,17 @@ func TestPolicyRunner(t *testing.T) { c := uuid.MustRandom() k := uuid.MustRandom() r := uuid.MustRandom() + tt := TaskType("") e := errors.New("test") mp := NewmockPolicy(ctrl) - mp.EXPECT().PreRun(c, k, r).Return(e) + mp.EXPECT().PreRun(c, k, r, tt).Return(e) mr := NewmockRunner(ctrl) p := PolicyRunner{ - Policy: mp, - Runner: mr, + Policy: mp, + Runner: mr, + TaskType: tt, } if err := p.Run(context.Background(), c, k, r, nil); err != e { t.Fatal("expected", e, "got", err) @@ -44,19 +46,21 @@ func TestPolicyRunner(t *testing.T) { c := uuid.MustRandom() k := uuid.MustRandom() r := uuid.MustRandom() + tt := TaskType("") e := errors.New("test") mp := NewmockPolicy(ctrl) gomock.InOrder( - mp.EXPECT().PreRun(c, k, r).Return(nil), - mp.EXPECT().PostRun(c, k, r), + mp.EXPECT().PreRun(c, k, r, tt).Return(nil), + mp.EXPECT().PostRun(c, k, r, tt), ) mr := NewmockRunner(ctrl) mr.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(e) p := PolicyRunner{ - Policy: mp, - Runner: mr, + Policy: mp, + Runner: mr, + TaskType: tt, } if err := p.Run(context.Background(), c, k, r, nil); err != e { t.Fatal("expected", e, "got", err) @@ -64,27 +68,108 @@ func TestPolicyRunner(t *testing.T) { }) } -func TestNewLockClusterPolicy(t *testing.T) { - c := uuid.MustRandom() - k := uuid.MustRandom() - r := uuid.MustRandom() - p := NewLockClusterPolicy() +func TestExclusiveTaskLockPolicy(t *testing.T) { + clusterID := uuid.MustRandom() + runID := uuid.MustRandom() + taskID := uuid.MustRandom() - if err := p.PreRun(c, k, r); err != nil { - t.Fatal(err) - } + t.Run("when no other task is running, preRun should return nil", func(t *testing.T) { + 
restoreExclusiveTask := NewTaskExclusiveLockPolicy(RestoreTask) - if err := p.PreRun(c, uuid.MustRandom(), uuid.MustRandom()); err == nil { - t.Fatal("expected error") - } + err := restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) + if err != nil { + t.Fatalf("PreRun: unexpected err: %v", err) + } + }) + + t.Run("when exclusive task is running, other tasks are not allowed", func(t *testing.T) { + restoreExclusiveTask := NewTaskExclusiveLockPolicy(RestoreTask) + + err := restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) + if err != nil { + t.Fatalf("PreRun: unexpected err: %v", err) + } + + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, BackupTask) + if !errors.Is(err, errClusterBusy) { + t.Fatalf("PreRun: expected errClusterBusy, got: %v", err) + } + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, RepairTask) + if !errors.Is(err, errClusterBusy) { + t.Fatalf("PreRun: expected errClusterBusy, got: %v", err) + } + }) + + t.Run("when non exclusive task is running, exclusive task is not allowed", func(t *testing.T) { + restoreExclusiveTask := NewTaskExclusiveLockPolicy(RestoreTask) + + err := restoreExclusiveTask.PreRun(clusterID, taskID, runID, BackupTask) + if err != nil { + t.Fatalf("PreRun: unexpected err: %v", err) + } + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, RepairTask) + if err != nil { + t.Fatalf("PreRun: unexpected err: %v", err) + } + + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) + if !errors.Is(err, errClusterBusy) { + t.Fatalf("PreRun: expected errClusterBusy, got: %v", err) + } + }) + + t.Run("only one instance of a task type is allowed to run at a time", func(t *testing.T) { + restoreExclusiveTask := NewTaskExclusiveLockPolicy(RestoreTask) - if err := p.PreRun(uuid.MustRandom(), uuid.MustRandom(), uuid.MustRandom()); err != nil { - t.Fatal(err) - } + err := restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) + if err != nil { + 
t.Fatalf("PreRun: unexpected err: %v", err) + } + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) + if !errors.Is(err, errClusterBusy) { + t.Fatalf("PreRun: expected errClusterBusy, got: %v", err) + } + + restoreExclusiveTask = NewTaskExclusiveLockPolicy(RestoreTask) + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, BackupTask) + if err != nil { + t.Fatalf("PreRun: unexpected err: %v", err) + } + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, BackupTask) + if !errors.Is(err, errClusterBusy) { + t.Fatalf("PreRun: expected errClusterBusy, got: %v", err) + } + }) + + t.Run("PostRun on an empty cluster", func(t *testing.T) { + restoreExclusiveTask := NewTaskExclusiveLockPolicy(RestoreTask) + + restoreExclusiveTask.PostRun(clusterID, taskID, runID, RestoreTask) + }) - p.PostRun(c, uuid.MustRandom(), uuid.MustRandom()) + t.Run("PostRun should release lock for a given task type", func(t *testing.T) { + restoreExclusiveTask := NewTaskExclusiveLockPolicy(RestoreTask) - if err := p.PreRun(c, uuid.MustRandom(), uuid.MustRandom()); err != nil { + err := restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) - t.Fatal(errClusterBusy) + if err != nil { + t.Fatalf("PreRun: unexpected err: %v", err) + } + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) + if !errors.Is(err, errClusterBusy) { + t.Fatalf("PreRun: expected errClusterBusy, got: %v", err) + } + + // Release a lock and clean up the underlying map. + restoreExclusiveTask.PostRun(clusterID, taskID, runID, RestoreTask) + + if _, ok := restoreExclusiveTask.running[clusterID]; ok { + t.Fatalf("t.running[clusterID] should be deleted") + } + + // Lock can be acquired again. + err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) + if err != nil { + t.Fatalf("PreRun: unexpected err: %v", err) + } + }) }