Skip to content

Commit

Permalink
Replay cancel scheduled cleared reruns (#283)
Browse files Browse the repository at this point in the history
* fix: replay do not cancel non replay managed runs, also cancel cleared runs(one's that were originally scheduled but cleared by replay)

* fix: typos and add logs
  • Loading branch information
Mryashbhardwaj authored and deryrahman committed Oct 22, 2024
1 parent cdd1e39 commit dff226f
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 41 deletions.
17 changes: 9 additions & 8 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ type ReplayValidator interface {

type ReplayExecutor interface {
Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName)
SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus
FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error)
FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error)
CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus
}

type ReplayService struct {
Expand Down Expand Up @@ -225,8 +226,6 @@ func (r *ReplayService) GetRunsStatus(ctx context.Context, tenant tenant.Tenant,
}

func (r *ReplayService) cancelReplayRuns(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error {
// get list of in progress runs
// stop them on the scheduler
replay := replayWithRun.Replay
jobName := replay.JobName()
jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, replay.Tenant(), jobName)
Expand All @@ -235,15 +234,17 @@ func (r *ReplayService) cancelReplayRuns(ctx context.Context, replayWithRun *sch
return err
}

syncedRunStatus, err := r.executor.SyncStatus(ctx, replayWithRun, jobCron)
jobRunsWithDetails, err := r.executor.FetchRunsWithDetails(ctx, replay, jobCron)
if err != nil {
r.logger.Error("unable to sync replay runs status for job [%s]: %s", jobName.String(), err.Error())
return err
}
r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", syncedRunStatus))
r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", jobRunsWithDetails))

statesForCanceling := []scheduler.State{scheduler.StateRunning, scheduler.StateInProgress, scheduler.StateQueued}
toBeCanceledRuns := syncedRunStatus.GetSortedRunsByStates(statesForCanceling)
filteredRunsMangedByReplay := jobRunsWithDetails.FilterRunsManagedByReplay(replayWithRun.Runs)

statesForCanceling := []scheduler.State{scheduler.StateRunning, scheduler.StateUpForRetry, scheduler.StateQueued, scheduler.StateRestarting}
toBeCanceledRuns := filteredRunsMangedByReplay.GetSortedRunsByStates(statesForCanceling)
if len(toBeCanceledRuns) == 0 {
return nil
}
Expand Down
132 changes: 129 additions & 3 deletions core/scheduler/service/replay_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,128 @@ func TestReplayService(t *testing.T) {

replayWorker := new(ReplayExecutor)
defer replayWorker.AssertExpectations(t)
replayWorker.On("SyncStatus", ctx, replayWithRun, jobCron).Return(scheduler.JobRunStatusList{}, nil)
replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return([]*scheduler.JobRunWithDetails{}, nil)

replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil)
err := replayService.CancelReplay(ctx, replayWithRun)
assert.NoError(t, err)
})

t.Run("returns no error if replay has been successfully cancelled with runs to be canceled", func(t *testing.T) {
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, startTime, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
{
ScheduledAt: startTime,
State: scheduler.StatePending,
},
},
}
replayRepository := new(ReplayRepository)
defer replayRepository.AssertExpectations(t)
replayRepository.On("UpdateReplayStatus", mock.Anything, replay.ID(), scheduler.ReplayStateCancelled, mock.Anything).Return(nil).Once()

jobRepository := new(JobRepository)
defer jobRepository.AssertExpectations(t)
jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil)

replayWorker := new(ReplayExecutor)
defer replayWorker.AssertExpectations(t)
jobRunWithDetails := []*scheduler.JobRunWithDetails{
{
ScheduledAt: startTime,
State: scheduler.StatePending,
RunType: "scheduled",
ExternalTrigger: false,
DagRunID: "scheduled_some_date",
DagID: jobName.String(),
},
}
replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return(jobRunWithDetails, nil)

replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil)
err := replayService.CancelReplay(ctx, replayWithRun)
assert.NoError(t, err)
})

t.Run("returns no error if replay has been successfully cancelled with runs in queued and running state", func(t *testing.T) {
replay := scheduler.NewReplay(replayID, jobName, tnnt, replayConfig, scheduler.ReplayStateInProgress, startTime, startTime, message)
replayWithRun := &scheduler.ReplayWithRun{
Replay: replay,
Runs: []*scheduler.JobRunStatus{
{
ScheduledAt: startTime,
State: scheduler.StatePending, // this state won't matter only the scheduler state matters
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
},
}
replayRepository := new(ReplayRepository)
defer replayRepository.AssertExpectations(t)
replayRepository.On("UpdateReplayStatus", mock.Anything, replay.ID(), scheduler.ReplayStateCancelled, mock.Anything).Return(nil).Once()

jobRepository := new(JobRepository)
defer jobRepository.AssertExpectations(t)
jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil)

replayWorker := new(ReplayExecutor)
defer replayWorker.AssertExpectations(t)
jobRunWithDetailsFromScheduler := []*scheduler.JobRunWithDetails{
{
ScheduledAt: startTime,
State: scheduler.StateQueued, // this means queued on scheduler
},
{
ScheduledAt: startTime.Add(12 * time.Hour),
State: scheduler.StateQueued, // this is non replay managed extra run from scheduler
},
{
ScheduledAt: startTime.Add(24 * time.Hour),
State: scheduler.StateRunning, // this is non replay managed extra run from scheduler
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
}
replayWorker.On("FetchRunsWithDetails", ctx, replay, jobCron).Return(jobRunWithDetailsFromScheduler, nil)

filteredJobRunWithDetails := []*scheduler.JobRunWithDetails{
{
ScheduledAt: startTime,
State: scheduler.StateQueued, // this means queued on scheduler
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
}
replayWorker.On("CancelReplayRunsOnScheduler", ctx, replay, jobCron, filteredJobRunWithDetails).Return([]*scheduler.JobRunStatus{
{
ScheduledAt: startTime,
State: scheduler.StateQueued, // this means queued on scheduler
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
})

replayRepository.On("UpdateReplayRuns", mock.Anything, replay.ID(),
[]*scheduler.JobRunStatus{
{
ScheduledAt: startTime,
State: scheduler.StateQueued, // this means queued on scheduler
},
{
ScheduledAt: startTime.Add(48 * time.Hour),
State: scheduler.StateRunning,
},
}).Return(nil).Once()

replayService := service.NewReplayService(replayRepository, jobRepository, nil, nil, replayWorker, nil, logger, nil, nil)
err := replayService.CancelReplay(ctx, replayWithRun)
Expand Down Expand Up @@ -749,12 +870,17 @@ func (_m *ReplayExecutor) Execute(ctx context.Context, replayID uuid.UUID, jobTe
_m.Called(ctx, replayID, jobTenant, jobName)
}

func (_m *ReplayExecutor) SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
func (_m *ReplayExecutor) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
args := _m.Called(ctx, replayWithRun, jobCron)
return args.Get(0).(scheduler.JobRunStatusList), args.Error(1)
}

func (_m *ReplayExecutor) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus {
func (_m *ReplayExecutor) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error) {
args := _m.Called(ctx, replay, jobCron)
return args.Get(0).([]*scheduler.JobRunWithDetails), args.Error(1)
}

func (_m *ReplayExecutor) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus {
args := _m.Called(ctx, replay, jobCron, runs)
return args.Get(0).([]*scheduler.JobRunStatus)
}
Expand Down
34 changes: 23 additions & 11 deletions core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type ReplayScheduler interface {
Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) error
ClearBatch(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, startTime, endTime time.Time) error

CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error
CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error
CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error
GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
GetJobRunsWithDetails(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error)
}

func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker {
Expand Down Expand Up @@ -107,7 +108,7 @@ func (w *ReplayWorker) Execute(ctxBack context.Context, replayID uuid.UUID, jobT
}
}

func (w *ReplayWorker) SyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
func (w *ReplayWorker) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) {
incomingRuns, err := w.fetchRuns(ctx, replayWithRun, jobCron)
if err != nil {
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err)
Expand Down Expand Up @@ -171,9 +172,8 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
}
}

syncedRunStatus, err := w.SyncStatus(ctx, replayWithRun, jobCron)
syncedRunStatus, err := w.FetchAndSyncStatus(ctx, replayWithRun, jobCron)
if err != nil {
// todo: lets not kill watchers on such errors
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err)
return err
}
Expand Down Expand Up @@ -257,6 +257,15 @@ func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Repla
return nil
}

func (w *ReplayWorker) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error) {
jobRunCriteria := &scheduler.JobRunsCriteria{
Name: replay.JobName().String(),
StartDate: replay.Config().StartTime.UTC(),
EndDate: replay.Config().EndTime.UTC(),
}
return w.scheduler.GetJobRunsWithDetails(ctx, replay.Tenant(), jobRunCriteria, jobCron)
}

func (w *ReplayWorker) fetchRuns(ctx context.Context, replayReq *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) {
jobRunCriteria := &scheduler.JobRunsCriteria{
Name: replayReq.Replay.JobName().String(),
Expand All @@ -266,20 +275,21 @@ func (w *ReplayWorker) fetchRuns(ctx context.Context, replayReq *scheduler.Repla
return w.scheduler.GetJobRuns(ctx, replayReq.Replay.Tenant(), jobRunCriteria, jobCron)
}

func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunStatus) []*scheduler.JobRunStatus {
func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus {
var canceledRuns []*scheduler.JobRunStatus
for _, run := range runs {
logicalTime := run.GetLogicalTime(jobCron)
runState := scheduler.JobRunStatus{
ScheduledAt: run.ScheduledAt,
State: scheduler.StateCanceled,
}
logicalTime := runState.GetLogicalTime(jobCron)

w.logger.Info("[ReplayID: %s] Canceling run with logical time: %s", replay.ID(), logicalTime)
if err := w.scheduler.CancelRun(ctx, replay.Tenant(), replay.JobName(), logicalTime, prefixReplayed); err != nil {
if err := w.scheduler.CancelRun(ctx, replay.Tenant(), replay.JobName(), run.DagRunID); err != nil {
w.logger.Error("[ReplayID: %s] unable to cancel job run for job: %s, Schedule Time: %s, err: %s", replay.ID(), replay.JobName(), run.ScheduledAt, err.Error())
continue
}
canceledRuns = append(canceledRuns, &scheduler.JobRunStatus{
ScheduledAt: run.ScheduledAt,
State: scheduler.StateCanceled,
})
canceledRuns = append(canceledRuns, &runState)
}
return canceledRuns
}
Expand Down Expand Up @@ -391,13 +401,15 @@ func (w *ReplayWorker) getRequestsToProcess(ctx context.Context, replays []*sche
if lag.Seconds() > maxLag {
maxLag = lag.Seconds()
}
w.logger.Info(fmt.Sprintf("trying to acquired replay request with ID: %s", replay.ID()))
err := w.replayRepo.AcquireReplayRequest(ctx, replay.ID(), unhandledClassifierDuration)
if err != nil {
if errors.IsErrorType(err, errors.ErrNotFound) {
continue
}
w.logger.Error("unable to acquire lock on replay request err: %s", err.Error())
}
w.logger.Info(fmt.Sprintf("successfully acquired replay request with ID: %s", replay.ID()))
requestsToProcess = append(requestsToProcess, replay)
}
replayReqLag.Set(maxLag)
Expand Down
9 changes: 7 additions & 2 deletions core/scheduler/service/replay_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,11 +849,16 @@ func (_m *mockReplayScheduler) CreateRun(ctx context.Context, tnnt tenant.Tenant
return r0
}

func (_m *mockReplayScheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error {
args := _m.Called(ctx, tnnt, jobName, executionTime, dagRunIDPrefix)
func (_m *mockReplayScheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error {
args := _m.Called(ctx, tnnt, jobName, dagRunID)
return args.Error(0)
}

func (_m *mockReplayScheduler) GetJobRunsWithDetails(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error) {
args := _m.Called(ctx, t, criteria, jobCron)
return args.Get(0).([]*scheduler.JobRunWithDetails), args.Error(1)
}

// GetJobRuns provides a mock function with given fields: ctx, t, criteria, jobCron
func (_m *mockReplayScheduler) GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) {
ret := _m.Called(ctx, t, criteria, jobCron)
Expand Down
61 changes: 59 additions & 2 deletions core/scheduler/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ const (
StateNotScheduled State = "waiting_to_schedule"
StateWaitUpstream State = "wait_upstream"
StateInProgress State = "in_progress"

StateMissing State = "missing"
StateUpForRetry State = "up_for_retry"
StateRestarting State = "restarting"
StateMissing State = "missing"
)

type State string
Expand Down Expand Up @@ -68,6 +69,14 @@ type JobRunStatus struct {
State State
}

func (j JobRunStatus) GetState() State {
return j.State
}

func (j JobRunStatus) GetScheduledAt() time.Time {
return j.ScheduledAt
}

func JobRunStatusFrom(scheduledAt time.Time, state string) (JobRunStatus, error) {
runState, err := StateFromString(state)
if err != nil {
Expand All @@ -84,6 +93,54 @@ func (j JobRunStatus) GetLogicalTime(jobCron *cron.ScheduleSpec) time.Time {
return jobCron.Prev(j.ScheduledAt)
}

type JobRunWithDetails struct {
ScheduledAt time.Time
State State
RunType string
ExternalTrigger bool
DagRunID string
DagID string
}

func (j JobRunWithDetails) GetState() State {
return j.State
}

type JobRunDetailsList []*JobRunWithDetails

func (j JobRunDetailsList) GetSortedRunsByStates(states []State) []*JobRunWithDetails {
stateMap := make(map[State]bool, len(states))
for _, state := range states {
stateMap[state] = true
}

var result []*JobRunWithDetails
for _, run := range j {
if stateMap[run.State] {
result = append(result, run)
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].ScheduledAt.Before(result[j].ScheduledAt)
})
return result
}

func (j JobRunDetailsList) FilterRunsManagedByReplay(runs []*JobRunStatus) JobRunDetailsList {
runMap := make(map[time.Time]bool, len(runs))
for _, state := range runs {
runMap[state.ScheduledAt] = true
}

var result []*JobRunWithDetails
for _, run := range j {
if runMap[run.ScheduledAt] {
result = append(result, run)
}
}
return result
}

type JobRunStatusList []*JobRunStatus

func (j JobRunStatusList) GetSortedRunsByStates(states []State) []*JobRunStatus {
Expand Down
Loading

0 comments on commit dff226f

Please sign in to comment.