Skip to content

Commit

Permalink
Tweak OnAbort action (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Jan 26, 2022
1 parent cbed0eb commit 9fccfde
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions pkg/flink/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type flinkResourceHandler struct{}
func (flinkResourceHandler) GetProperties() k8s.PluginProperties {
config := GetFlinkConfig()
props := k8s.PluginProperties{
GeneratedNameMaxLength: config.GeneratedNameMaxLength,
GeneratedNameMaxLength: config.GeneratedNameMaxLength,
DisableDeleteResourceOnFinalize: true,
}

Expand Down Expand Up @@ -137,22 +137,16 @@ func (flinkResourceHandler) BuildIdentityResource(ctx context.Context, taskCtx p
}

func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, resource client.Object) (behavior k8s.AbortBehavior, err error) {
flinkCluster := resource.(*flinkOp.FlinkCluster)

if flinkCluster.Status.Components.Job != nil && flinkCluster.Status.Components.Job.State == flinkOp.JobStatePending {
return k8s.AbortBehaviorDeleteDefaultResource(), nil
}

annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel)
var abortBehavior k8s.AbortBehavior

if err == nil {
abortBehavior = k8s.AbortBehaviorPatchDefaultResource(
k8s.PatchResourceOperation{Patch: annotationPatch},
false)
annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel)
if err != nil {
return abortBehavior, err
}

return abortBehavior, err
patchOp := k8s.PatchResourceOperation{Patch: annotationPatch}
abortBehavior = k8s.AbortBehaviorPatchDefaultResource(patchOp, false)
return abortBehavior, nil
}

func flinkClusterTaskLogs(ctx context.Context, flinkCluster *flinkOp.FlinkCluster) ([]*core.TaskLog, error) {
Expand Down

0 comments on commit 9fccfde

Please sign in to comment.