diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 422f32d..43bf17d 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -98,7 +98,7 @@ type flinkResourceHandler struct{} func (flinkResourceHandler) GetProperties() k8s.PluginProperties { config := GetFlinkConfig() props := k8s.PluginProperties{ - GeneratedNameMaxLength: config.GeneratedNameMaxLength, + GeneratedNameMaxLength: config.GeneratedNameMaxLength, DisableDeleteResourceOnFinalize: true, } @@ -137,22 +137,16 @@ func (flinkResourceHandler) BuildIdentityResource(ctx context.Context, taskCtx p } func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, resource client.Object) (behavior k8s.AbortBehavior, err error) { - flinkCluster := resource.(*flinkOp.FlinkCluster) - - if flinkCluster.Status.Components.Job != nil && flinkCluster.Status.Components.Job.State == flinkOp.JobStatePending { - return k8s.AbortBehaviorDeleteDefaultResource(), nil - } - - annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) var abortBehavior k8s.AbortBehavior - if err == nil { - abortBehavior = k8s.AbortBehaviorPatchDefaultResource( - k8s.PatchResourceOperation{Patch: annotationPatch}, - false) + annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) + if err != nil { + return abortBehavior, err } - return abortBehavior, err + patchOp := k8s.PatchResourceOperation{Patch: annotationPatch} + abortBehavior = k8s.AbortBehaviorPatchDefaultResource(patchOp, false) + return abortBehavior, nil } func flinkClusterTaskLogs(ctx context.Context, flinkCluster *flinkOp.FlinkCluster) ([]*core.TaskLog, error) {