Skip to content

Commit

Permalink
check invariant config for priority
Browse files Browse the repository at this point in the history
  • Loading branch information
kkunapuli committed Oct 7, 2024
1 parent 2928a3b commit 3392e52
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 33 deletions.
2 changes: 1 addition & 1 deletion master/internal/command/command_job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *Command) SetJobPriority(priority int) error {

// Returns an error if RM does not implement priority.
if smallerHigher, err := c.rm.SmallerValueIsHigherPriority(); err == nil {
ok, err := configpolicy.PriorityAllowed(
ok, err := configpolicy.PriorityUpdateAllowed(
int(c.GenericCommandSpec.Metadata.WorkspaceID),
model.NTSCType,
priority,
Expand Down
64 changes: 34 additions & 30 deletions master/internal/configpolicy/task_config_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type NTSCConfigPolicies struct {
var (
errPriorityConstraintFailure = errors.New("submitted workload failed priority constraint")
errResourceConstraintFailure = errors.New("submitted workload failed a resource constraint")
errPriorityUnmutable = errors.New("priority cannot be modified")
errPriorityImmutable = errors.New("priority cannot be modified")
)

// CheckNTSCConstraints returns an error if the NTSC config fails constraint checks.
Expand Down Expand Up @@ -170,40 +170,44 @@ func priorityUpdateAllowed(scope *int, workloadType string, priority int) (int,
}

// Cannot update priority if priority set in invariant config.
switch workloadType {
case model.NTSCType:
var configs model.CommandConfig
err = yaml.Unmarshal([]byte(*configPolicies.InvariantConfig), &configs)
if err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if configs.Resources.Priority != nil && *configs.Resources.Priority != priority {
// If task config policies have updated since the workload was originally scheduled, allow users
// to update the priority to the new priority set by invariant config.
return 0, false, fmt.Errorf("priority already set in invariant config: %w", errPriorityUnmutable)
}
case model.ExperimentType:
var configs expconf.ExperimentConfigV0
err = yaml.Unmarshal([]byte(*configPolicies.InvariantConfig), &configs)
if err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if configs.Resources().Priority() != nil && *configs.Resources().Priority() != priority {
// If task config policies have updated since the workload was originally scheduled, allow users
// to update the priority to the new priority set by invariant config.
return 0, false, fmt.Errorf("priority already set in invariant config: %w", errPriorityUnmutable)
if configPolicies.InvariantConfig != nil {
switch workloadType {
case model.NTSCType:
var configs model.CommandConfig
err = yaml.Unmarshal([]byte(*configPolicies.InvariantConfig), &configs)
if err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if configs.Resources.Priority != nil && *configs.Resources.Priority != priority {
// If task config policies have updated since the workload was originally scheduled, allow users
// to update the priority to the new priority set by invariant config.
return 0, false, fmt.Errorf("priority already set in invariant config: %w", errPriorityImmutable)
}
case model.ExperimentType:
var configs expconf.ExperimentConfigV0
err = yaml.Unmarshal([]byte(*configPolicies.InvariantConfig), &configs)
if err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if configs.Resources().Priority() != nil && *configs.Resources().Priority() != priority {
// If task config policies have updated since the workload was originally scheduled, allow users
// to update the priority to the new priority set by invariant config.
return 0, false, fmt.Errorf("priority already set in invariant config: %w", errPriorityImmutable)
}
default:
return 0, false, fmt.Errorf("workload type %s not supported", workloadType)
}
default:
return 0, false, fmt.Errorf("workload type %s not supported", workloadType)
}

// Find priority constraint, if set.
var constraints model.Constraints
if err = json.Unmarshal([]byte(*configPolicies.Constraints), &constraints); err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if constraints.PriorityLimit != nil {
return *constraints.PriorityLimit, true, nil
if configPolicies.Constraints != nil {
if err = json.Unmarshal([]byte(*configPolicies.Constraints), &constraints); err != nil {
return 0, false, fmt.Errorf("unable to unmarshal task config policies: %w", err)
}
if constraints.PriorityLimit != nil {
return *constraints.PriorityLimit, true, nil
}
}

return 0, false, nil
Expand Down
19 changes: 18 additions & 1 deletion master/internal/configpolicy/task_config_policy_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/determined-ai/determined/master/pkg/schemas/expconf"
)

func TestPriorityAllowed(t *testing.T) {
func TestPriorityUpdateAllowed(t *testing.T) {
require.NoError(t, etc.SetRootPath(db.RootFromDB))
pgDB, cleanup := db.MustResolveNewPostgresDatabase(t)
defer cleanup()
Expand Down Expand Up @@ -51,6 +51,10 @@ func TestPriorityAllowed(t *testing.T) {
require.False(t, ok)

// Priority cannot be updated if invariant_config.resoruces.priority is set.
invariantConfig := `{"resources": {"priority": 7}}`
addConfigs(t, user, &w.ID, invariantConfig, model.NTSCType)
ok, err = PriorityUpdateAllowed(w.ID, model.NTSCType, globalLimit, true)
require.Error(t, errPriorityImmutable)
}

func TestCheckNTSCConstraints(t *testing.T) {
Expand Down Expand Up @@ -330,6 +334,19 @@ func addConstraints(t *testing.T, user model.User, wkspID *int, constraints stri
require.NoError(t, err)
}

func addConfigs(t *testing.T, user model.User, wkspID *int, configs string, workloadType string) {
ctx := context.Background()

input := model.TaskConfigPolicies{
WorkloadType: workloadType,
WorkspaceID: wkspID,
InvariantConfig: &configs,
LastUpdatedBy: user.ID,
}
err := SetTaskConfigPolicies(ctx, &input)
require.NoError(t, err)
}

func defaultNTSCConfig() model.CommandConfig {
config := model.DefaultConfig(nil)

Expand Down
2 changes: 1 addition & 1 deletion master/internal/experiment_job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e *internalExperiment) SetJobPriority(priority int) error {

// Returns an error if RM does not implement priority.
if smallerHigher, err := e.rm.SmallerValueIsHigherPriority(); err == nil {
ok, err := configpolicy.PriorityAllowed(
ok, err := configpolicy.PriorityUpdateAllowed(
wkspID,
model.ExperimentType,
priority,
Expand Down

0 comments on commit 3392e52

Please sign in to comment.