From 964c6547f85b6dfc8cc16ca184c3a5e85b30ad3c Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Thu, 2 Mar 2023 15:10:52 +0700 Subject: [PATCH] Fix: source entrypoint with custom shell (#751) * fix: adding exec_entrypoint script * Revert "fix: adding exec_entrypoint script" This reverts commit ce373c2c6ceb8cbb18e2b74e70c116cb7d3135b1. * feat: add cmds for custom shell * refactor: rename entrypoint cmds and script * remove: join template func --- ext/scheduler/airflow/dag/compiler_test.go | 44 ++++++++++++------- ext/scheduler/airflow/dag/dag.py.tmpl | 12 ++--- ext/scheduler/airflow/dag/expected_dag.py | 20 ++++----- ext/scheduler/airflow/dag/models.go | 4 +- plugin/yaml/plugin.go | 9 ++++ plugin/yaml/plugin_test.go | 27 ++++++++---- plugin/yaml/tests/sample_plugin.yaml | 6 ++- .../tests/sample_plugin_without_shell.yaml | 24 ++++++++++ .../tests/sample_plugin_without_version.yaml | 4 +- sdk/plugin/mock/plugin.go | 6 ++- sdk/plugin/plugin.go | 11 +++-- sdk/plugin/plugin_test.go | 34 +++++++++----- 12 files changed, 140 insertions(+), 61 deletions(-) create mode 100644 plugin/yaml/tests/sample_plugin_without_shell.yaml diff --git a/ext/scheduler/airflow/dag/compiler_test.go b/ext/scheduler/airflow/dag/compiler_test.go index a7a1e463ca..54d68740dd 100644 --- a/ext/scheduler/airflow/dag/compiler_test.go +++ b/ext/scheduler/airflow/dag/compiler_test.go @@ -157,36 +157,48 @@ func (m mockPluginRepo) GetByName(name string) (*plugin.Plugin, error) { func setupPluginRepo() mockPluginRepo { execUnit := new(mock.YamlMod) execUnit.On("PluginInfo").Return(&plugin.Info{ - Name: "bq-bq", - Image: "example.io/namespace/bq2bq-executor:latest", - Entrypoint: "python3 /opt/bumblebee/main.py", + Name: "bq-bq", + Image: "example.io/namespace/bq2bq-executor:latest", + Entrypoint: plugin.Entrypoint{ + Shell: "/bin/bash", + Script: "python3 /opt/bumblebee/main.py", + }, }, nil) transporterHook := "transporter" hookUnit := new(mock.YamlMod) hookUnit.On("PluginInfo").Return(&plugin.Info{ - Name: transporterHook, - HookType: plugin.HookTypePre, - Image: "example.io/namespace/transporter-executor:latest", - Entrypoint: "java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main", - DependsOn: []string{"predator"}, + Name: transporterHook, + HookType: plugin.HookTypePre, + Image: "example.io/namespace/transporter-executor:latest", + Entrypoint: plugin.Entrypoint{ + Shell: "/bin/sh", + Script: "java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main", + }, + DependsOn: []string{"predator"}, }, nil) predatorHook := "predator" hookUnit2 := new(mock.YamlMod) hookUnit2.On("PluginInfo").Return(&plugin.Info{ - Name: predatorHook, - HookType: plugin.HookTypePost, - Image: "example.io/namespace/predator-image:latest", - Entrypoint: "predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u \"${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}\"", + Name: predatorHook, + HookType: plugin.HookTypePost, + Image: "example.io/namespace/predator-image:latest", + Entrypoint: plugin.Entrypoint{ + Shell: "/bin/sh", + Script: "predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u \"${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}\"", + }, }, nil) hookUnit3 := new(mock.YamlMod) hookUnit3.On("PluginInfo").Return(&plugin.Info{ - Name: "failureHook", - HookType: plugin.HookTypeFail, - Image: "example.io/namespace/failure-hook-image:latest", - Entrypoint: "sleep 5", + Name: "failureHook", + HookType: plugin.HookTypeFail, + Image: "example.io/namespace/failure-hook-image:latest", + Entrypoint: plugin.Entrypoint{ + Shell: "/bin/sh", + Script: "sleep 5", + }, }, nil) repo := mockPluginRepo{plugins: []*plugin.Plugin{ diff --git a/ext/scheduler/airflow/dag/dag.py.tmpl b/ext/scheduler/airflow/dag/dag.py.tmpl index 426e5c0d82..b922e84406 100644 --- a/ext/scheduler/airflow/dag/dag.py.tmpl +++ b/ext/scheduler/airflow/dag/dag.py.tmpl @@ -102,12 +102,12 @@ IMAGE_PULL_POLICY = "IfNotPresent" INIT_CONTAINER_IMAGE = "odpf/optimus:{{.Version}}" INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh" -def get_entrypoint_cmd(plugin_entrypoint): +def get_entrypoint_cmd(plugin_entrypoint_script): path_config = JOB_DIR + "/in/.env" path_secret = JOB_DIR + "/in/.secret" entrypoint = "set -o allexport; source {path_config}; set +o allexport; cat {path_config}; ".format(path_config=path_config) entrypoint += "set -o allexport; source {path_secret}; set +o allexport; ".format(path_secret=path_secret) - return entrypoint + plugin_entrypoint + return entrypoint + plugin_entrypoint_script volume = k8s.V1Volume( name='asset-volume', @@ -150,8 +150,8 @@ init_container = k8s.V1Container( image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image={{ .Task.Image | quote}}, - cmds=["/bin/sh"], - arguments=["-c", get_entrypoint_cmd("""{{.Task.Entrypoint}} """)], + cmds=["{{.Task.Entrypoint.Shell}}", "-c"], + arguments=[get_entrypoint_cmd("""{{.Task.Entrypoint.Script}} """)], name="{{ .Task.Name | replace "_" "-" }}", task_id={{ .Task.Name | quote}}, get_logs=True, @@ -193,8 +193,8 @@ hook_{{$hookName}} = SuperKubernetesPodOperator( image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="{{ $t.Image }}", - cmds=["/bin/sh"], - arguments=["-c", get_entrypoint_cmd("""{{ $t.Entrypoint }} """)], + cmds=["{{$t.Entrypoint.Shell}}", "-c"], + arguments=[get_entrypoint_cmd("""{{$t.Entrypoint.Script}} """)], name="hook_{{ $t.Name | replace "_" "-" }}", task_id="hook_{{ $t.Name }}", get_logs=True, diff --git a/ext/scheduler/airflow/dag/expected_dag.py b/ext/scheduler/airflow/dag/expected_dag.py index a51ef117aa..c54ca5f96e 100644 --- a/ext/scheduler/airflow/dag/expected_dag.py +++ b/ext/scheduler/airflow/dag/expected_dag.py @@ -72,12 +72,12 @@ INIT_CONTAINER_IMAGE = "odpf/optimus:dev" INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh" -def get_entrypoint_cmd(plugin_entrypoint): +def get_entrypoint_cmd(plugin_entrypoint_script): path_config = JOB_DIR + "/in/.env" path_secret = JOB_DIR + "/in/.secret" entrypoint = "set -o allexport; source {path_config}; set +o allexport; cat {path_config}; ".format(path_config=path_config) entrypoint += "set -o allexport; source {path_secret}; set +o allexport; ".format(path_secret=path_secret) - return entrypoint + plugin_entrypoint + return entrypoint + plugin_entrypoint_script volume = k8s.V1Volume( name='asset-volume', @@ -119,8 +119,8 @@ def get_entrypoint_cmd(plugin_entrypoint): image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="example.io/namespace/bq2bq-executor:latest", - cmds=["/bin/sh"], - arguments=["-c", get_entrypoint_cmd("""python3 /opt/bumblebee/main.py """)], + cmds=["/bin/bash", "-c"], + arguments=[get_entrypoint_cmd("""python3 /opt/bumblebee/main.py """)], name="bq-bq", task_id="bq-bq", get_logs=True, @@ -156,8 +156,8 @@ def get_entrypoint_cmd(plugin_entrypoint): image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="example.io/namespace/transporter-executor:latest", - cmds=["/bin/sh"], - arguments=["-c", get_entrypoint_cmd("""java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main """)], + cmds=["/bin/sh", "-c"], + arguments=[get_entrypoint_cmd("""java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main """)], name="hook_transporter", task_id="hook_transporter", get_logs=True, @@ -189,8 +189,8 @@ def get_entrypoint_cmd(plugin_entrypoint): image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="example.io/namespace/predator-image:latest", - cmds=["/bin/sh"], - arguments=["-c", get_entrypoint_cmd("""predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u "${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}" """)], + cmds=["/bin/sh", "-c"], + arguments=[get_entrypoint_cmd("""predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u "${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}" """)], name="hook_predator", task_id="hook_predator", get_logs=True, @@ -222,8 +222,8 @@ def get_entrypoint_cmd(plugin_entrypoint): image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="example.io/namespace/failure-hook-image:latest", - cmds=["/bin/sh"], - arguments=["-c", get_entrypoint_cmd("""sleep 5 """)], + cmds=["/bin/sh", "-c"], + arguments=[get_entrypoint_cmd("""sleep 5 """)], name="hook_failureHook", task_id="hook_failureHook", get_logs=True, diff --git a/ext/scheduler/airflow/dag/models.go b/ext/scheduler/airflow/dag/models.go index 0c0b2886f5..0e51507990 100644 --- a/ext/scheduler/airflow/dag/models.go +++ b/ext/scheduler/airflow/dag/models.go @@ -33,7 +33,7 @@ type TemplateContext struct { type Task struct { Name string Image string - Entrypoint string + Entrypoint plugin.Entrypoint } func PrepareTask(job *scheduler.Job, pluginRepo PluginRepo) (Task, error) { @@ -54,7 +54,7 @@ func PrepareTask(job *scheduler.Job, pluginRepo PluginRepo) (Task, error) { type Hook struct { Name string Image string - Entrypoint string + Entrypoint plugin.Entrypoint IsFailHook bool } diff --git a/plugin/yaml/plugin.go b/plugin/yaml/plugin.go index 3b35c1f1cf..9d82a070c9 100644 --- a/plugin/yaml/plugin.go +++ b/plugin/yaml/plugin.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "strings" "github.com/hashicorp/go-hclog" "github.com/spf13/afero" @@ -104,6 +105,14 @@ func NewPluginSpec(pluginPath string) (*PluginSpec, error) { if err := yaml.UnmarshalStrict(pluginBytes, &plugin); err != nil { return &plugin, err } + // default values + if plugin.Info.Entrypoint.Shell == "" { + plugin.Info.Entrypoint.Shell = "/bin/sh" + } + + // standardize script value + script := plugin.Info.Entrypoint.Script + plugin.Info.Entrypoint.Script = strings.ReplaceAll(script, "\n", "; ") return &plugin, nil } diff --git a/plugin/yaml/plugin_test.go b/plugin/yaml/plugin_test.go index 90847d5109..3187c34cdb 100644 --- a/plugin/yaml/plugin_test.go +++ b/plugin/yaml/plugin_test.go @@ -16,7 +16,7 @@ import ( type mockYamlMod struct { Name string Image string - Entrypoint string + Entrypoint plugin.Entrypoint PluginVersion string PluginType string } @@ -51,10 +51,13 @@ func TestYamlPlugin(t *testing.T) { testYamlPluginPath := "tests/sample_plugin.yaml" // success testYamlPluginName := "bq2bqtest" expectedInfo := &plugin.Info{ - Name: "bq2bqtest", - Description: "Testing", - Image: "docker.io/odpf/optimus-task-bq2bq-executor:latest", - Entrypoint: "sleep 100", + Name: "bq2bqtest", + Description: "Testing", + Image: "docker.io/odpf/optimus-task-bq2bq-executor:latest", + Entrypoint: plugin.Entrypoint{ + Shell: "/bin/bash", + Script: "sleep 100; sleep 150", + }, PluginType: "task", PluginMods: []plugin.Mod{"cli"}, PluginVersion: "latest", @@ -168,12 +171,20 @@ func TestYamlPlugin(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, repo.GetAll()) }) + t.Run("should use default when entrypoint cmds is empty", func(t *testing.T) { + pluginSpec, err := yaml.NewPluginSpec("tests/sample_plugin_without_shell.yaml") + assert.NoError(t, err) + assert.NotEmpty(t, pluginSpec) + assert.Equal(t, "/bin/sh", pluginSpec.Entrypoint.Shell) + }) t.Run("should returns error when load yaml when same name exists", func(t *testing.T) { repoWithBinayPlugin := models.NewPluginRepository() err := repoWithBinayPlugin.AddYaml(&mockYamlMod{ - Name: testYamlPluginName, - Image: "sdsd", - Entrypoint: "sleep 100", + Name: testYamlPluginName, + Image: "sdsd", + Entrypoint: plugin.Entrypoint{ + Script: "sleep 100", + }, PluginVersion: "asdasd", PluginType: plugin.TypeTask.String(), }) diff --git a/plugin/yaml/tests/sample_plugin.yaml b/plugin/yaml/tests/sample_plugin.yaml index af79c0e279..75e4eb3627 100644 --- a/plugin/yaml/tests/sample_plugin.yaml +++ b/plugin/yaml/tests/sample_plugin.yaml @@ -6,7 +6,11 @@ pluginmods: - dependencyresolver pluginversion: latest image: docker.io/odpf/optimus-task-bq2bq-executor:latest -entrypoint: "sleep 100" +entrypoint: + shell: "/bin/bash" + script: |- + sleep 100 + sleep 150 questions: - name: PROJECT diff --git a/plugin/yaml/tests/sample_plugin_without_shell.yaml b/plugin/yaml/tests/sample_plugin_without_shell.yaml new file mode 100644 index 0000000000..f45e6bfbf9 --- /dev/null +++ b/plugin/yaml/tests/sample_plugin_without_shell.yaml @@ -0,0 +1,24 @@ +name: bq2bqtest +description: Testing +plugintype: task +pluginmods: + - cli + - dependencyresolver +pluginversion: latest +image: docker.io/odpf/optimus-task-bq2bq-executor:latest +entrypoint: + script: "sleep 100" + +questions: + - name: PROJECT + prompt: Project ID + regexp: ^[a-zA-Z0-9_\-]+$ + minlength: 3 + +defaultconfig: +- name: TEST + value: "{{.test}}" + +defaultassets: + - name: query.sql + value: Select * from "project.dataset.table"; \ No newline at end of file diff --git a/plugin/yaml/tests/sample_plugin_without_version.yaml b/plugin/yaml/tests/sample_plugin_without_version.yaml index 5d8f0a784d..567bb671c8 100644 --- a/plugin/yaml/tests/sample_plugin_without_version.yaml +++ b/plugin/yaml/tests/sample_plugin_without_version.yaml @@ -5,7 +5,9 @@ pluginmods: - cli pluginversion: "" image: "" -entrypoint: "sleep 100" +entrypoint: + shell: "/bin/bash" + script: "sleep 100" questions: - name: PROJECT diff --git a/sdk/plugin/mock/plugin.go b/sdk/plugin/mock/plugin.go index 52cc9226ba..fd4b45874b 100644 --- a/sdk/plugin/mock/plugin.go +++ b/sdk/plugin/mock/plugin.go @@ -34,8 +34,10 @@ func (p *MockYamlMod) PluginInfo() *plugin.Info { DependsOn: nil, HookType: "", Image: "gcr.io/bq-plugin:dev", - Entrypoint: "sleep 60", - PluginMods: []plugin.Mod{plugin.ModTypeCLI}, + Entrypoint: plugin.Entrypoint{ + Script: "sleep 60", + }, + PluginMods: []plugin.Mod{plugin.ModTypeCLI}, } } diff --git a/sdk/plugin/plugin.go b/sdk/plugin/plugin.go index 0848ba3d54..663b1eac3a 100644 --- a/sdk/plugin/plugin.go +++ b/sdk/plugin/plugin.go @@ -37,6 +37,11 @@ func (m Mod) String() string { return string(m) } +type Entrypoint struct { + Shell string + Script string +} + type Info struct { // Name should as simple as possible with no special characters // should start with a character, better if all lowercase @@ -52,7 +57,7 @@ type Info struct { Image string // Entrypoint command which will be used to execute the plugin - Entrypoint string + Entrypoint Entrypoint // DependsOn returns list of hooks this should be executed after DependsOn []string `yaml:",omitempty"` @@ -78,8 +83,8 @@ func (info *Info) Validate() error { } // entrypoint is a required field - if info.Entrypoint == "" { - return errors.New("entrypoint cannot be empty") + if info.Entrypoint.Script == "" { + return errors.New("entrypoint script cannot be empty") } switch info.PluginType { diff --git a/sdk/plugin/plugin_test.go b/sdk/plugin/plugin_test.go index ae8068c2ee..5182516558 100644 --- a/sdk/plugin/plugin_test.go +++ b/sdk/plugin/plugin_test.go @@ -49,8 +49,10 @@ func TestPlugins(t *testing.T) { Name: "", Image: "odpf.io/example", PluginVersion: "0.2", - Entrypoint: "sleep 10", - PluginType: plugin.TypeTask, + Entrypoint: plugin.Entrypoint{ + Script: "sleep 10", + }, + PluginType: plugin.TypeTask, }, }, { @@ -60,8 +62,10 @@ func TestPlugins(t *testing.T) { Name: "example", Image: "", PluginVersion: "0.2", - Entrypoint: "sleep 10", - PluginType: plugin.TypeTask, + Entrypoint: plugin.Entrypoint{ + Script: "sleep 10", + }, + PluginType: plugin.TypeTask, }, }, { @@ -71,18 +75,20 @@ func TestPlugins(t *testing.T) { Name: "example", Image: "odpf.io/example", PluginVersion: "", - Entrypoint: "sleep 10", - PluginType: plugin.TypeTask, + Entrypoint: plugin.Entrypoint{ + Script: "sleep 10", + }, + PluginType: plugin.TypeTask, }, }, { name: "when entrypoint is empty", - err: errors.New("entrypoint cannot be empty"), + err: errors.New("entrypoint args cannot be empty"), info: plugin.Info{ Name: "example", Image: "odpf.io/example", PluginVersion: "0.2", - Entrypoint: "", + Entrypoint: plugin.Entrypoint{}, PluginType: plugin.TypeTask, }, }, @@ -93,8 +99,10 @@ func TestPlugins(t *testing.T) { Name: "example", Image: "odpf.io/example", PluginVersion: "0.2", - Entrypoint: "sleep 10", - PluginType: "", + Entrypoint: plugin.Entrypoint{ + Script: "sleep 10", + }, + PluginType: "", }, }, { @@ -104,8 +112,10 @@ func TestPlugins(t *testing.T) { Name: "example", Image: "odpf.io/example", PluginVersion: "0.2", - Entrypoint: "sleep 10", - PluginType: plugin.TypeTask, + Entrypoint: plugin.Entrypoint{ + Script: "sleep 10", + }, + PluginType: plugin.TypeTask, }, }, }