Replay cancel scheduled cleared reruns #283

Merged · 3 commits · Oct 21, 2024
17 changes: 9 additions & 8 deletions core/scheduler/service/replay_service.go
@@ -58,8 +58,9 @@ type ReplayValidator interface {

type ReplayExecutor interface {
Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus
FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error)
CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus
}

type ReplayService struct {
@@ -225,8 +226,6 @@ func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant,
}

func (r *ReplayService) cancelReplayRuns(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error {
// get list of in progress runs
// stop them on the scheduler
replay := replayWithRun.Replay
jobName := replay.JobName()
jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, replay.Tenant(), jobName)
@@ -235,15 +234,17 @@ func (r *ReplayService) cancelReplayRuns(ctx context.Context, replayWithRun *sch
return err
}

syncedRunStatus, err := r.executor.SyncStatus(ctx, replayWithRun, jobCron)
jobRunsWithDetails, err := r.executor.FetchRunsWithDetails(ctx, replay, jobCron)
if err != nil {
r.logger.Error("unable to sync replay runs status for job [%s]: %s", jobName.String(), err.Error())
return err
}
r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", syncedRunStatus))
r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", jobRunsWithDetails))

statesForCanceling := []scheduler.State{scheduler.StateRunning, scheduler.StateInProgress, scheduler.StateQueued}
toBeCanceledRuns := syncedRunStatus.GetSortedRunsByStates(statesForCanceling)
filteredRunsManagedByReplay := jobRunsWithDetails.FilterRunsManagedByReplay(replayWithRun.Runs)

statesForCanceling := []scheduler.State{scheduler.StateRunning, scheduler.StateUpForRetry, scheduler.StateQueued, scheduler.StateRestarting}
toBeCanceledRuns := filteredRunsManagedByReplay.GetSortedRunsByStates(statesForCanceling)
if len(toBeCanceledRuns) == 0 {
return nil
}
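Read together, the reworked `cancelReplayRuns` flow amounts to the sketch below. This is a paraphrase of the hunks above for readability, not additional code from the PR; imports and logging are omitted, the follow-up persistence of cancelled runs (`UpdateReplayRuns`, exercised in the tests below) is left out, and `executor` stands in for the service's `ReplayExecutor`.

```go
// Sketch of the new cancellation flow (paraphrasing the diff above; error logging omitted).
func cancelReplayRunsSketch(ctx context.Context, executor ReplayExecutor, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) error {
	replay := replayWithRun.Replay

	// 1. Fetch the runs for the replay window, with DAG-run details, straight from the scheduler.
	runsWithDetails, err := executor.FetchRunsWithDetails(ctx, replay, jobCron)
	if err != nil {
		return err
	}

	// 2. Keep only runs whose scheduled time matches a run tracked by this replay,
	//    dropping unrelated runs the scheduler reports for the same window.
	managed := runsWithDetails.FilterRunsManagedByReplay(replayWithRun.Runs)

	// 3. Only runs still active on the scheduler are eligible for cancellation.
	statesForCanceling := []scheduler.State{
		scheduler.StateRunning, scheduler.StateUpForRetry, scheduler.StateQueued, scheduler.StateRestarting,
	}
	toBeCanceledRuns := managed.GetSortedRunsByStates(statesForCanceling)
	if len(toBeCanceledRuns) == 0 {
		return nil
	}

	// 4. Cancel each remaining run on the scheduler by its DAG run ID; the real code then
	//    persists the returned cancelled runs via the replay repository.
	_ = executor.CancelReplayRunsOnScheduler(ctx, replay, jobCron, toBeCanceledRuns)
	return nil
}
```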
132 changes: 129 additions & 3 deletions core/scheduler/service/replay_service_test.go
@@ -433,7 +433,128 @@ func TestReplayService(t *testing.T) {

replayWorker := new(ReplayExecutor)
defer replayWorker.AssertExpectations(t)
replayWorker.On("SyncStatus", ctx, replayWithRun, jobCron).Return(scheduler.JobRunStatusList{}, nil)
replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return([]*scheduler.JobRunWithDetails{}, nil)

replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil)
err := replayService.CancelReplay(ctx, replayWithRun)
assert.NoError(t, err)
})

t.Run("returns no error if replay has been successfully cancelled with runs to be canceled", func(t *testing.T) {
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, startTime, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
{
ScheduledAt: startTime,
State: scheduler.StatePending,
},
},
}
replayRepository := new(ReplayRepository)
defer replayRepository.AssertExpectations(t)
replayRepository.On("UpdateReplayStatus", mock.Anything, replay.ID(), scheduler.ReplayStateCancelled, mock.Anything).Return(nil).Once()

jobRepository := new(JobRepository)
defer jobRepository.AssertExpectations(t)
jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil)

replayWorker := new(ReplayExecutor)
defer replayWorker.AssertExpectations(t)
jobRunWithDetails := scheduler.JobRunDetailsList{
{
ScheduledAt: startTime,
State: scheduler.StatePending,
RunType: "scheduled",
ExternalTrigger: false,
DagRunID: "scheduled_some_date",
DagID: jobName.String(),
},
}
replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return(jobRunWithDetails, nil)

replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil)
err := replayService.CancelReplay(ctx, replayWithRun)
assert.NoError(t, err)
})

t.Run("returns no error if replay has been successfully cancelled with runs in queued and running state", func(t *testing.T) {
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, startTime, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
{
ScheduledAt: startTime,
State: scheduler.StatePending, // this state won't matter; only the scheduler state matters
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
},
}
replayRepository := new(ReplayRepository)
defer replayRepository.AssertExpectations(t)
replayRepository.On("UpdateReplayStatus", mock.Anything, replay.ID(), scheduler.ReplayStateCancelled, mock.Anything).Return(nil).Once()

jobRepository := new(JobRepository)
defer jobRepository.AssertExpectations(t)
jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil)

replayWorker := new(ReplayExecutor)
defer replayWorker.AssertExpectations(t)
jobRunWithDetailsFromScheduler := scheduler.JobRunDetailsList{
{
ScheduledAt: startTime,
State: scheduler.StateQueued, // this means queued on scheduler
},
{
ScheduledAt: startTime.Add(12 * time.Hour),
State: scheduler.StateQueued, // this is a non-replay-managed extra run from the scheduler
},
{
ScheduledAt: startTime.Add(24 * time.Hour),
State: scheduler.StateRunning, // this is a non-replay-managed extra run from the scheduler
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
}
replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return(jobRunWithDetailsFromScheduler, nil)

filteredJobRunWithDetails := []*scheduler.JobRunWithDetails{
{
ScheduledAt: startTime,
State: scheduler.StateQueued, // this means queued on scheduler
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
}
replayWorker.On("CancelReplayRunsOnScheduler", ctx, replay, jobCron, filteredJobRunWithDetails).Return([]*scheduler.JobRunStatus{
{
ScheduledAt: startTime,
State: scheduler.StateQueued, // this means queued on scheduler
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
})

replayRepository.On("UpdateReplayRuns", mock.Anything, replay.ID(),
[]*scheduler.JobRunStatus{
{
ScheduledAt: startTime,
State: scheduler.StateQueued, // this means queued on scheduler
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
}).Return(nil).Once()

replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil)
err := replayService.CancelReplay(ctx, replayWithRun)
@@ -749,12 +870,17 @@ func (_m *ReplayExecutor) Execute(ctx context.Context, replayID uuid.UUID, jobTe
_m.Called(ctx, replayID, jobTenant, jobName)
}

func (_m *ReplayExecutor) SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
func (_m *ReplayExecutor) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
args := _m.Called(ctx, replayWithRun, jobCron)
return args.Get(0).(scheduler.JobRunStatusList), args.Error(1)
}

func (_m *ReplayExecutor) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus {
func (_m *ReplayExecutor) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error) {
args := _m.Called(ctx, replay, jobCron)
return args.Get(0).([]*scheduler.JobRunWithDetails), args.Error(1)
}

func (_m *ReplayExecutor) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus {
args := _m.Called(ctx, replay, jobCron, runs)
return args.Get(0).([]*scheduler.JobRunStatus)
}
34 changes: 23 additions & 11 deletions core/scheduler/service/replay_worker.go
@@ -41,9 +41,10 @@ type ReplayScheduler interface {
Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) error
ClearBatch(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, startTime, endTime time.Time) error

CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error
CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error
CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error
GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
GetJobRunsWithDetails(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error)
}

func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker {
@@ -107,7 +108,7 @@ func (w *ReplayWorker) Execute(ctxBack context.Context, replayID uuid.UUID, jobT
}
}

func (w *ReplayWorker) SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
func (w *ReplayWorker) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
incomingRuns, err := w.fetchRuns(ctx, replayWithRun, jobCron)
if err != nil {
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err)
@@ -171,9 +172,8 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
}
}

syncedRunStatus, err := w.SyncStatus(ctx, replayWithRun, jobCron)
syncedRunStatus, err := w.FetchAndSyncStatus(ctx, replayWithRun, jobCron)
if err != nil {
// todo: lets not kill watchers on such errors
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err)
return err
}
@@ -257,6 +257,15 @@ func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Repla
return nil
}

func (w *ReplayWorker) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error) {
jobRunCriteria := &scheduler.JobRunsCriteria{
Name: replay.JobName().String(),
StartDate: replay.Config().StartTime.UTC(),
EndDate: replay.Config().EndTime.UTC(),
}
return w.scheduler.GetJobRunsWithDetails(ctx, replay.Tenant(), jobRunCriteria, jobCron)
}

func (w *ReplayWorker) fetchRuns(ctx context.Context, replayReq *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) {
jobRunCriteria := &scheduler.JobRunsCriteria{
Name: replayReq.Replay.JobName().String(),
@@ -266,20 +275,21 @@ func (w *ReplayWorker) fetchRuns(ctx context.Context, replayReq *scheduler.Repla
return w.scheduler.GetJobRuns(ctx, replayReq.Replay.Tenant(), jobRunCriteria, jobCron)
}

func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus {
func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus {
var canceledRuns []*scheduler.JobRunStatus
for _, run := range runs {
logicalTime := run.GetLogicalTime(jobCron)
runState := scheduler.JobRunStatus{
ScheduledAt: run.ScheduledAt,
State: scheduler.StateCanceled,
}
logicalTime := runState.GetLogicalTime(jobCron)

w.logger.Info("[ReplayID: %s] Canceling run with logical time: %s", replay.ID(), logicalTime)
if err := w.scheduler.CancelRun(ctx, replay.Tenant(), replay.JobName(), logicalTime, prefixReplayed); err != nil {
if err := w.scheduler.CancelRun(ctx, replay.Tenant(), replay.JobName(), run.DagRunID); err != nil {
w.logger.Error("[ReplayID: %s] unable to cancel job run for job: %s, Schedule Time: %s, err: %s", replay.ID(), replay.JobName(), run.ScheduledAt, err.Error())
continue
}
canceledRuns = append(canceledRuns, &scheduler.JobRunStatus{
ScheduledAt: run.ScheduledAt,
State: scheduler.StateCanceled,
})
canceledRuns = append(canceledRuns, &runState)
}
return canceledRuns
}
@@ -391,13 +401,15 @@ func (w *ReplayWorker) getRequestsToProcess(ctx context.Context, replays []*sche
if lag.Seconds() > maxLag {
maxLag = lag.Seconds()
}
w.logger.Info(fmt.Sprintf("trying to acquired replay request with ID: %s", replay.ID()))
err := w.replayRepo.AcquireReplayRequest(ctx, replay.ID(), unhandledClassifierDuration)
if err != nil {
if errors.IsErrorType(err, errors.ErrNotFound) {
continue
}
w.logger.Error("unable to acquire lock on replay request err: %s", err.Error())
}
w.logger.Info(fmt.Sprintf("successfully acquired replay request with ID: %s", replay.ID()))
requestsToProcess = append(requestsToProcess, replay)
}
replayReqLag.Set(maxLag)
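The practical effect of the `CancelRun` signature change is that a run no longer has to carry the replay run-ID prefix to be cancellable: the worker now addresses the exact DAG run ID reported by the scheduler, so scheduled or manually cleared reruns fall within reach as well. A minimal usage sketch follows, assuming a `ReplayScheduler` implementation is available as `sched` and that `tnnt`, `jobName`, and `toBeCanceledRuns` hold the values used throughout the diff above.

```go
// Before (old signature): cancellation was keyed on execution time plus a replay-specific run-ID prefix.
//   err := sched.CancelRun(ctx, tnnt, jobName, logicalTime, prefixReplayed)
//
// After: cancellation is keyed on the concrete DAG run ID from the scheduler,
// e.g. "scheduled_some_date" as in the test above.
for _, run := range toBeCanceledRuns { // toBeCanceledRuns: []*scheduler.JobRunWithDetails
	if err := sched.CancelRun(ctx, tnnt, jobName, run.DagRunID); err != nil {
		// A single failed cancellation is skipped rather than aborting the whole batch.
		continue
	}
}
```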
9 changes: 7 additions & 2 deletions core/scheduler/service/replay_worker_test.go
@@ -849,11 +849,16 @@ func (_m *mockReplayScheduler) CreateRun(ctx context.Context, tnnt tenant.Tenant
return r0
}

func (_m *mockReplayScheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error {
args := _m.Called(ctx, tnnt, jobName, executionTime, dagRunIDPrefix)
func (_m *mockReplayScheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error {
args := _m.Called(ctx, tnnt, jobName, dagRunID)
return args.Error(0)
}

func (_m *mockReplayScheduler) GetJobRunsWithDetails(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error) {
args := _m.Called(ctx, t, criteria, jobCron)
return args.Get(0).([]*scheduler.JobRunWithDetails), args.Error(1)
}

// GetJobRuns provides a mock function with given fields: ctx, t, criteria, jobCron
func (_m *mockReplayScheduler) GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) {
ret := _m.Called(ctx, t, criteria, jobCron)
61 changes: 59 additions & 2 deletions core/scheduler/status.go
@@ -26,8 +26,9 @@ const (
StateNotScheduled State = "waiting_to_schedule"
StateWaitUpstream State = "wait_upstream"
StateInProgress State = "in_progress"

StateMissing State = "missing"
StateUpForRetry State = "up_for_retry"
StateRestarting State = "restarting"
StateMissing State = "missing"
)

type State string
@@ -68,6 +69,14 @@ type JobRunStatus struct {
State State
}

func (j JobRunStatus) GetState() State {
return j.State
}

func (j JobRunStatus) GetScheduledAt() time.Time {
return j.ScheduledAt
}

func JobRunStatusFrom(scheduledAt time.Time, state string) (JobRunStatus, error) {
runState, err := StateFromString(state)
if err != nil {
@@ -84,6 +93,54 @@ func (j JobRunStatus) GetLogicalTime(jobCron *cron.ScheduleSpec) time.Time {
return jobCron.Prev(j.ScheduledAt)
}

type JobRunWithDetails struct {
ScheduledAt time.Time
State State
RunType string
ExternalTrigger bool
DagRunID string
DagID string
}

func (j JobRunWithDetails) GetState() State {
return j.State
}

type JobRunDetailsList []*JobRunWithDetails

func (j JobRunDetailsList) GetSortedRunsByStates(states []State) []*JobRunWithDetails {
stateMap := make(map[State]bool, len(states))
for _, state := range states {
stateMap[state] = true
}

var result []*JobRunWithDetails
for _, run := range j {
if stateMap[run.State] {
result = append(result, run)
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].ScheduledAt.Before(result[j].ScheduledAt)
})
return result
}

func (j JobRunDetailsList) FilterRunsManagedByReplay(runs []*JobRunStatus) JobRunDetailsList {
runMap := make(map[time.Time]bool, len(runs))
for _, state := range runs {
runMap[state.ScheduledAt] = true
}

var result []*JobRunWithDetails
for _, run := range j {
if runMap[run.ScheduledAt] {
result = append(result, run)
}
}
return result
}

type JobRunStatusList []*JobRunStatus

func (j JobRunStatusList) GetSortedRunsByStates(states []State) []*JobRunStatus {
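For a quick sense of how the two new `JobRunDetailsList` helpers compose, here is a hypothetical snippet; the times, states, and DAG run IDs are invented for illustration, and it assumes the `time` package and this repo's `scheduler` package are imported.

```go
// Hypothetical illustration of FilterRunsManagedByReplay + GetSortedRunsByStates; values are invented.
func demoFilterAndSort() {
	now := time.Now().UTC()

	// The scheduler reports two runs, but only the first is tracked by the replay.
	fromScheduler := scheduler.JobRunDetailsList{
		{ScheduledAt: now, State: scheduler.StateQueued, DagRunID: "scheduled__run_1"},
		{ScheduledAt: now.Add(24 * time.Hour), State: scheduler.StateRunning, DagRunID: "scheduled__run_2"},
	}
	replayRuns := []*scheduler.JobRunStatus{
		{ScheduledAt: now, State: scheduler.StatePending}, // the stored replay state is irrelevant to filtering
	}

	// Keep only runs whose scheduled time matches a replay-tracked run, then select the
	// cancellable scheduler states, sorted by scheduled time.
	managed := fromScheduler.FilterRunsManagedByReplay(replayRuns)
	toCancel := managed.GetSortedRunsByStates([]scheduler.State{
		scheduler.StateQueued, scheduler.StateRunning, scheduler.StateUpForRetry, scheduler.StateRestarting,
	})

	// toCancel holds a single entry: the queued run scheduled at `now`.
	_ = toCancel
}
```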