Skip to content

Commit

Permalink
fix: typos and add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryashbhardwaj committed Oct 21, 2024
1 parent 492f2a3 commit a4fc0af
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
6 changes: 3 additions & 3 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ func (r *ReplayService) cancelReplayRuns(ctx context.Context, replayWithRun *sch
return err
}

jobRunsRunsWithDetails, err := r.executor.FetchRunsWithDetails(ctx, replay, jobCron)
jobRunsWithDetails, err := r.executor.FetchRunsWithDetails(ctx, replay, jobCron)
if err != nil {
r.logger.Error("unable to sync replay runs status for job [%s]: %s", jobName.String(), err.Error())
return err
}
r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", jobRunsRunsWithDetails))
r.logger.Debug(fmt.Sprintf("Synced Run status from Airflow : %#v", jobRunsWithDetails))

filteredRunsMangedByReplay := jobRunsRunsWithDetails.FilterRunsManagedByReplay(replayWithRun.Runs)
filteredRunsMangedByReplay := jobRunsWithDetails.FilterRunsManagedByReplay(replayWithRun.Runs)

statesForCanceling := []scheduler.State{scheduler.StateRunning, scheduler.StateUpForRetry, scheduler.StateQueued, scheduler.StateRestarting}
toBeCanceledRuns := filteredRunsMangedByReplay.GetSortedRunsByStates(statesForCanceling)
Expand Down
4 changes: 3 additions & 1 deletion core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (w *ReplayWorker) CancelReplayRunsOnScheduler(ctx context.Context, replay *
for _, run := range runs {
runState := scheduler.JobRunStatus{
ScheduledAt: run.ScheduledAt,
State: run.State,
State: scheduler.StateCanceled,
}
logicalTime := runState.GetLogicalTime(jobCron)

Expand Down Expand Up @@ -401,13 +401,15 @@ func (w *ReplayWorker) getRequestsToProcess(ctx context.Context, replays []*sche
if lag.Seconds() > maxLag {
maxLag = lag.Seconds()
}
w.logger.Info(fmt.Sprintf("trying to acquired replay request with ID: %s", replay.ID()))
err := w.replayRepo.AcquireReplayRequest(ctx, replay.ID(), unhandledClassifierDuration)
if err != nil {
if errors.IsErrorType(err, errors.ErrNotFound) {
continue
}
w.logger.Error("unable to acquire lock on replay request err: %s", err.Error())
}
w.logger.Info(fmt.Sprintf("successfully acquired replay request with ID: %s", replay.ID()))
requestsToProcess = append(requestsToProcess, replay)
}
replayReqLag.Set(maxLag)
Expand Down

0 comments on commit a4fc0af

Please sign in to comment.