From dff226f05c46238f27ca574d2fb5c3114b26cb04 Mon Sep 17 00:00:00 2001 From: Yash Bhardwaj Date: Mon, 21 Oct 2024 15:05:36 +0530 Subject: [PATCH] Replay cancel scheduled cleared reruns (#283) * fix: do not cancel non-replay-managed runs during replay cancellation; also cancel cleared runs (ones that were originally scheduled but cleared by replay) * fix: typos and add logs --- core/scheduler/service/replay_service.go | 17 +-- core/scheduler/service/replay_service_test.go | 132 +++++++++++++++++- core/scheduler/service/replay_worker.go | 34 +++-- core/scheduler/service/replay_worker_test.go | 9 +- core/scheduler/status.go | 61 +++++++- ext/scheduler/airflow/airflow.go | 39 ++++-- ext/scheduler/airflow/client.go | 35 ++++- 7 files changed, 286 insertions(+), 41 deletions(-) diff --git a/core/scheduler/service/replay_service.go b/core/scheduler/service/replay_service.go index a66ccdba91..a252a90d45 100644 --- a/core/scheduler/service/replay_service.go +++ b/core/scheduler/service/replay_service.go @@ -58,8 +58,9 @@ type ReplayValidator interface { type ReplayExecutor interface { Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) - SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) - CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus + FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) + FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error) + CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus } type ReplayService struct { @@ -225,8 +226,6 @@ func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant, } func (r *ReplayService) cancelReplayRuns(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error { - // get list of in progress runs - // stop them on the scheduler replay := replayWithRun.Replay jobName := replay.JobName() jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, replay.Tenant(), jobName) @@ -235,15 +234,17 @@ func (r *ReplayService) cancelReplayRuns(ctx context.Context, replayWithRun *sch return err } - syncedRunStatus, err := r.executor.SyncStatus(ctx, replayWithRun, jobCron) + jobRunsWithDetails, err := r.executor.FetchRunsWithDetails(ctx, replay, jobCron) if err != nil { r.logger.Error("unable to sync replay runs status for job [%s]: %s", jobName.String(), err.Error()) return err } - r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", syncedRunStatus)) + r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", jobRunsWithDetails)) - statesForCanceling := []scheduler.State{scheduler.StateRunning, scheduler.StateInProgress, scheduler.StateQueued} - toBeCanceledRuns := syncedRunStatus.GetSortedRunsByStates(statesForCanceling) + filteredRunsManagedByReplay := jobRunsWithDetails.FilterRunsManagedByReplay(replayWithRun.Runs) + + statesForCanceling := []scheduler.State{scheduler.StateRunning, scheduler.StateUpForRetry, scheduler.StateQueued, scheduler.StateRestarting} + toBeCanceledRuns := filteredRunsManagedByReplay.GetSortedRunsByStates(statesForCanceling) if len(toBeCanceledRuns) == 0 { return nil } diff --git
a/core/scheduler/service/replay_service_test.go b/core/scheduler/service/replay_service_test.go index 3d16c837a5..7516c8c55d 100644 --- a/core/scheduler/service/replay_service_test.go +++ b/core/scheduler/service/replay_service_test.go @@ -433,7 +433,128 @@ func TestReplayService(t *testing.T) { replayWorker := new(ReplayExecutor) defer replayWorker.AssertExpectations(t) - replayWorker.On("SyncStatus", ctx, replayWithRun, jobCron).Return(scheduler.JobRunStatusList{}, nil) + replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return([]*scheduler.JobRunWithDetails{}, nil) + + replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil) + err := replayService.CancelReplay(ctx, replayWithRun) + assert.NoError(t, err) + }) + + t.Run("returns no error if replay has been successfully cancelled with runs to be canceled", func(t *testing.T) { + replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, startTime, message) + replayWithRun := &scheduler.ReplayWithRun{ + Replay: replay, + Runs: []*scheduler.JobRunStatus{ + { + ScheduledAt: startTime, + State: scheduler.StatePending, + }, + }, + } + replayRepository := new(ReplayRepository) + defer replayRepository.AssertExpectations(t) + replayRepository.On("UpdateReplayStatus", mock.Anything, replay.ID(), scheduler.ReplayStateCancelled, mock.Anything).Return(nil).Once() + + jobRepository := new(JobRepository) + defer jobRepository.AssertExpectations(t) + jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil) + + replayWorker := new(ReplayExecutor) + defer replayWorker.AssertExpectations(t) + jobRunWithDetails := []*scheduler.JobRunWithDetails{ + { + ScheduledAt: startTime, + State: scheduler.StatePending, + RunType: "scheduled", + ExternalTrigger: false, + DagRunID: "scheduled_some_date", + DagID: jobName.String(), + }, + } + replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return(jobRunWithDetails, nil) + + replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil) + err := replayService.CancelReplay(ctx, replayWithRun) + assert.NoError(t, err) + }) + + t.Run("returns no error if replay has been successfully cancelled with runs in queued and running state", func(t *testing.T) { + replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, startTime, message) + replayWithRun := &scheduler.ReplayWithRun{ + Replay: replay, + Runs: []*scheduler.JobRunStatus{ + { + ScheduledAt: startTime, + State: scheduler.StatePending, // this state won't matter only the scheduler state matters + }, + { + ScheduledAt: startTime.Add(48 * time.Hour), + State: scheduler.StateRunning, + }, + }, + } + replayRepository := new(ReplayRepository) + defer replayRepository.AssertExpectations(t) + replayRepository.On("UpdateReplayStatus", mock.Anything, replay.ID(), scheduler.ReplayStateCancelled, mock.Anything).Return(nil).Once() + + jobRepository := new(JobRepository) + defer jobRepository.AssertExpectations(t) + jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil) + + replayWorker := new(ReplayExecutor) + defer replayWorker.AssertExpectations(t) + jobRunWithDetailsFromScheduler := []*scheduler.JobRunWithDetails{ + { + ScheduledAt: startTime, + State: scheduler.StateQueued, // this means queued on scheduler + }, + { + ScheduledAt: startTime.Add(12 
* time.Hour), + State: scheduler.StateQueued, // this is non replay managed extra run from scheduler + }, + { + ScheduledAt: startTime.Add(24 * time.Hour), + State: scheduler.StateRunning, // this is non replay managed extra run from scheduler + }, + { + ScheduledAt: startTime.Add(48 * time.Hour), + State: scheduler.StateRunning, + }, + } + replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return(jobRunWithDetailsFromScheduler, nil) + + filteredJobRunWithDetails := []*scheduler.JobRunWithDetails{ + { + ScheduledAt: startTime, + State: scheduler.StateQueued, // this means queued on scheduler + }, + { + ScheduledAt: startTime.Add(48 * time.Hour), + State: scheduler.StateRunning, + }, + } + replayWorker.On("CancelReplayRunsOnScheduler", ctx, replay, jobCron, filteredJobRunWithDetails).Return([]*scheduler.JobRunStatus{ + { + ScheduledAt: startTime, + State: scheduler.StateQueued, // this means queued on scheduler + }, + { + ScheduledAt: startTime.Add(48 * time.Hour), + State: scheduler.StateRunning, + }, + }) + + replayRepository.On("UpdateReplayRuns", mock.Anything, replay.ID(), + []*scheduler.JobRunStatus{ + { + ScheduledAt: startTime, + State: scheduler.StateQueued, // this means queued on scheduler + }, + { + ScheduledAt: startTime.Add(48 * time.Hour), + State: scheduler.StateRunning, + }, + }).Return(nil).Once() replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil) err := replayService.CancelReplay(ctx, replayWithRun) @@ -749,12 +870,17 @@ func (_m *ReplayExecutor) Execute(ctx context.Context, replayID uuid.UUID, jobTe _m.Called(ctx, replayID, jobTenant, jobName) } -func (_m *ReplayExecutor) SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) { +func (_m *ReplayExecutor) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) { args := _m.Called(ctx, replayWithRun, jobCron) return args.Get(0).(scheduler.JobRunStatusList), args.Error(1) } -func (_m *ReplayExecutor) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus { +func (_m *ReplayExecutor) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error) { + args := _m.Called(ctx, replay, jobCron) + return args.Get(0).([]*scheduler.JobRunWithDetails), args.Error(1) +} + +func (_m *ReplayExecutor) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus { args := _m.Called(ctx, replay, jobCron, runs) return args.Get(0).([]*scheduler.JobRunStatus) } diff --git a/core/scheduler/service/replay_worker.go b/core/scheduler/service/replay_worker.go index 360eed709a..a709c291b8 100644 --- a/core/scheduler/service/replay_worker.go +++ b/core/scheduler/service/replay_worker.go @@ -41,9 +41,10 @@ type ReplayScheduler interface { Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) error ClearBatch(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, startTime, endTime time.Time) error - CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error + CancelRun(ctx context.Context, tnnt tenant.Tenant, 
jobName scheduler.JobName, dagRunID string) error CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) + GetJobRunsWithDetails(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error) } func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker { @@ -107,7 +108,7 @@ func (w *ReplayWorker) Execute(ctxBack context.Context, replayID uuid.UUID, jobT } } -func (w *ReplayWorker) SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) { +func (w *ReplayWorker) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) { incomingRuns, err := w.fetchRuns(ctx, replayWithRun, jobCron) if err != nil { w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err) @@ -171,9 +172,8 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI } } - syncedRunStatus, err := w.SyncStatus(ctx, replayWithRun, jobCron) + syncedRunStatus, err := w.FetchAndSyncStatus(ctx, replayWithRun, jobCron) if err != nil { - // todo: lets not kill watchers on such errors w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err) return err } @@ -257,6 +257,15 @@ func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Repla return nil } +func (w *ReplayWorker) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error) { + jobRunCriteria := &scheduler.JobRunsCriteria{ + Name: replay.JobName().String(), + StartDate: replay.Config().StartTime.UTC(), + EndDate: replay.Config().EndTime.UTC(), + } + return w.scheduler.GetJobRunsWithDetails(ctx, replay.Tenant(), jobRunCriteria, jobCron) +} + func (w *ReplayWorker) fetchRuns(ctx context.Context, replayReq *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) { jobRunCriteria := &scheduler.JobRunsCriteria{ Name: replayReq.Replay.JobName().String(), @@ -266,20 +275,21 @@ func (w *ReplayWorker) fetchRuns(ctx context.Context, replayReq *scheduler.Repla return w.scheduler.GetJobRuns(ctx, replayReq.Replay.Tenant(), jobRunCriteria, jobCron) } -func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus { +func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus { var canceledRuns []*scheduler.JobRunStatus for _, run := range runs { - logicalTime := run.GetLogicalTime(jobCron) + runState := scheduler.JobRunStatus{ + ScheduledAt: run.ScheduledAt, + State: scheduler.StateCanceled, + } + logicalTime := runState.GetLogicalTime(jobCron) w.logger.Info("[ReplayID: %s] Canceling run with logical time: %s", replay.ID(), logicalTime) - if err := w.scheduler.CancelRun(ctx, replay.Tenant(), replay.JobName(), 
logicalTime, prefixReplayed); err != nil { + if err := w.scheduler.CancelRun(ctx, replay.Tenant(), replay.JobName(), run.DagRunID); err != nil { w.logger.Error("[ReplayID: %s] unable to cancel job run for job: %s, Schedule Time: %s, err: %s", replay.ID(), replay.JobName(), run.ScheduledAt, err.Error()) continue } - canceledRuns = append(canceledRuns, &scheduler.JobRunStatus{ - ScheduledAt: run.ScheduledAt, - State: scheduler.StateCanceled, - }) + canceledRuns = append(canceledRuns, &runState) } return canceledRuns } @@ -391,6 +401,7 @@ func (w *ReplayWorker) getRequestsToProcess(ctx context.Context, replays []*sche if lag.Seconds() > maxLag { maxLag = lag.Seconds() } + w.logger.Info(fmt.Sprintf("trying to acquire replay request with ID: %s", replay.ID())) err := w.replayRepo.AcquireReplayRequest(ctx, replay.ID(), unhandledClassifierDuration) if err != nil { if errors.IsErrorType(err, errors.ErrNotFound) { @@ -398,6 +409,7 @@ func (w *ReplayWorker) getRequestsToProcess(ctx context.Context, replays []*sche } w.logger.Error("unable to acquire lock on replay request err: %s", err.Error()) } + w.logger.Info(fmt.Sprintf("successfully acquired replay request with ID: %s", replay.ID())) requestsToProcess = append(requestsToProcess, replay) } replayReqLag.Set(maxLag) diff --git a/core/scheduler/service/replay_worker_test.go b/core/scheduler/service/replay_worker_test.go index 7e4c5be9cd..925e669039 100644 --- a/core/scheduler/service/replay_worker_test.go +++ b/core/scheduler/service/replay_worker_test.go @@ -849,11 +849,16 @@ func (_m *mockReplayScheduler) CreateRun(ctx context.Context, tnnt tenant.Tenant return r0 } -func (_m *mockReplayScheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error { - args := _m.Called(ctx, tnnt, jobName, executionTime, dagRunIDPrefix) +func (_m *mockReplayScheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error { + args := _m.Called(ctx, tnnt, jobName, dagRunID) return args.Error(0) } +func (_m *mockReplayScheduler) GetJobRunsWithDetails(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error) { + args := _m.Called(ctx, t, criteria, jobCron) + return args.Get(0).([]*scheduler.JobRunWithDetails), args.Error(1) +} + // GetJobRuns provides a mock function with given fields: ctx, t, criteria, jobCron func (_m *mockReplayScheduler) GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) { ret := _m.Called(ctx, t, criteria, jobCron) diff --git a/core/scheduler/status.go b/core/scheduler/status.go index 43e0b017ce..fb3d6e26da 100644 --- a/core/scheduler/status.go +++ b/core/scheduler/status.go @@ -26,8 +26,9 @@ const ( StateNotScheduled State = "waiting_to_schedule" StateWaitUpstream State = "wait_upstream" StateInProgress State = "in_progress" - - StateMissing State = "missing" + StateUpForRetry State = "up_for_retry" + StateRestarting State = "restarting" + StateMissing State = "missing" ) type State string @@ -68,6 +69,14 @@ type JobRunStatus struct { State State } +func (j JobRunStatus) GetState() State { + return j.State +} + +func (j JobRunStatus) GetScheduledAt() time.Time { + return j.ScheduledAt +} + func JobRunStatusFrom(scheduledAt time.Time, state string) (JobRunStatus, error) { runState, err := StateFromString(state) if err != nil { @@ -84,6
+93,54 @@ func (j JobRunStatus) GetLogicalTime(jobCron *cron.ScheduleSpec) time.Time { return jobCron.Prev(j.ScheduledAt) } +type JobRunWithDetails struct { + ScheduledAt time.Time + State State + RunType string + ExternalTrigger bool + DagRunID string + DagID string +} + +func (j JobRunWithDetails) GetState() State { + return j.State +} + +type JobRunDetailsList []*JobRunWithDetails + +func (j JobRunDetailsList) GetSortedRunsByStates(states []State) []*JobRunWithDetails { + stateMap := make(map[State]bool, len(states)) + for _, state := range states { + stateMap[state] = true + } + + var result []*JobRunWithDetails + for _, run := range j { + if stateMap[run.State] { + result = append(result, run) + } + } + sort.Slice(result, func(i, j int) bool { + return result[i].ScheduledAt.Before(result[j].ScheduledAt) + }) + return result +} + +func (j JobRunDetailsList) FilterRunsManagedByReplay(runs []*JobRunStatus) JobRunDetailsList { + runMap := make(map[time.Time]bool, len(runs)) + for _, state := range runs { + runMap[state.ScheduledAt] = true + } + + var result []*JobRunWithDetails + for _, run := range j { + if runMap[run.ScheduledAt] { + result = append(result, run) + } + } + return result +} + type JobRunStatusList []*JobRunStatus func (j JobRunStatusList) GetSortedRunsByStates(states []State) []*JobRunStatus { diff --git a/ext/scheduler/airflow/airflow.go b/ext/scheduler/airflow/airflow.go index 6e86a8183c..76f2db4bf9 100644 --- a/ext/scheduler/airflow/airflow.go +++ b/ext/scheduler/airflow/airflow.go @@ -273,28 +273,44 @@ func jobNameFromPath(filePath, suffix string) string { return strings.TrimSuffix(jobFileName, suffix) } -func (s *Scheduler) GetJobRuns(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) { - spanCtx, span := startChildSpan(ctx, "GetJobRuns") - defer span.End() - +func (s *Scheduler) fetchJobRunBatch(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]byte, error) { dagRunRequest := getDagRunRequest(jobQuery, jobCron) reqBody, err := json.Marshal(dagRunRequest) if err != nil { return nil, errors.Wrap(EntityAirflow, "unable to marshal dag run request", err) } - req := airflowRequest{ - path: dagStatusBatchURL, - method: http.MethodPost, - body: reqBody, - } + req := airflowRequest{path: dagStatusBatchURL, method: http.MethodPost, body: reqBody} schdAuth, err := s.getSchedulerAuth(ctx, tnnt) if err != nil { return nil, err } - resp, err := s.client.Invoke(spanCtx, req, schdAuth) + return s.client.Invoke(ctx, req, schdAuth) +} + +func (s *Scheduler) GetJobRunsWithDetails(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error) { + spanCtx, span := startChildSpan(ctx, "GetJobRunsWithDetails") + defer span.End() + + resp, err := s.fetchJobRunBatch(spanCtx, tnnt, jobQuery, jobCron) + if err != nil { + return nil, errors.Wrap(EntityAirflow, "failure while fetching airflow dag runs", err) + } + var dagRunList DagRunListResponse + if err := json.Unmarshal(resp, &dagRunList); err != nil { + return nil, errors.Wrap(EntityAirflow, fmt.Sprintf("json error on parsing airflow dag runs: %s", string(resp)), err) + } + + return getJobRunsWithDetails(dagRunList, jobCron) +} + +func (s *Scheduler) GetJobRuns(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) { + spanCtx, span := 
startChildSpan(ctx, "GetJobRuns") + defer span.End() + + resp, err := s.fetchJobRunBatch(spanCtx, tnnt, jobQuery, jobCron) if err != nil { return nil, errors.Wrap(EntityAirflow, "failure while fetching airflow dag runs", err) } @@ -419,10 +435,9 @@ func (s *Scheduler) ClearBatch(ctx context.Context, tnnt tenant.Tenant, jobName return nil } -func (s *Scheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error { +func (s *Scheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error { spanCtx, span := startChildSpan(ctx, "CancelRun") defer span.End() - dagRunID := fmt.Sprintf("%s__%s", dagRunIDPrefix, executionTime.UTC().Format(airflowDateFormat)) data := []byte(`{"state": "failed"}`) req := airflowRequest{ path: fmt.Sprintf(dagRunModifyURL, jobName.String(), dagRunID), diff --git a/ext/scheduler/airflow/client.go b/ext/scheduler/airflow/client.go index 6f19d78a84..9b8b4a6d07 100644 --- a/ext/scheduler/airflow/client.go +++ b/ext/scheduler/airflow/client.go @@ -35,9 +35,18 @@ type DagRunListResponse struct { } type DagRun struct { - ExecutionDate time.Time `json:"execution_date"` - State string `json:"state"` - ExternalTrigger bool `json:"external_trigger"` + ExecutionDate time.Time `json:"execution_date"` + State string `json:"state"` + ExternalTrigger bool `json:"external_trigger"` + DagRunID string `json:"dag_run_id"` + DagID string `json:"dag_id"` + LogicalDate time.Time `json:"logical_date"` + StartDate time.Time `json:"start_date"` + EndDate time.Time `json:"end_date"` + DataIntervalStart time.Time `json:"data_interval_start"` + DataIntervalEnd time.Time `json:"data_interval_end"` + LastSchedulingDecision time.Time `json:"last_scheduling_decision"` + RunType string `json:"run_type"` } type DagRunRequest struct { @@ -118,6 +127,26 @@ func getJobRuns(res DagRunListResponse, spec *cron.ScheduleSpec) ([]*scheduler.J return jobRunList, nil } +func getJobRunsWithDetails(res DagRunListResponse, spec *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error) { + var jobRunList []*scheduler.JobRunWithDetails + if res.TotalEntries > pageLimit { + return jobRunList, errors.InternalError(EntityAirflow, "total number of entries exceed page limit", nil) + } + for _, dag := range res.DagRuns { + scheduledAt := spec.Next(dag.ExecutionDate) + jobRunStatus, _ := scheduler.StateFromString(dag.State) + jobRunList = append(jobRunList, &scheduler.JobRunWithDetails{ + ScheduledAt: scheduledAt, + State: jobRunStatus, + RunType: dag.RunType, + ExternalTrigger: dag.ExternalTrigger, + DagRunID: dag.DagRunID, + DagID: dag.DagID, + }) + } + return jobRunList, nil +} + func startChildSpan(ctx context.Context, name string) (context.Context, trace.Span) { tracer := otel.Tracer("scheduler/airflow")
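
For reference, below is a minimal, self-contained Go sketch (not part of the patch) of the cancellation flow this change introduces: the run list reported by the scheduler is first narrowed to the runs the replay actually manages (matched by scheduled time), then to cancellable states, and each surviving run is cancelled by its scheduler-assigned dag run ID instead of a reconstructed "replayed__<time>" ID. FilterRunsManagedByReplay and GetSortedRunsByStates mirror the methods added in core/scheduler/status.go; the structs here are simplified stand-ins for scheduler.JobRunStatus and scheduler.JobRunWithDetails, and the timestamps and run IDs are made up for illustration.

package main

import (
	"fmt"
	"sort"
	"time"
)

// Simplified stand-ins for the scheduler package types; only the fields the
// cancel path needs are kept.
type State string

const (
	StateRunning    State = "running"
	StateQueued     State = "queued"
	StateUpForRetry State = "up_for_retry"
	StateRestarting State = "restarting"
)

type JobRunStatus struct {
	ScheduledAt time.Time
}

type JobRunWithDetails struct {
	ScheduledAt time.Time
	State       State
	DagRunID    string
}

type JobRunDetailsList []*JobRunWithDetails

// FilterRunsManagedByReplay keeps only scheduler runs whose ScheduledAt matches
// a run tracked by the replay request, so extra scheduled runs are left alone.
func (j JobRunDetailsList) FilterRunsManagedByReplay(runs []*JobRunStatus) JobRunDetailsList {
	managed := make(map[time.Time]bool, len(runs))
	for _, r := range runs {
		managed[r.ScheduledAt] = true
	}
	var result JobRunDetailsList
	for _, run := range j {
		if managed[run.ScheduledAt] {
			result = append(result, run)
		}
	}
	return result
}

// GetSortedRunsByStates keeps runs in the given states, sorted by schedule time.
func (j JobRunDetailsList) GetSortedRunsByStates(states []State) []*JobRunWithDetails {
	wanted := make(map[State]bool, len(states))
	for _, s := range states {
		wanted[s] = true
	}
	var result []*JobRunWithDetails
	for _, run := range j {
		if wanted[run.State] {
			result = append(result, run)
		}
	}
	sort.Slice(result, func(a, b int) bool { return result[a].ScheduledAt.Before(result[b].ScheduledAt) })
	return result
}

func main() {
	t0 := time.Date(2024, 10, 1, 0, 0, 0, 0, time.UTC)

	// Runs tracked by the replay request (hypothetical values).
	replayRuns := []*JobRunStatus{
		{ScheduledAt: t0},
		{ScheduledAt: t0.Add(48 * time.Hour)},
	}

	// Runs reported by the scheduler, including an extra normally scheduled run
	// that is not managed by this replay and therefore must not be cancelled.
	schedulerRuns := JobRunDetailsList{
		{ScheduledAt: t0, State: StateQueued, DagRunID: "replayed__2024-10-01T00:00:00+00:00"},
		{ScheduledAt: t0.Add(24 * time.Hour), State: StateRunning, DagRunID: "scheduled__2024-10-02T00:00:00+00:00"},
		{ScheduledAt: t0.Add(48 * time.Hour), State: StateRunning, DagRunID: "replayed__2024-10-03T00:00:00+00:00"},
	}

	statesForCanceling := []State{StateRunning, StateUpForRetry, StateQueued, StateRestarting}
	toCancel := schedulerRuns.FilterRunsManagedByReplay(replayRuns).GetSortedRunsByStates(statesForCanceling)
	for _, run := range toCancel {
		// The worker would call CancelRun with run.DagRunID here.
		fmt.Println("cancel dag run:", run.DagRunID)
	}
}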