Skip to content

Commit

Permalink
fix: issue when getting interval (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
irainia authored Sep 14, 2023
1 parent 1bcc348 commit 394c19d
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 11 deletions.
4 changes: 2 additions & 2 deletions core/scheduler/service/job_run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type EventHandler interface {
}

type ProjectGetter interface {
GetByName(context.Context, tenant.ProjectName) (*tenant.Project, error)
Get(context.Context, tenant.ProjectName) (*tenant.Project, error)
}

type JobRunService struct {
Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.Proje
}

func (s *JobRunService) GetInterval(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, referenceTime time.Time) (window.Interval, error) {
project, err := s.projectGetter.GetByName(ctx, projectName)
project, err := s.projectGetter.Get(ctx, projectName)
if err != nil {
s.l.Error("error getting project [%s]: %s", projectName, err)
return window.Interval{}, err
Expand Down
8 changes: 4 additions & 4 deletions core/scheduler/service/job_run_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ func TestJobRunService(t *testing.T) {
projectGetter := new(mockProjectGetter)
defer projectGetter.AssertExpectations(t)

projectGetter.On("GetByName", ctx, projName).Return(nil, errors.NewError(errors.ErrInternalError, tenant.EntityProject, "unexpected error"))
projectGetter.On("Get", ctx, projName).Return(nil, errors.NewError(errors.ErrInternalError, tenant.EntityProject, "unexpected error"))

service := service.NewJobRunService(logger, nil, nil, nil, nil, nil, nil, nil, nil, projectGetter)

Expand All @@ -1386,7 +1386,7 @@ func TestJobRunService(t *testing.T) {
jobRepo := new(JobRepository)
defer jobRepo.AssertExpectations(t)

projectGetter.On("GetByName", ctx, projName).Return(project, nil)
projectGetter.On("Get", ctx, projName).Return(project, nil)

jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(nil, errors.NewError(errors.ErrInternalError, job.EntityJob, "unexpected error"))

Expand All @@ -1405,7 +1405,7 @@ func TestJobRunService(t *testing.T) {
jobRepo := new(JobRepository)
defer jobRepo.AssertExpectations(t)

projectGetter.On("GetByName", ctx, projName).Return(project, nil)
projectGetter.On("Get", ctx, projName).Return(project, nil)

windowConfig, err := window.NewPresetConfig("yesterday")
assert.NotNil(t, windowConfig)
Expand Down Expand Up @@ -1628,7 +1628,7 @@ type mockProjectGetter struct {
mock.Mock
}

func (m *mockProjectGetter) GetByName(ctx context.Context, projectName tenant.ProjectName) (*tenant.Project, error) {
func (m *mockProjectGetter) Get(ctx context.Context, projectName tenant.ProjectName) (*tenant.Project, error) {
args := m.Called(ctx, projectName)
if args.Get(0) == nil {
return nil, args.Error(1)
Expand Down
8 changes: 4 additions & 4 deletions ext/scheduler/airflow/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,16 @@ def __init__(self, optimus_client: OptimusAPIClient, project_name: str, job_name
def get(self, scheduled_at: str) -> (datetime, datetime):
api_response = self._fetch_task_window(scheduled_at)
return (
self._parse_datetime(api_response['start']),
self._parse_datetime(api_response['end']),
self._parse_datetime(api_response['startTime']),
self._parse_datetime(api_response['endTime']),
)

# window start is inclusive
def get_schedule_window(self, scheduled_at: str, upstream_schedule: str) -> (str, str):
api_response = self._fetch_task_window(scheduled_at)

schedule_time_window_start = self._parse_datetime(api_response['start_time'])
schedule_time_window_end = self._parse_datetime(api_response['end_time'])
schedule_time_window_start = self._parse_datetime(api_response['startTime'])
schedule_time_window_end = self._parse_datetime(api_response['endTime'])

job_cron_iter = croniter(upstream_schedule, schedule_time_window_start)
schedule_time_window_inclusive_start = job_cron_iter.get_next(datetime)
Expand Down
2 changes: 1 addition & 1 deletion server/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (s *OptimusServer) setupHandlers() error {

newJobRunService := schedulerService.NewJobRunService(
s.logger, jobProviderRepo, jobRunRepo, replayRepository, operatorRunRepository,
newScheduler, newPriorityResolver, jobInputCompiler, s.eventHandler, tProjectRepo,
newScheduler, newPriorityResolver, jobInputCompiler, s.eventHandler, tProjectService,
)

// Job Bounded Context Setup
Expand Down

0 comments on commit 394c19d

Please sign in to comment.