Skip to content

Commit

Permalink
fix: upload to scheduler (#710)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryashbhardwaj authored Jan 6, 2023
1 parent 56c8645 commit a72d635
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 48 deletions.
11 changes: 3 additions & 8 deletions client/cmd/scheduler/upload_all.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package scheduler

import (
"fmt"
"time"

"github.com/MakeNowJust/heredoc"
Expand Down Expand Up @@ -61,14 +60,10 @@ func (u *uploadCommand) RunE(_ *cobra.Command, _ []string) error {
u.logger.Info("Uploading jobs for project " + u.clientConfig.Project.Name)
u.logger.Info("please wait...")

uploadResponse, err := u.sendUploadAllRequest(u.clientConfig.Project.Name)
_, err := u.sendUploadAllRequest(u.clientConfig.Project.Name)
u.logger.Info("Finished uploading to scheduler")
if err != nil {
return fmt.Errorf("request failed for project %s: %w", u.clientConfig.Project.Name, err)
}
if uploadResponse.Status {
u.logger.Info("Uploaded jobs for project " + u.clientConfig.Project.Name)
} else {
u.logger.Error("Error Uploading jobs for project: "+u.clientConfig.Project.Name+", error: ", uploadResponse.ErrorMessage)
u.logger.Error("With %v", err.Error())
}
return nil
}
Expand Down
7 changes: 2 additions & 5 deletions core/scheduler/handler/v1beta1/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,9 @@ func (h JobRunHandler) UploadToScheduler(ctx context.Context, req *pb.UploadToSc
}
err = h.service.UploadToScheduler(ctx, projectName, req.GetNamespaceName())
if err != nil {
return nil, errors.GRPCErr(err, "unsuccessful upload to scheduler for "+projectName.String())
return nil, errors.GRPCErr(err, "\nuploaded to scheduler with error")
}
return &pb.UploadToSchedulerResponse{
Status: true,
ErrorMessage: "",
}, nil
return &pb.UploadToSchedulerResponse{}, nil
}

// TODO: check in jaeger if this api takes time, then we can make this async
Expand Down
5 changes: 2 additions & 3 deletions core/scheduler/handler/v1beta1/job_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func TestJobRunHandler(t *testing.T) {

resp, err := jobRunHandler.UploadToScheduler(ctx, req)
assert.NotNil(t, err)
assert.EqualError(t, err, "rpc error: code = Internal desc = some error: unsuccessful upload to scheduler for a-data-proj")
assert.EqualError(t, err, "rpc error: code = Internal desc = some error: \nuploaded to scheduler with error")
assert.Nil(t, resp)
})
t.Run("should return success if deployment succeeds", func(t *testing.T) {
Expand All @@ -385,9 +385,8 @@ func TestJobRunHandler(t *testing.T) {
defer jobRunService.AssertExpectations(t)
jobRunHandler := v1beta1.NewJobRunHandler(logger, jobRunService, nil)

resp, err := jobRunHandler.UploadToScheduler(ctx, req)
_, err := jobRunHandler.UploadToScheduler(ctx, req)
assert.Nil(t, err)
assert.Equal(t, true, resp.Status)
})
t.Run("should return error if projectName is not valid", func(t *testing.T) {
namespaceName := "namespace-name"
Expand Down
4 changes: 2 additions & 2 deletions core/scheduler/service/deployment_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func (s JobRunService) UploadToScheduler(ctx context.Context, projectName tenant.ProjectName, namespaceName string) error {
multiError := errors.NewMultiError("ErrorInUploadToScheduler")
multiError := errors.NewMultiError("errorInUploadToScheduler")
allJobsWithDetails, err := s.jobRepo.GetAll(ctx, projectName)
multiError.Append(err)
if allJobsWithDetails == nil {
Expand All @@ -28,7 +28,7 @@ func (s JobRunService) UploadToScheduler(ctx context.Context, projectName tenant
}
multiError.Append(err)
}
return errors.MultiToError(multiError)
return multiError.ToErr()
}

func (s JobRunService) deployJobsPerNamespace(ctx context.Context, t tenant.Tenant, jobs []*scheduler.JobWithDetails) error {
Expand Down
8 changes: 4 additions & 4 deletions core/scheduler/service/deployment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestDeploymentService(t *testing.T) {

err := runService.UploadToScheduler(ctx, proj1Name, namespace1Name.String())
assert.NotNil(t, err)
assert.EqualError(t, err, "ErrorInUploadToScheduler:\n GetAll error")
assert.EqualError(t, err, "errorInUploadToScheduler:\n GetAll error")
})
t.Run("should return error if error in priority resolution", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand All @@ -74,7 +74,7 @@ func TestDeploymentService(t *testing.T) {

err := runService.UploadToScheduler(ctx, proj1Name, namespace1Name.String())
assert.NotNil(t, err)
assert.EqualError(t, err, "ErrorInUploadToScheduler:\n priority resolution error")
assert.EqualError(t, err, "errorInUploadToScheduler:\n priority resolution error")
})
t.Run("should deploy Jobs Per Namespace returning error", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand All @@ -95,7 +95,7 @@ func TestDeploymentService(t *testing.T) {

err := runService.UploadToScheduler(ctx, proj1Name, namespace1Name.String())
assert.NotNil(t, err)
assert.EqualError(t, err, "ErrorInUploadToScheduler:\n DeployJobs tnnt1 error")
assert.EqualError(t, err, "errorInUploadToScheduler:\n DeployJobs tnnt1 error")
})
t.Run("should deploy Jobs Per Namespace and cleanPerNamespace, appropriately", func(t *testing.T) {
jobRepo := new(JobRepository)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestDeploymentService(t *testing.T) {

err := runService.UploadToScheduler(ctx, proj1Name, namespace1Name.String())
assert.NotNil(t, err)
assert.EqualError(t, err, "ErrorInUploadToScheduler:\n listJobs error")
assert.EqualError(t, err, "errorInUploadToScheduler:\n listJobs error")
})
})
}
Expand Down
7 changes: 0 additions & 7 deletions ext/scheduler/airflow/dag/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,6 @@ func setupJobDetails(tnnt tenant.Tenant) *scheduler.JobWithDetails {
TaskName: "bq-bq",
State: "resolved",
},
{
Host: "http://optimus.example.com",
Tenant: tnnt1,
JobName: "foo-inter-dep-job-unresolved",
TaskName: "bq-bq",
State: "unresolved",
},
{
JobName: "foo-external-optimus-dep-job",
Host: "http://optimus.external.io",
Expand Down
3 changes: 0 additions & 3 deletions ext/scheduler/airflow/dag/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ type Upstream struct {
func SetupUpstreams(upstreams scheduler.Upstreams, host string) Upstreams {
var ups []Upstream
for _, u := range upstreams.UpstreamJobs {
if u.State != "resolved" {
continue
}
var upstreamHost string
if !u.External {
upstreamHost = host
Expand Down
36 changes: 20 additions & 16 deletions internal/store/postgres/scheduler/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,36 +51,36 @@ type JobUpstreams struct {
JobName string
ProjectName string
UpstreamJobID uuid.UUID
UpstreamJobName string
UpstreamResourceUrn string
UpstreamProjectName string
UpstreamNamespaceName string
UpstreamTaskName string
UpstreamHost string
UpstreamJobName sql.NullString
UpstreamResourceUrn sql.NullString
UpstreamProjectName sql.NullString
UpstreamNamespaceName sql.NullString
UpstreamTaskName sql.NullString
UpstreamHost sql.NullString
UpstreamType string
UpstreamState string
UpstreamExternal bool
UpstreamExternal sql.NullBool

CreatedAt time.Time
UpdatedAt time.Time
}

func (j *JobUpstreams) toJobUpstreams() (*scheduler.JobUpstream, error) {
t, err := tenant.NewTenant(j.UpstreamProjectName, j.UpstreamNamespaceName)
t, err := tenant.NewTenant(j.UpstreamProjectName.String, j.UpstreamNamespaceName.String)
if err != nil {
return nil, err
}

return &scheduler.JobUpstream{
JobName: j.UpstreamJobName,
Host: j.UpstreamHost,
TaskName: j.UpstreamTaskName,
DestinationURN: j.UpstreamResourceUrn,
JobName: j.UpstreamJobName.String,
Host: j.UpstreamHost.String,
TaskName: j.UpstreamTaskName.String,
DestinationURN: j.UpstreamResourceUrn.String,
Tenant: t,
Type: j.UpstreamType,
External: j.UpstreamExternal,
External: j.UpstreamExternal.Bool,
State: j.UpstreamState,
}, err
}, nil
}

type Job struct {
Expand Down Expand Up @@ -234,7 +234,7 @@ func FromRow(row pgx.Row) (*Job, error) {
return nil, errors.NotFound(job.EntityJob, "job not found")
}

return nil, errors.Wrap(scheduler.EntityJobRun, "error in reading row for resource", err)
return nil, errors.Wrap(scheduler.EntityJobRun, "error in reading row for job", err)
}

return &js, nil
Expand Down Expand Up @@ -263,9 +263,13 @@ func groupUpstreamsByJobName(jobUpstreams []JobUpstreams) (map[string][]*schedul
jobUpstreamGroup := map[string][]*scheduler.JobUpstream{}

for _, upstream := range jobUpstreams {
if upstream.UpstreamState != "resolved" {
multiError.Append(errors.NewError(errors.ErrInvalidState, scheduler.EntityJobRun, "unresolved upstream "+upstream.UpstreamJobName.String+" for "+upstream.JobName))
continue
}
schedulerUpstream, err := upstream.toJobUpstreams()
if err != nil {
msg := fmt.Sprintf("unable to parse upstream:%s for job:%s", upstream.UpstreamJobName, upstream.JobName)
msg := fmt.Sprintf("unable to parse upstream:%s for job:%s", upstream.UpstreamJobName.String, upstream.JobName)
multiError.Append(errors.Wrap(scheduler.EntityJobRun, msg, err))
continue
}
Expand Down

0 comments on commit a72d635

Please sign in to comment.