Skip to content

Commit

Permalink
fix: redesign issues (#707)
Browse files Browse the repository at this point in the history
* fix: empty upstream job id issue

* fix: single static internal upstream resolver issue causing error in job inspect

* feat: summarize all error messages when job replace all finished

* fix: issue when job is not found when inspecting

* fix: wrap job not found error in repository to also mention the subject job

* fix: handle empty job schedule end times at scheduler job provider repo

* fix: make end date optional when storing to job table

* fix: omit endDate from schedule when it is null

Co-authored-by: Yash Bhardwaj <[email protected]>
  • Loading branch information
arinda-arif and Mryashbhardwaj authored Jan 6, 2023
1 parent 794803c commit 2e77c97
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 41 deletions.
13 changes: 10 additions & 3 deletions core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (jh *JobHandler) GetJobSpecification(ctx context.Context, req *pb.GetJobSpe
}

jobSpec, err := jh.jobService.Get(ctx, jobTenant, jobName)
if err != nil {
if err != nil && !errors.IsErrorType(err, errors.ErrNotFound) {
errorMsg := "failed to get job specification"
jh.l.Error(fmt.Sprintf("%s: %s", err.Error(), errorMsg))
return nil, errors.GRPCErr(err, errorMsg)
Expand Down Expand Up @@ -258,6 +258,7 @@ func (*JobHandler) GetWindow(_ context.Context, req *pb.GetWindowRequest) (*pb.G
func (jh *JobHandler) ReplaceAllJobSpecifications(stream pb.JobSpecificationService_ReplaceAllJobSpecificationsServer) error {
responseWriter := writer.NewReplaceAllJobSpecificationsResponseWriter(stream)
var errNamespaces []string
var errMessages []string

for {
request, err := stream.Recv()
Expand All @@ -272,10 +273,11 @@ func (jh *JobHandler) ReplaceAllJobSpecifications(stream pb.JobSpecificationServ

jobTenant, err := tenant.NewTenant(request.ProjectName, request.NamespaceName)
if err != nil {
errMsg := fmt.Sprintf("invalid replace all job specifications request for %s: %s", request.GetNamespaceName(), err.Error())
errMsg := fmt.Sprintf("[%s] invalid replace all job specifications request: %s", request.GetNamespaceName(), err.Error())
jh.l.Error(errMsg)
responseWriter.Write(writer.LogLevelError, errMsg)
errNamespaces = append(errNamespaces, request.NamespaceName)
errMessages = append(errMessages, errMsg)
continue
}

Expand All @@ -285,16 +287,21 @@ func (jh *JobHandler) ReplaceAllJobSpecifications(stream pb.JobSpecificationServ
jh.l.Error(errMsg)
responseWriter.Write(writer.LogLevelError, errMsg)
errNamespaces = append(errNamespaces, request.NamespaceName)
errMessages = append(errMessages, errMsg)
}

if err := jh.jobService.ReplaceAll(stream.Context(), jobTenant, jobSpecs, jobNamesWithValidationErrors, responseWriter); err != nil {
errMsg := fmt.Sprintf("replace all job specifications failure for namespace %s: %s", request.NamespaceName, err.Error())
errMsg := fmt.Sprintf("[%s] replace all job specifications failure: %s", request.NamespaceName, err.Error())
jh.l.Error(errMsg)
responseWriter.Write(writer.LogLevelError, errMsg)
errNamespaces = append(errNamespaces, request.NamespaceName)
errMessages = append(errMessages, errMsg)
}
}
if len(errNamespaces) > 0 {
errMessageSummary := strings.Join(errMessages, "\n")
responseWriter.Write(writer.LogLevelError, fmt.Sprintf("\njob replace all finished with errors:\n%s", errMessageSummary))

namespacesWithError := strings.Join(errNamespaces, ", ")
return fmt.Errorf("error when replacing job specifications: [%s]", namespacesWithError)
}
Expand Down
6 changes: 3 additions & 3 deletions core/job/handler/v1beta1/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ func TestNewJobHandler(t *testing.T) {

jobService.On("ReplaceAll", ctx, sampleTenant, mock.Anything, []job.Name{"job-A"}, mock.Anything).Return(nil)

stream.On("Send", mock.AnythingOfType("*optimus.ReplaceAllJobSpecificationsResponse")).Return(nil).Twice()
stream.On("Send", mock.AnythingOfType("*optimus.ReplaceAllJobSpecificationsResponse")).Return(nil).Times(3)

err := jobHandler.ReplaceAllJobSpecifications(stream)
assert.ErrorContains(t, err, "error when replacing job specifications")
Expand Down Expand Up @@ -941,7 +941,7 @@ func TestNewJobHandler(t *testing.T) {

jobService.On("ReplaceAll", ctx, sampleTenant, mock.Anything, jobNamesWithValidationError, mock.Anything).Return(nil)

stream.On("Send", mock.AnythingOfType("*optimus.ReplaceAllJobSpecificationsResponse")).Return(nil).Times(3)
stream.On("Send", mock.AnythingOfType("*optimus.ReplaceAllJobSpecificationsResponse")).Return(nil).Times(4)

err := jobHandler.ReplaceAllJobSpecifications(stream)
assert.Error(t, err)
Expand Down Expand Up @@ -990,7 +990,7 @@ func TestNewJobHandler(t *testing.T) {

jobService.On("ReplaceAll", ctx, sampleTenant, mock.Anything, jobNamesWithValidationError, mock.Anything).Return(errors.New("internal error"))

stream.On("Send", mock.AnythingOfType("*optimus.ReplaceAllJobSpecificationsResponse")).Return(nil).Twice()
stream.On("Send", mock.AnythingOfType("*optimus.ReplaceAllJobSpecificationsResponse")).Return(nil).Times(3)

err := jobHandler.ReplaceAllJobSpecifications(stream)
assert.Error(t, err)
Expand Down
8 changes: 6 additions & 2 deletions core/job/resolver/internal_upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ func NewInternalUpstreamResolver(jobRepository JobRepository) *internalUpstreamR

func (i internalUpstreamResolver) Resolve(ctx context.Context, jobWithUnresolvedUpstream *job.WithUpstream) (*job.WithUpstream, error) {
me := errors.NewMultiError("internal upstream resolution errors")

internalUpstreamInferred, err := i.resolveInferredUpstream(ctx, jobWithUnresolvedUpstream.Job().Sources())
me.Append(err)

internalUpstreamStatic, err := i.resolveStaticUpstream(ctx, jobWithUnresolvedUpstream.Job().Tenant().ProjectName(), jobWithUnresolvedUpstream.Job().Spec().UpstreamSpec())
me.Append(err)
var internalUpstreamStatic []*job.Upstream
if staticUpstreamSpec := jobWithUnresolvedUpstream.Job().Spec().UpstreamSpec(); staticUpstreamSpec != nil {
internalUpstreamStatic, err = i.resolveStaticUpstream(ctx, jobWithUnresolvedUpstream.Job().Tenant().ProjectName(), staticUpstreamSpec)
me.Append(err)
}

internalUpstream := mergeUpstreams(internalUpstreamInferred, internalUpstreamStatic)
fullNameUpstreamMap := job.Upstreams(internalUpstream).ToFullNameAndUpstreamMap()
Expand Down
43 changes: 39 additions & 4 deletions core/job/resolver/internal_upstream_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,61 @@ func TestInternalUpstreamResolver(t *testing.T) {
unresolvedUpstreamD := job.NewUpstreamUnresolvedInferred("resource-D")

t.Run("Resolve", func(t *testing.T) {
t.Run("resolves upstream internally", func(t *testing.T) {
t.Run("resolves inferred and static upstream internally", func(t *testing.T) {
jobRepo := new(JobRepository)

logWriter := new(mockWriter)
defer logWriter.AssertExpectations(t)

jobRepo.On("GetAllByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{jobB}, nil)
jobRepo.On("GetAllByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil)

jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobA, []*job.Upstream{unresolvedUpstreamB, unresolvedUpstreamC, unresolvedUpstreamD})

expectedJobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{internalUpstreamB, internalUpstreamC, unresolvedUpstreamD})

internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
t.Run("resolves inferred upstream internally", func(t *testing.T) {
jobRepo := new(JobRepository)
logWriter := new(mockWriter)
defer logWriter.AssertExpectations(t)

specX, _ := job.NewSpecBuilder(jobVersion, "job-X", "sample-owner", jobSchedule, jobWindow, jobTask).Build()
jobXDestination := job.ResourceURN("resource-X")
jobX := job.NewJob(sampleTenant, specX, jobXDestination, []job.ResourceURN{"resource-B"})

jobRepo.On("GetAllByResourceDestination", ctx, jobX.Sources()[0]).Return([]*job.Job{jobB}, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobX, []*job.Upstream{unresolvedUpstreamB})
expectedJobWithUpstream := job.NewWithUpstream(jobX, []*job.Upstream{internalUpstreamB})

internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
t.Run("resolves static upstream internally", func(t *testing.T) {
jobRepo := new(JobRepository)
logWriter := new(mockWriter)
defer logWriter.AssertExpectations(t)

specX, _ := job.NewSpecBuilder(jobVersion, "job-X", "sample-owner", jobSchedule, jobWindow, jobTask).WithSpecUpstream(upstreamSpec).Build()
jobXDestination := job.ResourceURN("resource-X")
jobX := job.NewJob(sampleTenant, specX, jobXDestination, nil)

jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobX, []*job.Upstream{unresolvedUpstreamC})
expectedJobWithUpstream := job.NewWithUpstream(jobX, []*job.Upstream{internalUpstreamC})

internalUpstreamResolver := resolver.NewInternalUpstreamResolver(jobRepo)
result, err := internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
assert.NoError(t, err)
assert.EqualValues(t, expectedJobWithUpstream.Upstreams(), result.Upstreams())
})
t.Run("should not stop the process but keep appending error when unable to resolve inferred upstream", func(t *testing.T) {
jobRepo := new(JobRepository)

Expand Down
12 changes: 3 additions & 9 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,10 @@ func (j JobService) Get(ctx context.Context, jobTenant tenant.Tenant, jobName jo
if err != nil {
return nil, err
}
if len(jobs) > 0 {
return jobs[0], nil
if len(jobs) == 0 {
return nil, errors.NotFound(job.EntityJob, fmt.Sprintf("job %s is not found", jobName))
}

// TODO: should not return dummy job
return &job.Job{}, nil
return jobs[0], nil
}

func (j JobService) GetTaskInfo(ctx context.Context, task job.Task) (*plugin.Info, error) {
Expand Down Expand Up @@ -599,10 +597,6 @@ func (j JobService) GetJobBasicInfo(ctx context.Context, jobTenant tenant.Tenant
logger.Write(writer.LogLevelError, fmt.Sprintf("unable to get job, err: %v", err))
return nil, logger
}
if subjectJob == nil {
logger.Write(writer.LogLevelError, fmt.Sprintf("job %s not found in the server", jobName.String()))
return nil, logger
}
}

if len(subjectJob.Sources()) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions core/job/service/job_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2156,11 +2156,11 @@ func TestJobService(t *testing.T) {

specA, _ := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).Build()

jobRepo.On("GetByJobName", ctx, project.Name(), specA.Name()).Return(nil, nil)
jobRepo.On("GetByJobName", ctx, project.Name(), specA.Name()).Return(nil, errors.New("job not found"))

jobService := service.NewJobService(jobRepo, pluginService, upstreamResolver, tenantDetailsGetter, log)
result, logger := jobService.GetJobBasicInfo(ctx, sampleTenant, specA.Name(), nil)
assert.Contains(t, logger.Messages[0].Message, "job job-A not found in the server")
assert.Contains(t, logger.Messages[0].Message, "job not found")
assert.Nil(t, result)
})
t.Run("should write log if found existing job with same resource destination", func(t *testing.T) {
Expand Down
20 changes: 9 additions & 11 deletions internal/store/postgres/job/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Spec struct {

type Schedule struct {
StartDate time.Time
EndDate *time.Time
EndDate *time.Time `json:",omitempty"`
Interval string
DependsOnPast bool
CatchUp bool
Expand Down Expand Up @@ -253,14 +253,6 @@ func toStorageSchedule(scheduleSpec *job.Schedule) ([]byte, error) {
return nil, err
}

var endDate time.Time
if scheduleSpec.EndDate() != "" {
endDate, err = time.Parse(jobDatetimeLayout, scheduleSpec.EndDate().String())
if err != nil {
return nil, err
}
}

var retry *Retry
if scheduleSpec.Retry() != nil {
retry = &Retry{
Expand All @@ -272,12 +264,18 @@ func toStorageSchedule(scheduleSpec *job.Schedule) ([]byte, error) {

schedule := Schedule{
StartDate: startDate,
EndDate: &endDate,
Interval: scheduleSpec.Interval(),
DependsOnPast: scheduleSpec.DependsOnPast(),
CatchUp: scheduleSpec.CatchUp(),
Retry: retry,
}
if scheduleSpec.EndDate() != "" {
endDate, err := time.Parse(jobDatetimeLayout, scheduleSpec.EndDate().String())
if err != nil {
return nil, err
}
schedule.EndDate = &endDate
}
return json.Marshal(schedule)
}

Expand Down Expand Up @@ -468,7 +466,7 @@ func fromStorageSchedule(raw []byte) (*job.Schedule, error) {
WithDependsOnPast(storageSchedule.DependsOnPast).
WithInterval(storageSchedule.Interval)

if !storageSchedule.EndDate.IsZero() {
if storageSchedule.EndDate != nil && !storageSchedule.EndDate.IsZero() {
endDate, err := job.ScheduleDateFrom(storageSchedule.EndDate.Format(job.DateLayout))
if err != nil {
return nil, err
Expand Down
8 changes: 6 additions & 2 deletions internal/store/postgres/job/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ func (j JobRepository) get(ctx context.Context, projectName tenant.ProjectName,
getJobByNameAtProject += jobDeletedFilter
}

return FromRow(j.db.QueryRow(ctx, getJobByNameAtProject, jobName.String(), projectName.String()))
spec, err := FromRow(j.db.QueryRow(ctx, getJobByNameAtProject, jobName.String(), projectName.String()))
if errors.IsErrorType(err, errors.ErrNotFound) {
err = errors.NotFound(job.EntityJob, fmt.Sprintf("unable to get job %s", jobName))
}
return spec, err
}

func (j JobRepository) ResolveUpstreams(ctx context.Context, projectName tenant.ProjectName, jobNames []job.Name) (map[job.Name][]*job.Upstream, error) {
Expand Down Expand Up @@ -516,7 +520,7 @@ INSERT INTO job_upstream (
)
VALUES (
(select id FROM job WHERE name = $1 and project_name = $2), $1, $2,
(select id FROM job WHERE name = $3 and project_name = $4), $3, $4,
(select id FROM job WHERE name = $3 and project_name = $5), $3, $4,
$5, $6, $7,
$8, $9,
$10, $11,
Expand Down
24 changes: 20 additions & 4 deletions internal/store/postgres/job/job_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,15 +769,26 @@ func TestPostgresJobRepository(t *testing.T) {
assert.NoError(t, err)
jobA := job.NewJob(sampleTenant, jobSpecA, "dev.resource.sample_a", []job.ResourceURN{"dev.resource.sample_b"})

jobSpecX, err := job.NewSpecBuilder(jobVersion, "sample-job-X", jobOwner, jobSchedule, jobWindow, jobTask).WithDescription(jobDescription).Build()
assert.NoError(t, err)
jobX := job.NewJob(sampleTenant, jobSpecX, "dev.resource.sample_x", []job.ResourceURN{"dev.resource.sample_a"})

jobRepo := postgres.NewJobRepository(db)

addedJob, err := jobRepo.Add(ctx, []*job.Job{jobA})
addedJob, err := jobRepo.Add(ctx, []*job.Job{jobA, jobX})
assert.NoError(t, err)
assert.NotNil(t, addedJob)

upstreamCInferred := job.NewUpstreamResolved("sample-job-B", "host-1", "dev.resource.sample_b", sampleTenant, "inferred", taskName, false)
jobAWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstreamCInferred})
err = jobRepo.ReplaceUpstreams(ctx, []*job.WithUpstream{jobAWithUpstream})
upstreamBInferred := job.NewUpstreamResolved("sample-job-B", "host-1", "dev.resource.sample_b", sampleTenant, "inferred", taskName, false)
jobAWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstreamBInferred})
upstreamAInferred := job.NewUpstreamResolved("sample-job-A", "host-1", "dev.resource.sample_a", sampleTenant, "inferred", taskName, false)
jobXWithUpstream := job.NewWithUpstream(jobX, []*job.Upstream{upstreamAInferred})

err = jobRepo.ReplaceUpstreams(ctx, []*job.WithUpstream{jobAWithUpstream, jobXWithUpstream})
assert.NoError(t, err)

upstreams, err := jobRepo.GetUpstreams(ctx, proj.Name(), jobSpecX.Name())
assert.Equal(t, 1, len(upstreams))
assert.NoError(t, err)

err = jobRepo.Delete(ctx, proj.Name(), jobSpecA.Name(), true)
Expand All @@ -787,6 +798,11 @@ func TestPostgresJobRepository(t *testing.T) {
addedJob, err = jobRepo.Add(ctx, []*job.Job{jobA})
assert.NoError(t, err)
assert.NotNil(t, addedJob)

// data in upstream should already be deleted
upstreams, err = jobRepo.GetUpstreams(ctx, proj.Name(), jobSpecX.Name())
assert.Equal(t, 0, len(upstreams))
assert.NoError(t, err)
})
})

Expand Down
2 changes: 1 addition & 1 deletion internal/store/postgres/scheduler/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (j *Job) toJobWithDetails() (*scheduler.JobWithDetails, error) {
Interval: storageSchedule.Interval,
},
}
if !storageSchedule.EndDate.IsZero() {
if !(storageSchedule.EndDate == nil || storageSchedule.EndDate.IsZero()) {
schedulerJobWithDetails.Schedule.EndDate = storageSchedule.EndDate
}

Expand Down

0 comments on commit 2e77c97

Please sign in to comment.