Skip to content

Commit

Permalink
refactor(scheduler): makes task exclusive policy more readable
Browse files Browse the repository at this point in the history
This moves part of the logic to separate method to improve readability
and fixes how PostRun cleans up the resources.
  • Loading branch information
VAveryanov8 committed Nov 25, 2024
1 parent cde0f77 commit 216f877
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
25 changes: 21 additions & 4 deletions pkg/service/scheduler/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,24 @@ func (t *TaskExclusiveLockPolicy) PreRun(clusterID, _, _ uuid.UUID, taskType Tas

cluster, ok := t.running[clusterID]
if !ok {
t.running[clusterID] = map[TaskType]struct{}{}
cluster = map[TaskType]struct{}{}
t.running[clusterID] = cluster
}

if err := t.canRunTaskExclusively(cluster, taskType); err != nil {
// cluster is busy
return err
}

cluster[taskType] = struct{}{}

return nil
}

// canRunTaskExclusively returns nil if taskType can be run in the cluster, otherwise err is returned.
func (t *TaskExclusiveLockPolicy) canRunTaskExclusively(cluster map[TaskType]struct{}, taskType TaskType) error {
// No tasks are running, so we can start the task.
if len(cluster) == 0 {
t.running[clusterID][taskType] = struct{}{}
return nil
}

Expand All @@ -85,14 +99,17 @@ func (t *TaskExclusiveLockPolicy) PreRun(clusterID, _, _ uuid.UUID, taskType Tas
return errClusterBusy
}

t.running[clusterID][taskType] = struct{}{}

return nil
}

// PostRun releases a lock on a cluster for a provided taskType.
func (t *TaskExclusiveLockPolicy) PostRun(clusterID, _, _ uuid.UUID, taskType TaskType) {
t.mu.Lock()
defer t.mu.Unlock()
// No need to check if t.running[clusterID] exists, because built-in delete will no-op in case of nil map.
delete(t.running[clusterID], taskType)
// Cleaning up the map if no more tasks left in the cluster.
if len(t.running[clusterID]) == 0 {
delete(t.running, clusterID)
}
}
8 changes: 6 additions & 2 deletions pkg/service/scheduler/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func TestExclusiveTaskLockPolicy(t *testing.T) {
if err != nil {
t.Fatalf("PreRun: unexpected err: %v", err)
}

})

t.Run("when exclusive task is running, other tasks are not allowed", func(t *testing.T) {
Expand Down Expand Up @@ -160,9 +159,14 @@ func TestExclusiveTaskLockPolicy(t *testing.T) {
t.Fatalf("PreRun: expected errClusterBusy, got: %v", err)
}

// Release a lock.
// Release a lock and clean up the underlying map.
restoreExclusiveTask.PostRun(clusterID, taskID, runID, RestoreTask)

if _, ok := restoreExclusiveTask.running[clusterID]; ok {
t.Fatalf("t.running[clusterID] should be deleted")
}

// Lock can be acquried again.
err = restoreExclusiveTask.PreRun(clusterID, taskID, runID, RestoreTask)
if err != nil {
t.Fatalf("PreRun: unexpected err: %v", err)
Expand Down

0 comments on commit 216f877

Please sign in to comment.