Skip to content

Commit

Permalink
fix: handle deletions for both namespace id and namespace name folders (
Browse files Browse the repository at this point in the history
#623)

* fix: handle deletions for both namespace id and namespace name folders (#621)

* fix: handle deletions for both namespace id and namespace name folders

* fix: test case fixes for job deletion bug on namespace directory name change

* fix: delete old namespace folder if empty after the delete api too

* fix: dont expose dag dir deletion code

* fix: prime scheduler adapt new method definitions

Co-authored-by: Yash Bhardwaj <[email protected]>
  • Loading branch information
sravankorumilli and Mryashbhardwaj authored Sep 27, 2022
1 parent eb50224 commit 71ef005
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 100 deletions.
31 changes: 14 additions & 17 deletions ext/scheduler/airflow2/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ func (s *scheduler) DeployJobs(ctx context.Context, namespace models.NamespaceSp
}

// deleteDirectoryIfEmpty removes the namespace jobs folder if it exists and is empty
func deleteDirectoryIfEmpty(ctx context.Context, jobsDir string, bucket Bucket) error {
func deleteDirectoryIfEmpty(ctx context.Context, nsDirectoryIdentifier string, bucket Bucket) error {
spanCtx, span := startChildSpan(ctx, "deleteDirectoryIfEmpty")
span.End()

jobsDir := PathForJobDirectory(JobsDir, nsDirectoryIdentifier)

it := bucket.List(&blob.ListOptions{
Prefix: jobsDir,
})
Expand Down Expand Up @@ -157,20 +159,10 @@ func (s *scheduler) compileAndUpload(ctx context.Context, namespace models.Names
return deployFailure
}
}
err = deleteDirectoryIfEmpty(ctx, PathForJobDirectory(JobsDir, namespace.ID.String()), bucket)
if err != nil {
if gcerrors.Code(err) != gcerrors.NotFound {
deployFailure := models.JobDeploymentFailure{
JobName: currentJobSpec.Name,
Message: "failed to cleanup old dags folder " + err.Error(),
}
return deployFailure
}
}
return nil
}

func (s *scheduler) DeleteJobs(ctx context.Context, namespace models.NamespaceSpec, jobNames []string, progressObserver progress.Observer) error {
func (s *scheduler) DeleteJobs(ctx context.Context, nsDirectoryIdentifier string, namespace models.NamespaceSpec, jobNames []string, progressObserver progress.Observer) error {
spanCtx, span := startChildSpan(ctx, "DeleteJobs")
defer span.End()

Expand All @@ -182,7 +174,7 @@ func (s *scheduler) DeleteJobs(ctx context.Context, namespace models.NamespaceSp
if strings.TrimSpace(jobName) == "" {
return ErrEmptyJobName
}
blobKey := PathFromJobName(JobsDir, namespace.ID.String(), jobName, JobsExtension)
blobKey := PathFromJobName(JobsDir, nsDirectoryIdentifier, jobName, JobsExtension)
if err := bucket.Delete(spanCtx, blobKey); err != nil {
// ignore missing files
if gcerrors.Code(err) != gcerrors.NotFound {
Expand All @@ -193,11 +185,17 @@ func (s *scheduler) DeleteJobs(ctx context.Context, namespace models.NamespaceSp
Name: jobName,
})
}
err = deleteDirectoryIfEmpty(ctx, nsDirectoryIdentifier, bucket)
if err != nil {
if gcerrors.Code(err) != gcerrors.NotFound {
return err
}
}
return nil
}

// TODO: listing jobs should not go through the scheduler; it should list from the db instead, as this has nothing to do with the scheduler.
func (s *scheduler) ListJobs(ctx context.Context, namespace models.NamespaceSpec, opts models.SchedulerListOptions) ([]models.Job, error) {
func (s *scheduler) ListJobs(ctx context.Context, nsDirectoryIdentifier string, namespace models.NamespaceSpec, opts models.SchedulerListOptions) ([]models.Job, error) {
spanCtx, span := startChildSpan(ctx, "ListJobs")
defer span.End()

Expand All @@ -207,11 +205,10 @@ func (s *scheduler) ListJobs(ctx context.Context, namespace models.NamespaceSpec
}
defer bucket.Close()

namespaceID := namespace.ID.String()
var jobs []models.Job
// get all items under namespace directory
it := bucket.List(&blob.ListOptions{
Prefix: PathForJobDirectory(JobsDir, namespaceID),
Prefix: PathForJobDirectory(JobsDir, nsDirectoryIdentifier),
})
for {
obj, err := it.Next(spanCtx)
Expand All @@ -233,7 +230,7 @@ func (s *scheduler) ListJobs(ctx context.Context, namespace models.NamespaceSpec
return jobs, nil
}
for idx, job := range jobs {
jobs[idx].Contents, err = bucket.ReadAll(spanCtx, PathFromJobName(JobsDir, namespaceID, job.Name, JobsExtension))
jobs[idx].Contents, err = bucket.ReadAll(spanCtx, PathFromJobName(JobsDir, nsDirectoryIdentifier, job.Name, JobsExtension))
if err != nil {
return nil, err
}
Expand Down
50 changes: 27 additions & 23 deletions ext/scheduler/airflow2/airflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ func TestAirflow2(t *testing.T) {
mockBucket.On("WriteAll", mock.Anything, "dags/__lib.py", airflow2.SharedLib, (*blob.WriterOptions)(nil)).Return(nil)
mockBucket.On("WriteAll", ctx, fmt.Sprintf("dags/%s/%s.py", ns.Name, jobSpecs[0].Name), []byte("job-1-compiled"), (*blob.WriterOptions)(nil)).Return(nil)
mockBucket.On("Delete", ctx, fmt.Sprintf("dags/%s/%s.py", ns.ID.String(), jobSpecs[0].Name)).Return(nil)
mockBucket.On("Delete", ctx, fmt.Sprintf("dags/%s", ns.ID.String())).Return(nil)
mockBucket.On("List", &blob.ListOptions{
Prefix: fmt.Sprintf("dags/%s", ns.ID.String()),
}).Return(&blob.ListIterator{})

expectedDeployDetail := models.JobDeploymentDetail{
SuccessCount: 1,
Expand Down Expand Up @@ -211,23 +207,27 @@ func TestAirflow2(t *testing.T) {
})
t.Run("DeleteJobs", func(t *testing.T) {
t.Run("should successfully delete jobs from blob buckets", func(t *testing.T) {
jobKey := fmt.Sprintf("dags/%s/%s.py", nsUUID, jobSpecs[0].Name)
jobKey := fmt.Sprintf("dags/%s/%s.py", ns.Name, jobSpecs[0].Name)

inMemBlob := memblob.OpenBucket(nil)
_ = inMemBlob.WriteAll(ctx, jobKey, []byte("hello"), nil)

mockBucket := &MockedBucket{
bucket: inMemBlob,
}
mockBucket.On("Delete", mock.Anything, fmt.Sprintf("dags/%s/%s.py", nsUUID, jobSpecs[0].Name)).Return(nil)
mockBucket.On("Delete", mock.Anything, fmt.Sprintf("dags/%s/%s.py", ns.Name, jobSpecs[0].Name)).Return(nil)
mockBucket.On("Delete", mock.Anything, fmt.Sprintf("dags/%s", ns.Name)).Return(nil)
mockBucket.On("List", &blob.ListOptions{
Prefix: fmt.Sprintf("dags/%s", ns.Name),
}).Return(&blob.ListIterator{})
defer mockBucket.AssertExpectations(t)

mockBucketFac := new(MockedBucketFactory)
mockBucketFac.On("New", mock.Anything, proj).Return(mockBucket, nil)
defer mockBucketFac.AssertExpectations(t)

air := airflow2.NewScheduler(mockBucketFac, nil, nil)
err := air.DeleteJobs(ctx, ns, []string{"job-1"}, nil)
err := air.DeleteJobs(ctx, ns.Name, ns, []string{"job-1"}, nil)
assert.Nil(t, err)

jobStillExist, err := inMemBlob.Exists(ctx, jobKey)
Expand All @@ -239,15 +239,19 @@ func TestAirflow2(t *testing.T) {
mockBucket := &MockedBucket{
bucket: inMemBlob,
}
mockBucket.On("Delete", mock.Anything, fmt.Sprintf("dags/%s/%s.py", nsUUID, jobSpecs[0].Name)).Return(nil)
mockBucket.On("Delete", mock.Anything, fmt.Sprintf("dags/%s/%s.py", ns.Name, jobSpecs[0].Name)).Return(nil)
mockBucket.On("Delete", mock.Anything, fmt.Sprintf("dags/%s", ns.Name)).Return(nil)
mockBucket.On("List", &blob.ListOptions{
Prefix: fmt.Sprintf("dags/%s", ns.Name),
}).Return(&blob.ListIterator{})
defer mockBucket.AssertExpectations(t)

mockBucketFac := new(MockedBucketFactory)
mockBucketFac.On("New", mock.Anything, proj).Return(mockBucket, nil)
defer mockBucketFac.AssertExpectations(t)

air := airflow2.NewScheduler(mockBucketFac, nil, nil)
err := air.DeleteJobs(ctx, ns, []string{"job-1"}, nil)
err := air.DeleteJobs(ctx, ns.Name, ns, []string{"job-1"}, nil)
assert.Nil(t, err)
})
})
Expand All @@ -257,11 +261,11 @@ func TestAirflow2(t *testing.T) {
mockBucket := &MockedBucket{
bucket: inMemBlob,
}
_ = inMemBlob.WriteAll(ctx, filepath.Join(airflow2.PathForJobDirectory(airflow2.JobsDir, ns.ID.String()), "file1.py"), []byte("test1"), nil)
_ = inMemBlob.WriteAll(ctx, filepath.Join(airflow2.PathForJobDirectory(airflow2.JobsDir, ns.ID.String()), "file2.py"), []byte("test2"), nil)
_ = inMemBlob.WriteAll(ctx, filepath.Join(airflow2.PathForJobDirectory(airflow2.JobsDir, ns.Name), "file1.py"), []byte("test1"), nil)
_ = inMemBlob.WriteAll(ctx, filepath.Join(airflow2.PathForJobDirectory(airflow2.JobsDir, ns.Name), "file2.py"), []byte("test2"), nil)
_ = inMemBlob.WriteAll(ctx, "bar.py", []byte("test3"), nil)
mockBucket.On("List", &blob.ListOptions{
Prefix: airflow2.PathForJobDirectory(airflow2.JobsDir, ns.ID.String()),
Prefix: airflow2.PathForJobDirectory(airflow2.JobsDir, ns.Name),
})
defer mockBucket.AssertExpectations(t)

Expand All @@ -270,7 +274,7 @@ func TestAirflow2(t *testing.T) {
defer mockBucketFac.AssertExpectations(t)

air := airflow2.NewScheduler(mockBucketFac, nil, nil)
respJobs, err := air.ListJobs(ctx, ns, models.SchedulerListOptions{OnlyName: true})
respJobs, err := air.ListJobs(ctx, ns.Name, ns, models.SchedulerListOptions{OnlyName: true})
assert.Nil(t, err)
assert.Equal(t, 2, len(respJobs))
})
Expand All @@ -279,11 +283,11 @@ func TestAirflow2(t *testing.T) {
mockBucket := &MockedBucket{
bucket: inMemBlob,
}
_ = inMemBlob.WriteAll(ctx, filepath.Join(airflow2.PathForJobDirectory(airflow2.JobsDir, ns.ID.String()), "file1.py"), []byte("test1"), nil)
_ = inMemBlob.WriteAll(ctx, filepath.Join(airflow2.PathForJobDirectory(airflow2.JobsDir, ns.ID.String()), "file2.json"), []byte("test2"), nil)
_ = inMemBlob.WriteAll(ctx, filepath.Join(airflow2.PathForJobDirectory(airflow2.JobsDir, ns.Name), "file1.py"), []byte("test1"), nil)
_ = inMemBlob.WriteAll(ctx, filepath.Join(airflow2.PathForJobDirectory(airflow2.JobsDir, ns.Name), "file2.json"), []byte("test2"), nil)
_ = inMemBlob.WriteAll(ctx, "bar.py", []byte("test3"), nil)
mockBucket.On("List", &blob.ListOptions{
Prefix: airflow2.PathForJobDirectory(airflow2.JobsDir, ns.ID.String()),
Prefix: airflow2.PathForJobDirectory(airflow2.JobsDir, ns.Name),
})
defer mockBucket.AssertExpectations(t)

Expand All @@ -292,7 +296,7 @@ func TestAirflow2(t *testing.T) {
defer mockBucketFac.AssertExpectations(t)

air := airflow2.NewScheduler(mockBucketFac, nil, nil)
respJobs, err := air.ListJobs(ctx, ns, models.SchedulerListOptions{OnlyName: true})
respJobs, err := air.ListJobs(ctx, ns.Name, ns, models.SchedulerListOptions{OnlyName: true})
assert.Nil(t, err)
assert.Equal(t, 1, len(respJobs))
})
Expand All @@ -301,21 +305,21 @@ func TestAirflow2(t *testing.T) {
mockBucket := &MockedBucket{
bucket: inMemBlob,
}
_ = inMemBlob.WriteAll(ctx, airflow2.PathFromJobName(airflow2.JobsDir, ns.ID.String(), "file1", airflow2.JobsExtension), []byte("test1"), nil)
_ = inMemBlob.WriteAll(ctx, airflow2.PathFromJobName(airflow2.JobsDir, ns.ID.String(), "file2", airflow2.JobsExtension), []byte("test2"), nil)
_ = inMemBlob.WriteAll(ctx, airflow2.PathFromJobName(airflow2.JobsDir, ns.Name, "file1", airflow2.JobsExtension), []byte("test1"), nil)
_ = inMemBlob.WriteAll(ctx, airflow2.PathFromJobName(airflow2.JobsDir, ns.Name, "file2", airflow2.JobsExtension), []byte("test2"), nil)
mockBucket.On("List", &blob.ListOptions{
Prefix: airflow2.PathForJobDirectory(airflow2.JobsDir, ns.ID.String()),
Prefix: airflow2.PathForJobDirectory(airflow2.JobsDir, ns.Name),
})
mockBucket.On("ReadAll", mock.Anything, airflow2.PathFromJobName(airflow2.JobsDir, ns.ID.String(), "file1", airflow2.JobsExtension))
mockBucket.On("ReadAll", mock.Anything, airflow2.PathFromJobName(airflow2.JobsDir, ns.ID.String(), "file2", airflow2.JobsExtension))
mockBucket.On("ReadAll", mock.Anything, airflow2.PathFromJobName(airflow2.JobsDir, ns.Name, "file1", airflow2.JobsExtension))
mockBucket.On("ReadAll", mock.Anything, airflow2.PathFromJobName(airflow2.JobsDir, ns.Name, "file2", airflow2.JobsExtension))
defer mockBucket.AssertExpectations(t)

mockBucketFac := new(MockedBucketFactory)
mockBucketFac.On("New", mock.Anything, proj).Return(mockBucket, nil)
defer mockBucketFac.AssertExpectations(t)

air := airflow2.NewScheduler(mockBucketFac, nil, nil)
respJobs, err := air.ListJobs(ctx, ns, models.SchedulerListOptions{})
respJobs, err := air.ListJobs(ctx, ns.Name, ns, models.SchedulerListOptions{})
assert.Nil(t, err)
assert.Equal(t, 2, len(respJobs))
})
Expand Down
4 changes: 2 additions & 2 deletions ext/scheduler/prime/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (*Scheduler) VerifyJob(context.Context, models.NamespaceSpec, models.JobSpe
return nil
}

func (*Scheduler) ListJobs(context.Context, models.NamespaceSpec, models.SchedulerListOptions) ([]models.Job, error) {
func (*Scheduler) ListJobs(context.Context, string, models.NamespaceSpec, models.SchedulerListOptions) ([]models.Job, error) {
panic("implement me")
}

Expand All @@ -46,7 +46,7 @@ func (s *Scheduler) DeployJobs(ctx context.Context, namespace models.NamespaceSp
return models.JobDeploymentDetail{}, nil
}

func (*Scheduler) DeleteJobs(context.Context, models.NamespaceSpec, []string, progress.Observer) error {
func (*Scheduler) DeleteJobs(context.Context, string, models.NamespaceSpec, []string, progress.Observer) error {
return nil
}

Expand Down
41 changes: 23 additions & 18 deletions job/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,32 @@ func (d *deployer) completeJobDeployment(ctx context.Context, jobDeployment mode
}

func (d *deployer) cleanPerNamespace(ctx context.Context, namespaceSpec models.NamespaceSpec, jobs []models.JobSpec) error {
// get all stored job names
schedulerJobs, err := d.batchScheduler.ListJobs(ctx, namespaceSpec, models.SchedulerListOptions{OnlyName: true})
if err != nil {
return err
}
var destJobNames []string
for _, j := range schedulerJobs {
destJobNames = append(destJobNames, j.Name)
}

// filter what we need to keep/delete
var sourceJobNames []string
for _, jobSpec := range jobs {
sourceJobNames = append(sourceJobNames, jobSpec.Name)
namespaceIdentifiers := []string{
namespaceSpec.ID.String(), // old identifier, kept for folder cleanup; to be removed after complete migration to namespace-name folders #cleanup
namespaceSpec.Name,
}
jobsToDelete := setSubtract(destJobNames, sourceJobNames)
jobsToDelete = jobDeletionFilter(jobsToDelete)
if len(jobsToDelete) > 0 {
if err := d.batchScheduler.DeleteJobs(ctx, namespaceSpec, jobsToDelete, nil); err != nil {
for _, nsDirectoryIdentifier := range namespaceIdentifiers {
// get all stored job names
schedulerJobs, err := d.batchScheduler.ListJobs(ctx, nsDirectoryIdentifier, namespaceSpec, models.SchedulerListOptions{OnlyName: true})
if err != nil {
return err
}
var destJobNames []string
for _, j := range schedulerJobs {
destJobNames = append(destJobNames, j.Name)
}

// filter what we need to keep/delete
var sourceJobNames []string
for _, jobSpec := range jobs {
sourceJobNames = append(sourceJobNames, jobSpec.Name)
}
jobsToDelete := setSubtract(destJobNames, sourceJobNames)
if len(jobsToDelete) > 0 {
if err := d.batchScheduler.DeleteJobs(ctx, nsDirectoryIdentifier, namespaceSpec, jobsToDelete, nil); err != nil {
return err
}
}
}
return nil
}
Loading

0 comments on commit 71ef005

Please sign in to comment.