Skip to content

Commit

Permalink
fix: Only upload lib.py once for every project (#302)
Browse files Browse the repository at this point in the history
* fix: Only upload lib.py once for every project

* fix lint
  • Loading branch information
ahmadnaufal authored Nov 15, 2024
1 parent cafe283 commit 13c6218
Show file tree
Hide file tree
Showing 2 changed files with 397 additions and 2 deletions.
25 changes: 23 additions & 2 deletions ext/scheduler/airflow/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"time"

"github.com/goto/salt/log"
Expand Down Expand Up @@ -95,6 +96,8 @@ type Scheduler struct {

projectGetter ProjectGetter
secretGetter SecretGetter

hasLibSyncedMap sync.Map
}

func (s *Scheduler) DeployJobs(ctx context.Context, tenant tenant.Tenant, jobs []*scheduler.JobWithDetails) error {
Expand All @@ -107,11 +110,12 @@ func (s *Scheduler) DeployJobs(ctx context.Context, tenant tenant.Tenant, jobs [
}
defer bucket.Close()

err = bucket.WriteAll(spanCtx, filepath.Join(jobsDir, baseLibFileName), SharedLib, nil)
err = s.uploadCommonLib(ctx, tenant, bucket)
if err != nil {
s.l.Error("failed to upload __lib.py file")
s.l.Error("failed to upload __lib.py file:", err)
return errors.AddErrContext(err, EntityAirflow, "error in writing __lib.py file")
}

multiError := errors.NewMultiError("ErrorsInDeployJobs")
runner := parallel.NewRunner(parallel.WithTicket(concurrentTicketPerSec), parallel.WithLimit(concurrentLimit))
project, err := s.projectGetter.Get(ctx, tenant.ProjectName())
Expand Down Expand Up @@ -146,6 +150,23 @@ func (s *Scheduler) DeployJobs(ctx context.Context, tenant tenant.Tenant, jobs [
return multiError.ToErr()
}

// uploadCommonLib is a function to upload the shared lib file to the bucket, encapsulated in a sync process.
// we need to ensure that the file is uploaded only once, not on every job upload.
func (s *Scheduler) uploadCommonLib(ctx context.Context, tenant tenant.Tenant, bucket Bucket) error {
if _, isSynced := s.hasLibSyncedMap.LoadOrStore(tenant.ProjectName().String(), true); isSynced {
return nil
}

err := bucket.WriteAll(ctx, filepath.Join(jobsDir, baseLibFileName), SharedLib, nil)
if err != nil {
// on failure, flag is removed so that the upload is retried in the next job upload
s.hasLibSyncedMap.Delete(tenant.ProjectName().String())
return err
}

return nil
}

// TODO list jobs should not refer from the scheduler, rather should list from db and it has nothing to do with scheduler.
func (s *Scheduler) ListJobs(ctx context.Context, t tenant.Tenant) ([]string, error) {
spanCtx, span := startChildSpan(ctx, "ListJobs")
Expand Down
Loading

0 comments on commit 13c6218

Please sign in to comment.