diff --git a/pkg/flink/config.go b/pkg/flink/config.go index 495cfe7..0af90d3 100644 --- a/pkg/flink/config.go +++ b/pkg/flink/config.go @@ -27,6 +27,7 @@ type Config struct { GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"` RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"` NonRetryableExitCodes []int32 `json:"nonRetryableExitCodes" pfFlag:"Defines which job submitter exit codes should not be retried"` + NonRetryableFlyteCode *string `json:"nonRetryableFlyteCode,omitempty" pfFlag:"Defines which code should be returned in case of nonRetryable exit codes"` } func GetFlinkConfig() *Config { diff --git a/pkg/flink/constants.go b/pkg/flink/constants.go index 5612516..83c4282 100644 --- a/pkg/flink/constants.go +++ b/pkg/flink/constants.go @@ -15,6 +15,7 @@ package flink import ( + "github.com/flyteorg/flyteplugins/go/tasks/errors" "regexp" pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config" @@ -42,6 +43,7 @@ var ( regexpFlinkClusterName = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`) generatedNameMaxLength = 50 nonRetryableExitCodes = []int32{} + nonRetryableFlyteCode = errors.DownstreamSystemError defaultServiceAccount = "default" defaultResourceRequirements = corev1.ResourceRequirements{ Limits: map[corev1.ResourceName]resource.Quantity{ @@ -66,6 +68,7 @@ var ( }, GeneratedNameMaxLength: &generatedNameMaxLength, NonRetryableExitCodes: nonRetryableExitCodes, + NonRetryableFlyteCode: &nonRetryableFlyteCode, } flinkConfigSection = pluginsConfig.MustRegisterSubSection("flink", &defaultConfig) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 180b4cb..b422c1b 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -250,7 +250,7 @@ func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info) } reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (non-retryable)", jobStatus.FailureReasons) - return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, reason, info) + return pluginsCore.PhaseInfoFailure(nonRetryableFlyteCode, reason, info) default: msg := fmt.Sprintf("job id: %s with unknown state: %s", jobStatus.ID, jobStatus.State) return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info)