Skip to content

Commit

Permalink
feat: deprecate climod compile asset, move to depResMod (#639)
Browse files Browse the repository at this point in the history
* deprecate: climod compile asset, move to depResMod

* fix: setup test

* delete: unused proto climod on plugin

* delete: climod plugin serve & client

* delete: unecessary compile asset call

* refactor: remove climod related method on pluginRepo and plugin struct

* refactor: remove commandline mod

* fix: yaml mod equal to cli

* Revert "refactor: remove commandline mod"

This reverts commit 5092069.

* refactor: remove commandline mod

* refactor: interface segregation for commandline mod
  • Loading branch information
deryrahman authored Nov 7, 2022
1 parent 88894c3 commit 989cf7a
Show file tree
Hide file tree
Showing 30 changed files with 948 additions and 2,792 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "0113e40d8a0ddf6cfef5b2f545fad3f09d6d6f28"
PROTON_COMMIT := "19c4d5b1a3be5f5efb2ebea14a78c815888e5204"

.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint

Expand Down
4 changes: 2 additions & 2 deletions client/cmd/internal/survey/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (*JobSurvey) AskToSelectJobName(jobSpecReader local.SpecReader[*model.JobSp
return selectedJobName, nil
}

func (j *JobSurvey) askCLIModSurveyQuestion(ctx context.Context, cliMod models.CommandLineMod, question models.PluginQuestion) (models.PluginAnswers, error) {
func (j *JobSurvey) askCliModSurveyQuestion(ctx context.Context, cliMod models.CommandLineMod, question models.PluginQuestion) (models.PluginAnswers, error) {
surveyPrompt := j.getSurveyPromptFromPluginQuestion(question)

var responseStr string
Expand All @@ -74,7 +74,7 @@ func (j *JobSurvey) askCLIModSurveyQuestion(ctx context.Context, cliMod models.C
for _, subQues := range question.SubQuestions {
if responseStr == subQues.IfValue {
for _, subQuestion := range subQues.Questions {
subQuestionAnswers, err := j.askCLIModSurveyQuestion(ctx, cliMod, subQuestion)
subQuestionAnswers, err := j.askCliModSurveyQuestion(ctx, cliMod, subQuestion)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions client/cmd/internal/survey/job_addhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func (j *JobAddHookSurvey) AskToAddHook(jobSpec *model.JobSpec) (*model.JobSpec,
}

var config map[string]string
if cliMod := selectedHook.GetSurveyMod(); cliMod != nil {
if yamlMod := selectedHook.GetSurveyMod(); yamlMod != nil {
ctx := context.Background()
hookAnswers, err := j.askHookQuestions(ctx, cliMod, jobSpec.Name)
hookAnswers, err := j.askHookQuestions(ctx, yamlMod, jobSpec.Name)
if err != nil {
return nil, err
}

config, err = j.getHookConfig(cliMod, hookAnswers)
config, err = j.getHookConfig(yamlMod, hookAnswers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (j *JobAddHookSurvey) askHookQuestions(ctx context.Context, cliMod models.C

answers := models.PluginAnswers{}
for _, question := range questionResponse.Questions {
responseAnswer, err := j.jobSurvey.askCLIModSurveyQuestion(ctx, cliMod, question)
responseAnswer, err := j.jobSurvey.askCliModSurveyQuestion(ctx, cliMod, question)
if err != nil {
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions client/cmd/internal/survey/job_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,28 @@ func (j *JobCreateSurvey) AskToCreateJob(jobSpecReader local.SpecReader[*model.J
return model.JobSpec{}, err
}

cliMod, err := j.getPluginCLIMod(jobInput.Task.Name)
yamlMod, err := j.getPluginCliMod(jobInput.Task.Name)
if err != nil {
return jobInput, err
}
if cliMod == nil {
if yamlMod == nil {
return jobInput, nil
}

pluginAnswers, err := j.askPluginQuestions(cliMod, jobInput.Name)
pluginAnswers, err := j.askPluginQuestions(yamlMod, jobInput.Name)
if err != nil {
return jobInput, err
}

taskConfig, err := j.getTaskConfig(cliMod, pluginAnswers)
taskConfig, err := j.getTaskConfig(yamlMod, pluginAnswers)
if err != nil {
return jobInput, err
}
if taskConfig != nil {
jobInput.Task.Config = taskConfig
}

asset, err := j.getJobAsset(cliMod, pluginAnswers)
asset, err := j.getJobAsset(yamlMod, pluginAnswers)
if err != nil {
return jobInput, err
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func (j *JobCreateSurvey) askCreateQuestions(questions []*survey.Question) (mode
}, nil
}

func (*JobCreateSurvey) getPluginCLIMod(taskName string) (models.CommandLineMod, error) {
func (*JobCreateSurvey) getPluginCliMod(taskName string) (models.CommandLineMod, error) {
pluginRepo := models.PluginRegistry
plugin, err := pluginRepo.GetByName(taskName)
if err != nil {
Expand All @@ -235,7 +235,7 @@ func (j *JobCreateSurvey) askPluginQuestions(cliMod models.CommandLineMod, jobNa

answers := models.PluginAnswers{}
for _, question := range questionResponse.Questions {
subAnswers, err := j.jobSurvey.askCLIModSurveyQuestion(ctx, cliMod, question)
subAnswers, err := j.jobSurvey.askCliModSurveyQuestion(ctx, cliMod, question)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions compiler/job_asset_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func (c *JobRunAssetsCompiler) CompileJobRunAssets(ctx context.Context, jobSpec

inputFiles := utils.MergeMaps(instanceFileMap, jobSpec.Assets.ToMap())

if plugin.CLIMod != nil {
if plugin.DependencyMod != nil {
// check if task needs to override the compilation behaviour
compiledAssetResponse, err := plugin.CLIMod.CompileAssets(ctx, models.CompileAssetsRequest{
compiledAssetResponse, err := plugin.DependencyMod.CompileAssets(ctx, models.CompileAssetsRequest{
StartTime: startTime,
EndTime: endTime,
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Expand Down
10 changes: 6 additions & 4 deletions compiler/job_asset_compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func TestJobRunAssetsCompiler(t *testing.T) {
ctx := context.Background()
engine := compiler.NewGoEngine()
execUnit := new(mock.BasePlugin)
cliMod := new(mock.CLIMod)
plugin := &models.Plugin{Base: execUnit, CLIMod: cliMod}
depResMod := new(mock.DependencyResolverMod)
plugin := &models.Plugin{Base: execUnit, DependencyMod: depResMod}

execUnit.On("PluginInfo").Return(&models.PluginInfoResponse{Name: "bq"}, nil)

Expand Down Expand Up @@ -116,7 +116,7 @@ func TestJobRunAssetsCompiler(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, "error", err.Error())
})
t.Run("compiles the assets when plugin has no cliMod", func(t *testing.T) {
t.Run("compiles the assets when plugin has no yamlMod", func(t *testing.T) {
pluginRepo := mock.NewPluginRepository(t)
pluginRepo.On("GetByName", "bq").Return(&models.Plugin{}, nil)

Expand All @@ -129,7 +129,7 @@ func TestJobRunAssetsCompiler(t *testing.T) {
assert.Equal(t, expectedQuery, assets["query.sql"])
})
t.Run("compiles the assets successfully", func(t *testing.T) {
cliMod.On("CompileAssets", context.Background(), models.CompileAssetsRequest{
depResMod.On("CompileAssets", context.Background(), models.CompileAssetsRequest{
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Assets: models.PluginAssets{}.FromJobSpec(jobSpec.Assets),
InstanceData: instanceSpec.Data,
Expand All @@ -141,6 +141,8 @@ func TestJobRunAssetsCompiler(t *testing.T) {
Value: "select * from table WHERE event_timestamp > '{{.EXECUTION_TIME}}' and name = '{{.secret.table_name}}'",
},
}}, nil)
defer depResMod.AssertExpectations(t)

pluginRepo := mock.NewPluginRepository(t)
pluginRepo.On("GetByName", "bq").Return(plugin, nil)

Expand Down
26 changes: 1 addition & 25 deletions compiler/job_input_render.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// DumpAssets used for dry run and does not effect actual execution of a job
func DumpAssets(ctx context.Context, jobSpec models.JobSpec, scheduledAt time.Time, engine models.TemplateEngine, allowOverride bool) (map[string]string, error) {
func DumpAssets(ctx context.Context, jobSpec models.JobSpec, scheduledAt time.Time, engine models.TemplateEngine) (map[string]string, error) {
var jobDestination string
if jobSpec.Task.Unit.DependencyMod != nil {
jobDestinationResponse, err := jobSpec.Task.Unit.DependencyMod.GenerateDestination(ctx, models.GenerateDestinationRequest{
Expand All @@ -36,30 +36,6 @@ func DumpAssets(ctx context.Context, jobSpec models.JobSpec, scheduledAt time.Ti

assetsToDump := jobSpec.Assets.ToMap()

if allowOverride {
// check if task needs to override the compilation behaviour
compiledAssetResponse, err := jobSpec.Task.Unit.CLIMod.CompileAssets(ctx, models.CompileAssetsRequest{
StartTime: startTime,
EndTime: endTime,
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Assets: models.PluginAssets{}.FromJobSpec(jobSpec.Assets),
InstanceData: []models.JobRunSpecData{
{
Name: models.ConfigKeyExecutionTime,
Value: scheduledAt.Format(models.InstanceScheduledAtTimeLayout),
Type: models.InstanceDataTypeEnv,
},
},
PluginOptions: models.PluginOptions{
DryRun: true,
},
})
if err != nil {
return nil, err
}
assetsToDump = compiledAssetResponse.Assets.ToJobSpec().ToMap()
}

// compile again if needed
templates, err := engine.CompileFiles(assetsToDump, map[string]interface{}{
models.ConfigKeyDstart: startTime.Format(models.InstanceScheduledAtTimeLayout),
Expand Down
42 changes: 18 additions & 24 deletions compiler/job_run_input_compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func TestJobRunInputCompiler(t *testing.T) {
execUnit.On("PluginInfo").Return(&models.PluginInfoResponse{
Name: "bq",
}, nil)
cliMod := new(mock.CLIMod)
plugin := &models.Plugin{Base: execUnit, CLIMod: cliMod}
depResMod := new(mock.DependencyResolverMod)
plugin := &models.Plugin{Base: execUnit, DependencyMod: depResMod}

behavior := models.JobSpecBehavior{
CatchUp: false,
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Data: instanceSpecData,
}

cliMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
depResMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Assets: models.PluginAssets{}.FromJobSpec(jobSpec.Assets),
InstanceData: instanceSpecData,
Expand All @@ -168,7 +168,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Value: "select * from table WHERE event_timestamp > '{{.EXECUTION_TIME}}'",
},
}}, nil)
defer cliMod.AssertExpectations(t)
defer depResMod.AssertExpectations(t)

pluginRepo.On("GetByName", "bq").Return(plugin, nil)

Expand Down Expand Up @@ -205,7 +205,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Behavior: behavior,
Schedule: schedule,
Task: models.JobSpecTask{
Unit: &models.Plugin{Base: execUnit, CLIMod: cliMod},
Unit: &models.Plugin{Base: execUnit},
Priority: 2000,
Window: window,
Config: models.JobSpecConfigs{
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Status: models.RunStateRunning,
Data: instanceSpecData,
}
cliMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
depResMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Assets: models.PluginAssets{}.FromJobSpec(jobSpec.Assets),
InstanceData: instanceSpecData,
Expand All @@ -275,7 +275,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Value: "select * from table WHERE event_timestamp > '{{.EXECUTION_TIME}}'",
},
}}, nil)
defer cliMod.AssertExpectations(t)
defer depResMod.AssertExpectations(t)

pluginRepo.On("GetByName", "bq").Return(plugin, nil)

Expand Down Expand Up @@ -325,7 +325,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Behavior: behavior,
Schedule: schedule,
Task: models.JobSpecTask{
Unit: &models.Plugin{Base: execUnit, CLIMod: cliMod},
Unit: &models.Plugin{Base: execUnit},
Priority: 2000,
Window: window,
Config: models.JobSpecConfigs{
Expand Down Expand Up @@ -369,7 +369,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Data: instanceSpecData,
}

cliMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
depResMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Assets: models.PluginAssets{}.FromJobSpec(jobSpec.Assets),
InstanceData: instanceSpecData,
Expand All @@ -381,7 +381,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Value: "select * from table WHERE event_timestamp > '{{.EXECUTION_TIME}}'",
},
}}, nil)
defer cliMod.AssertExpectations(t)
defer depResMod.AssertExpectations(t)

pluginRepo.On("GetByName", "bq").Return(plugin, nil)

Expand Down Expand Up @@ -410,7 +410,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Behavior: behavior,
Schedule: schedule,
Task: models.JobSpecTask{
Unit: &models.Plugin{Base: execUnit, CLIMod: cliMod},
Unit: &models.Plugin{Base: execUnit},
Priority: 2000,
Window: window,
Config: models.JobSpecConfigs{
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestJobRunInputCompiler(t *testing.T) {
UpdatedAt: time.Time{},
}

cliMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
depResMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Assets: models.PluginAssets{}.FromJobSpec(jobSpec.Assets),
InstanceData: instanceSpecData,
Expand All @@ -464,7 +464,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Value: "select * from table WHERE event_timestamp > '{{.EXECUTION_TIME}}' and name = '{{.secret.table_name}}'",
},
}}, nil)
defer cliMod.AssertExpectations(t)
defer depResMod.AssertExpectations(t)

pluginRepo.On("GetByName", "bq").Return(plugin, nil)

Expand Down Expand Up @@ -543,7 +543,7 @@ func TestJobRunInputCompiler(t *testing.T) {
instanceName := "bq"
instanceType := models.InstanceTypeTask

cliMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
depResMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
StartTime: startTime,
EndTime: endTime,
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Expand All @@ -555,7 +555,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Value: "select * from table WHERE event_timestamp > '{{.EXECUTION_TIME}}'",
},
}}, nil)
defer cliMod.AssertExpectations(t)
defer depResMod.AssertExpectations(t)

pluginRepo.On("GetByName", "bq").Return(plugin, nil)

Expand Down Expand Up @@ -591,7 +591,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Behavior: behavior,
Schedule: schedule,
Task: models.JobSpecTask{
Unit: &models.Plugin{Base: execUnit, CLIMod: cliMod},
Unit: &models.Plugin{Base: execUnit},
Priority: 2000,
Window: window,
Config: models.JobSpecConfigs{
Expand Down Expand Up @@ -652,13 +652,7 @@ func TestJobRunInputCompiler(t *testing.T) {
instanceName := transporterHook
instanceType := models.InstanceTypeHook

// instanceSpec := models.InstanceSpec{
// Name: transporterHook,
// Type: models.InstanceTypeHook,
// Status: models.RunStateRunning,
// Data: instanceSpecData,
// }
cliMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
depResMod.On("CompileAssets", context.TODO(), models.CompileAssetsRequest{
StartTime: startTime,
EndTime: endTime,
Config: models.PluginConfigs{}.FromJobSpec(jobSpec.Task.Config),
Expand All @@ -670,7 +664,7 @@ func TestJobRunInputCompiler(t *testing.T) {
Value: "select * from table WHERE event_timestamp > '{{.EXECUTION_TIME}}'",
},
}}, nil)
defer cliMod.AssertExpectations(t)
defer depResMod.AssertExpectations(t)

pluginRepo.On("GetByName", "bq").Return(plugin, nil)

Expand Down
Loading

0 comments on commit 989cf7a

Please sign in to comment.