From 394c19d66988ca4a55d9f2688476f4b809dd9498 Mon Sep 17 00:00:00 2001 From: Anwar Hidayat <15167551+irainia@users.noreply.github.com> Date: Thu, 14 Sep 2023 13:25:46 +0700 Subject: [PATCH] fix: issue when getting interval (#149) --- core/scheduler/service/job_run_service.go | 4 ++-- core/scheduler/service/job_run_service_test.go | 8 ++++---- ext/scheduler/airflow/__lib.py | 8 ++++---- server/optimus.go | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/scheduler/service/job_run_service.go b/core/scheduler/service/job_run_service.go index 438e66ed78..fc8b9492e6 100644 --- a/core/scheduler/service/job_run_service.go +++ b/core/scheduler/service/job_run_service.go @@ -83,7 +83,7 @@ type EventHandler interface { } type ProjectGetter interface { - GetByName(context.Context, tenant.ProjectName) (*tenant.Project, error) + Get(context.Context, tenant.ProjectName) (*tenant.Project, error) } type JobRunService struct { @@ -179,7 +179,7 @@ func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.Proje } func (s *JobRunService) GetInterval(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, referenceTime time.Time) (window.Interval, error) { - project, err := s.projectGetter.GetByName(ctx, projectName) + project, err := s.projectGetter.Get(ctx, projectName) if err != nil { s.l.Error("error getting project [%s]: %s", projectName, err) return window.Interval{}, err diff --git a/core/scheduler/service/job_run_service_test.go b/core/scheduler/service/job_run_service_test.go index 99bcd3472f..f4ff129648 100644 --- a/core/scheduler/service/job_run_service_test.go +++ b/core/scheduler/service/job_run_service_test.go @@ -1369,7 +1369,7 @@ func TestJobRunService(t *testing.T) { projectGetter := new(mockProjectGetter) defer projectGetter.AssertExpectations(t) - projectGetter.On("GetByName", ctx, projName).Return(nil, errors.NewError(errors.ErrInternalError, tenant.EntityProject, "unexpected error")) + projectGetter.On("Get", ctx, projName).Return(nil, errors.NewError(errors.ErrInternalError, tenant.EntityProject, "unexpected error")) service := service.NewJobRunService(logger, nil, nil, nil, nil, nil, nil, nil, nil, projectGetter) @@ -1386,7 +1386,7 @@ func TestJobRunService(t *testing.T) { jobRepo := new(JobRepository) defer jobRepo.AssertExpectations(t) - projectGetter.On("GetByName", ctx, projName).Return(project, nil) + projectGetter.On("Get", ctx, projName).Return(project, nil) jobRepo.On("GetJobDetails", ctx, projName, jobName).Return(nil, errors.NewError(errors.ErrInternalError, job.EntityJob, "unexpected error")) @@ -1405,7 +1405,7 @@ func TestJobRunService(t *testing.T) { jobRepo := new(JobRepository) defer jobRepo.AssertExpectations(t) - projectGetter.On("GetByName", ctx, projName).Return(project, nil) + projectGetter.On("Get", ctx, projName).Return(project, nil) windowConfig, err := window.NewPresetConfig("yesterday") assert.NotNil(t, windowConfig) @@ -1628,7 +1628,7 @@ type mockProjectGetter struct { mock.Mock } -func (m *mockProjectGetter) GetByName(ctx context.Context, projectName tenant.ProjectName) (*tenant.Project, error) { +func (m *mockProjectGetter) Get(ctx context.Context, projectName tenant.ProjectName) (*tenant.Project, error) { args := m.Called(ctx, projectName) if args.Get(0) == nil { return nil, args.Error(1) diff --git a/ext/scheduler/airflow/__lib.py b/ext/scheduler/airflow/__lib.py index 7346a5bf9d..eb10e978f8 100644 --- a/ext/scheduler/airflow/__lib.py +++ b/ext/scheduler/airflow/__lib.py @@ -153,16 +153,16 @@ def __init__(self, optimus_client: OptimusAPIClient, project_name: str, job_name def get(self, scheduled_at: str) -> (datetime, datetime): api_response = self._fetch_task_window(scheduled_at) return ( - self._parse_datetime(api_response['start']), - self._parse_datetime(api_response['end']), + self._parse_datetime(api_response['startTime']), + self._parse_datetime(api_response['endTime']), ) # window start is inclusive def get_schedule_window(self, scheduled_at: str, upstream_schedule: str) -> (str, str): api_response = self._fetch_task_window(scheduled_at) - schedule_time_window_start = self._parse_datetime(api_response['start_time']) - schedule_time_window_end = self._parse_datetime(api_response['end_time']) + schedule_time_window_start = self._parse_datetime(api_response['startTime']) + schedule_time_window_end = self._parse_datetime(api_response['endTime']) job_cron_iter = croniter(upstream_schedule, schedule_time_window_start) schedule_time_window_inclusive_start = job_cron_iter.get_next(datetime) diff --git a/server/optimus.go b/server/optimus.go index 3261791636..36e8450195 100644 --- a/server/optimus.go +++ b/server/optimus.go @@ -331,7 +331,7 @@ func (s *OptimusServer) setupHandlers() error { newJobRunService := schedulerService.NewJobRunService( s.logger, jobProviderRepo, jobRunRepo, replayRepository, operatorRunRepository, - newScheduler, newPriorityResolver, jobInputCompiler, s.eventHandler, tProjectRepo, + newScheduler, newPriorityResolver, jobInputCompiler, s.eventHandler, tProjectService, ) // Job Bounded Context Setup