diff --git a/pkg/service/scheduler/policy.go b/pkg/service/scheduler/policy.go index 1968b9b2e..8e3174791 100644 --- a/pkg/service/scheduler/policy.go +++ b/pkg/service/scheduler/policy.go @@ -63,10 +63,24 @@ func (t *TaskExclusiveLockPolicy) PreRun(clusterID, _, _ uuid.UUID, taskType Tas cluster, ok := t.running[clusterID] if !ok { - t.running[clusterID] = map[TaskType]struct{}{} + cluster = map[TaskType]struct{}{} + t.running[clusterID] = cluster } + + if err := t.canRunTaskExclusively(cluster, taskType); err != nil { + // cluster is busy + return err + } + + cluster[taskType] = struct{}{} + + return nil +} + +// canRunTaskExclusively returns nil if taskType can be run in the cluster, otherwise err is returned. +func (t *TaskExclusiveLockPolicy) canRunTaskExclusively(cluster map[TaskType]struct{}, taskType TaskType) error { + // No tasks are running, so we can start the task. if len(cluster) == 0 { - t.running[clusterID][taskType] = struct{}{} return nil } @@ -85,8 +99,6 @@ func (t *TaskExclusiveLockPolicy) PreRun(clusterID, _, _ uuid.UUID, taskType Tas return errClusterBusy } - t.running[clusterID][taskType] = struct{}{} - return nil } @@ -94,5 +106,10 @@ func (t *TaskExclusiveLockPolicy) PreRun(clusterID, _, _ uuid.UUID, taskType Tas func (t *TaskExclusiveLockPolicy) PostRun(clusterID, _, _ uuid.UUID, taskType TaskType) { t.mu.Lock() defer t.mu.Unlock() + // No need to check if t.running[clusterID] exists, because built-in delete will no-op in case of nil map. delete(t.running[clusterID], taskType) + // Cleaning up the map if no more tasks left in the cluster. + if len(t.running[clusterID]) == 0 { + delete(t.running, clusterID) + } } diff --git a/pkg/service/scheduler/policy_test.go b/pkg/service/scheduler/policy_test.go index 7ea4ea2fd..aff943f0b 100644 --- a/pkg/service/scheduler/policy_test.go +++ b/pkg/service/scheduler/policy_test.go @@ -80,7 +80,6 @@ func TestExclusiveTaskLockPolicy(t *testing.T) { if err != nil { t.Fatalf("PreRun: unexpected err: %v", err) } - }) t.Run("when exclusive task is running, other tasks are not allowed", func(t *testing.T) { @@ -160,9 +159,14 @@ func TestExclusiveTaskLockPolicy(t *testing.T) { t.Fatalf("PreRun: expected errClusterBusy, got: %v", err) } - // Release a lock. + // Release a lock and clean up the underlying map. restoreExclusiveTask.PostRun(clusterID, taskID, runID, RestoreTask) + if _, ok := restoreExclusiveTask.running[clusterID]; ok { + t.Fatalf("t.running[clusterID] should be deleted") + } + + // Lock can be acquried again. err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask) if err != nil { t.Fatalf("PreRun: unexpected err: %v", err)