Skip to content

Commit

Permalink
fix: replay ignore upstreams (#264)
Browse files Browse the repository at this point in the history
* fix: replay ignore upstreams

* fix(optimus): get replay with filter

* fix(optimus): get replay by filter cleanup
  • Loading branch information
Mryashbhardwaj committed Sep 24, 2024
1 parent 24d3d06 commit 14f213a
Show file tree
Hide file tree
Showing 28 changed files with 867 additions and 238 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "10aa548f6af9b12a1560a9f2f80610a7e46fdf13"
PROTON_COMMIT := "750afa5b646347179ab2abaf3ea3b51ce03cc3ab"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
2 changes: 1 addition & 1 deletion core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (

"github.com/goto/optimus/core/job"
"github.com/goto/optimus/core/job/dto"
"github.com/goto/optimus/core/job/service/filter"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/models"
"github.com/goto/optimus/internal/telemetry"
"github.com/goto/optimus/internal/utils/filter"
"github.com/goto/optimus/internal/writer"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
"github.com/goto/optimus/sdk/plugin"
Expand Down
2 changes: 1 addition & 1 deletion core/job/handler/v1beta1/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
"github.com/goto/optimus/core/job"
"github.com/goto/optimus/core/job/dto"
"github.com/goto/optimus/core/job/handler/v1beta1"
"github.com/goto/optimus/core/job/service/filter"
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/lib/window"
"github.com/goto/optimus/internal/models"
"github.com/goto/optimus/internal/utils/filter"
"github.com/goto/optimus/internal/writer"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
"github.com/goto/optimus/sdk/plugin"
Expand Down
2 changes: 1 addition & 1 deletion core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/goto/optimus/core/event/moderator"
"github.com/goto/optimus/core/job"
"github.com/goto/optimus/core/job/dto"
"github.com/goto/optimus/core/job/service/filter"
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/core/scheduler"
"github.com/goto/optimus/core/tenant"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/goto/optimus/internal/lib/tree"
"github.com/goto/optimus/internal/lib/window"
"github.com/goto/optimus/internal/telemetry"
"github.com/goto/optimus/internal/utils/filter"
"github.com/goto/optimus/internal/writer"
"github.com/goto/optimus/sdk/plugin"
)
Expand Down
2 changes: 1 addition & 1 deletion core/job/service/job_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
"github.com/goto/optimus/core/job"
"github.com/goto/optimus/core/job/dto"
"github.com/goto/optimus/core/job/service"
"github.com/goto/optimus/core/job/service/filter"
resource "github.com/goto/optimus/core/resource"
scheduler "github.com/goto/optimus/core/scheduler"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/compiler"
optErrors "github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/lib/window"
"github.com/goto/optimus/internal/models"
"github.com/goto/optimus/internal/utils/filter"
"github.com/goto/optimus/internal/writer"
"github.com/goto/optimus/sdk/plugin"
)
Expand Down
32 changes: 32 additions & 0 deletions core/scheduler/handler/v1beta1/replay.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package v1beta1

import (
"fmt"
"strings"
"time"

"github.com/google/uuid"
"github.com/goto/salt/log"
Expand All @@ -11,12 +13,14 @@ import (
"github.com/goto/optimus/core/scheduler"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/utils/filter"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

type ReplayService interface {
CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error)
GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error)
GetByFilter(ctx context.Context, project tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error)
GetReplayByID(ctx context.Context, replayID uuid.UUID) (replay *scheduler.ReplayWithRun, err error)
GetRunsStatus(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (runs []*scheduler.JobRunStatus, err error)
CancelReplay(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error
Expand Down Expand Up @@ -95,6 +99,34 @@ func (h ReplayHandler) ListReplay(ctx context.Context, req *pb.ListReplayRequest
return &pb.ListReplayResponse{Replays: replayProtos}, nil
}

// GetReplayDetails returns the replays of a project, together with their
// runs, narrowed by the optional job names, scheduled time, replay ID and
// status carried on the request.
//
// It fails with a gRPC error when the project name cannot be adapted or when
// the service lookup fails.
func (h ReplayHandler) GetReplayDetails(ctx context.Context, req *pb.GetReplayDetailsRequest) (*pb.GetReplayDetailsResponse, error) {
	projectName, err := tenant.ProjectNameFrom(req.GetProjectName())
	if err != nil {
		h.l.Error("error adapting project name [%s]: %s", req.GetProjectName(), err)
		return nil, errors.GRPCErr(err, "unable to get replay config for project "+req.GetProjectName())
	}

	// AsTime on an unset timestamp yields the Unix epoch, which would format
	// to a non-empty "1970-01-01 00:00:00" and silently narrow every query
	// to that instant. Only format the value when the caller supplied one.
	// NOTE(review): assumes filter.WithString skips empty values, as the
	// unconditional ReplayID/ReplayStatus filters below already rely on.
	var scheduledAt string
	if ts := req.GetScheduledAt(); ts != nil {
		scheduledAt = ts.AsTime().Format(time.DateTime)
	}

	replays, err := h.service.GetByFilter(ctx, projectName,
		filter.WithStringArray(filter.JobNames, req.GetJobNames()),
		filter.WithString(filter.ScheduledAt, scheduledAt),
		filter.WithString(filter.ReplayID, req.GetReplayId()),
		filter.WithString(filter.ReplayStatus, req.GetStatus()),
	)
	if err != nil {
		h.l.Error(fmt.Sprintf("error getting replays for req: %+v, err: %s", req, err.Error()))
		return nil, errors.GRPCErr(err, "unable to get replays with filter")
	}

	replayProtos := make([]*pb.GetReplayResponse, len(replays))
	for i, replay := range replays {
		replayProtos[i] = replayToProto(replay.Replay)
		replayProtos[i].ReplayRuns = replayRunsToProto(replay.Runs)
	}

	return &pb.GetReplayDetailsResponse{
		Replays: replayProtos,
	}, nil
}

func (h ReplayHandler) GetReplay(ctx context.Context, req *pb.GetReplayRequest) (*pb.GetReplayResponse, error) {
id, err := uuid.Parse(req.GetReplayId())
if err != nil {
Expand Down
35 changes: 35 additions & 0 deletions core/scheduler/handler/v1beta1/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/goto/optimus/core/scheduler/handler/v1beta1"
"github.com/goto/optimus/core/tenant"
errs "github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/utils/filter"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

Expand Down Expand Up @@ -667,6 +668,32 @@ func (_m *mockReplayService) GetRunsStatus(ctx context.Context, _a1 tenant.Tenan
return r0, r1
}

// GetByFilter provides a mock function with given fields: ctx, project, filters
func (_m *mockReplayService) GetByFilter(ctx context.Context, project tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error) {
	args := _m.Called(ctx, project, filters)

	// The first configured return value may be a function producing both
	// results, a function producing only the replays, or the replays
	// themselves (possibly nil).
	var replays []*scheduler.ReplayWithRun
	switch first := args.Get(0).(type) {
	case func(context.Context, tenant.ProjectName, ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error):
		return first(ctx, project, filters...)
	case func(context.Context, tenant.ProjectName, ...filter.FilterOpt) []*scheduler.ReplayWithRun:
		replays = first(ctx, project, filters...)
	case nil:
		// nothing configured: keep the nil slice
	default:
		replays = first.([]*scheduler.ReplayWithRun)
	}

	// The second configured return value is either an error-producing
	// function or the error itself.
	if errFn, ok := args.Get(1).(func(context.Context, tenant.ProjectName, ...filter.FilterOpt) error); ok {
		return replays, errFn(ctx, project, filters...)
	}
	return replays, args.Error(1)
}

// CancelReplay provides a mock function with given fields: ctx, replayWithRun
func (_m *mockReplayService) CancelReplay(ctx context.Context, replayWithRun *scheduler.ReplayWithRun) error {
ret := _m.Called(ctx, replayWithRun)
Expand All @@ -680,3 +707,11 @@ func (_m *mockReplayService) CancelReplay(ctx context.Context, replayWithRun *sc

return r0
}

// GetReplayConfig provides a mock function with given fields: ctx, projectName, name, scheduledAt
func (_m *mockReplayService) GetReplayConfig(ctx context.Context, projectName tenant.ProjectName, name scheduler.JobName, scheduledAt time.Time) (map[string]string, error) {
	ret := _m.Called(ctx, projectName, name, scheduledAt)

	// A nil first return value means "no config"; anything else must be a
	// map[string]string.
	var config map[string]string
	if first := ret.Get(0); first != nil {
		config = first.(map[string]string)
	}
	return config, ret.Error(1)
}
27 changes: 27 additions & 0 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/lib/cron"
"github.com/goto/optimus/internal/telemetry"
"github.com/goto/optimus/internal/utils/filter"
)

const (
Expand All @@ -31,6 +32,7 @@ type ReplayRepository interface {
UpdateReplay(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, runs []*scheduler.JobRunStatus, message string) error
UpdateReplayStatus(ctx context.Context, replayID uuid.UUID, state scheduler.ReplayState, message string) error

GetReplayByFilters(ctx context.Context, projectName tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error)
GetReplayRequestsByStatus(ctx context.Context, statusList []scheduler.ReplayState) ([]*scheduler.Replay, error)
GetReplaysByProject(ctx context.Context, projectName tenant.ProjectName, dayLimits int) ([]*scheduler.Replay, error)
GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
Expand Down Expand Up @@ -143,6 +145,31 @@ func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.Pr
return r.replayRepo.GetReplaysByProject(ctx, projectName, getReplaysDayLimit)
}

// GetByFilter returns the replays, with their runs, that match the given
// filters.
//
// When the filters contain a replay ID, that ID is parsed as a UUID and the
// single replay found through GetReplayByID is returned; the project name and
// the remaining filters are not consulted on that path. Otherwise the lookup
// is delegated to the repository with the project and filters as received.
//
// NOTE(review): the parse-failure branch returns an error that has already
// been passed through errors.GRPCErr; confirm that callers which wrap the
// result again still surface the intended status.
func (r *ReplayService) GetByFilter(ctx context.Context, project tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error) {
	parsed := filter.NewFilter(filters...)

	// No replay ID requested: let the repository apply the filters.
	if !parsed.Contains(filter.ReplayID) {
		matches, err := r.replayRepo.GetReplayByFilters(ctx, project, filters...)
		if err != nil {
			return nil, err
		}
		return matches, nil
	}

	rawID := parsed.GetStringValue(filter.ReplayID)
	r.logger.Debug("getting all replays by replayId [%s]", rawID)

	replayID, parseErr := uuid.Parse(rawID)
	if parseErr != nil {
		r.logger.Error("error parsing replay id [%s]: %s", rawID, parseErr)
		invalidArg := errors.InvalidArgument(scheduler.EntityReplay, parseErr.Error())
		return nil, errors.GRPCErr(invalidArg, "unable to get replay for replayID "+rawID)
	}

	found, err := r.GetReplayByID(ctx, replayID)
	if err != nil {
		return nil, err
	}
	return []*scheduler.ReplayWithRun{found}, nil
}

func (r *ReplayService) GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error) {
replayWithRun, err := r.replayRepo.GetReplayByID(ctx, replayID)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions core/scheduler/service/replay_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/goto/optimus/core/tenant"
errs "github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/lib/cron"
"github.com/goto/optimus/internal/utils/filter"
)

func TestReplayService(t *testing.T) {
Expand Down Expand Up @@ -656,6 +657,29 @@ func (_m *ReplayRepository) GetReplayJobConfig(ctx context.Context, jobTenant te
return r0, r1
}

// GetReplayByFilters provides a mock function with given fields: ctx, projectName, filters
func (_m *ReplayRepository) GetReplayByFilters(ctx context.Context, projectName tenant.ProjectName, filters ...filter.FilterOpt) ([]*scheduler.ReplayWithRun, error) {
	args := _m.Called(ctx, projectName, filters)

	// The first configured return value is either a function producing the
	// replays or the replays themselves (possibly nil).
	var replays []*scheduler.ReplayWithRun
	switch first := args.Get(0).(type) {
	case func(context.Context, tenant.ProjectName, ...filter.FilterOpt) []*scheduler.ReplayWithRun:
		replays = first(ctx, projectName, filters...)
	case nil:
		// nothing configured: keep the nil slice
	default:
		replays = first.([]*scheduler.ReplayWithRun)
	}

	// The second configured return value is either an error-producing
	// function or the error itself.
	if errFn, ok := args.Get(1).(func(context.Context, tenant.ProjectName, ...filter.FilterOpt) error); ok {
		return replays, errFn(ctx, projectName, filters...)
	}
	return replays, args.Error(1)
}

// ReplayValidator is an autogenerated mock type for the ReplayValidator type
type ReplayValidator struct {
mock.Mock
Expand Down
31 changes: 30 additions & 1 deletion ext/scheduler/airflow/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,31 @@ def _add_connection_adapter_if_absent(self, host):
return host
return "http://" + host

def get_job_replay_config(self, project_name, job_name, schedule_time) -> dict:
    """Return the job config of an in-progress replay covering schedule_time.

    Queries the project's replay endpoint for replays of ``job_name`` with
    status "in progress" at ``schedule_time`` and returns the ``jobConfig``
    of the first replay whose [startTime, endTime] window contains
    ``schedule_time``.

    :param project_name: Optimus project the job belongs to.
    :param job_name: name of the job whose replays are looked up.
    :param schedule_time: datetime of the run being evaluated.
    :return: the matching replay's ``jobConfig`` dict, or ``{}`` when the
        request does not return HTTP 200, no replay window contains
        ``schedule_time``, or the matching replay carries no ``jobConfig``.
    """
    scheduled_at_str = schedule_time.strftime(TIMESTAMP_FORMAT)
    url = '{optimus_host}/api/v1beta1/project/{optimus_project}/replay'.format(
        optimus_host=self.host,
        optimus_project=project_name,
    )
    response = requests.get(url, params={
        'scheduled_at': scheduled_at_str,
        'job_names': job_name,
        'status': "in progress",
    }, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS)
    if response.status_code != 200:
        return {}

    for replay in response.json().get('replays') or []:
        replay_config = replay.get('replayConfig') or {}
        # Both bounds are required to decide whether the run is inside the
        # replay window. Use boolean logic here: bitwise `&` binds tighter
        # than `in`/comparisons and made the original checks never match.
        if 'startTime' not in replay_config or 'endTime' not in replay_config:
            continue
        start_time = datetime.strptime(replay_config['startTime'], TIMESTAMP_FORMAT)
        end_time = datetime.strptime(replay_config['endTime'], TIMESTAMP_FORMAT)
        # NOTE(review): strptime yields naive datetimes unless TIMESTAMP_FORMAT
        # parses an offset; confirm schedule_time is comparable with them.
        if start_time <= schedule_time <= end_time:
            return replay_config.get('jobConfig') or {}
    return {}

def get_job_run(self, project_name: str, job_name: str, start_date: str, end_date: str, downstream_project_name: str, downstream_job_name: str) -> dict:
url = '{optimus_host}/api/v1beta1/project/{optimus_project}/job/{optimus_job}/run'.format(
optimus_host=self.host,
Expand Down Expand Up @@ -215,7 +240,11 @@ def __init__(

def poke(self, context):
schedule_time = get_scheduled_at(context)

job_config = self._optimus_client.get_job_replay_config(self.project_name, self.name, schedule_time)
if 'replays' in job_config.keys():
if 'IGNORE_UPSTREAM' in job_config['jobConfig'].keys():
if job_config['jobConfig']['IGNORE_UPSTREAM'] == "True":
return True
try:
upstream_schedule = self.get_schedule_interval(schedule_time)
except Exception as e:
Expand Down
Loading

0 comments on commit 14f213a

Please sign in to comment.