diff --git a/Dockerfile b/Dockerfile index 03c329ba33..ab74ab73f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,6 +7,11 @@ WORKDIR /app RUN adduser -D $USER RUN chown -R $USER:$USER /app +# use this part on airflow task to fetch and compile assets by optimus client +COPY ./entrypoint_init_container.sh /opt/entrypoint_init_container.sh +RUN chmod +x /opt/entrypoint_init_container.sh + USER $USER + EXPOSE 8080 CMD ["optimus"] \ No newline at end of file diff --git a/entrypoint_init_container.sh b/entrypoint_init_container.sh new file mode 100644 index 0000000000..ebfd81cad4 --- /dev/null +++ b/entrypoint_init_container.sh @@ -0,0 +1,24 @@ +#! /bin/sh + +#get optimus version +echo "-- optimus client version" +# /app/optimus version + +# printenv + +# get resources +echo "-- print env variables (required for fetching assets)" +echo "JOB_NAME:$JOB_NAME" +echo "OPTIMUS_PROJECT:$PROJECT" +echo "JOB_DIR:$JOB_DIR" +echo "INSTANCE_TYPE:$INSTANCE_TYPE" +echo "INSTANCE_NAME:$INSTANCE_NAME" +echo "SCHEDULED_AT:$SCHEDULED_AT" +echo "OPTIMUS_HOST:$OPTIMUS_HOST" +echo "" + +echo "-- initializing optimus assets" +optimus job run-input "$JOB_NAME" --project-name \ + "$PROJECT" --output-dir "$JOB_DIR" \ + --type "$INSTANCE_TYPE" --name "$INSTANCE_NAME" \ + --scheduled-at "$SCHEDULED_AT" --host "$OPTIMUS_HOST" diff --git a/ext/scheduler/airflow2/resources/__lib.py b/ext/scheduler/airflow2/resources/__lib.py index 9107bad4e6..abc998e6f5 100644 --- a/ext/scheduler/airflow2/resources/__lib.py +++ b/ext/scheduler/airflow2/resources/__lib.py @@ -7,6 +7,7 @@ from datetime import datetime, timedelta from time import sleep from typing import Any, Dict, List, Optional +from kubernetes.client import models as k8s import json @@ -25,6 +26,7 @@ from airflow.utils.db import provide_session from airflow.utils.decorators import apply_defaults from airflow.utils.state import State +from airflow.utils import yaml from croniter import croniter log = logging.getLogger(__name__) @@ -70,24 +72,99 @@ 
class SuperKubernetesPodOperator(KubernetesPodOperator): .. note: keep this up to date if there is any change in KubernetesPodOperator execute method """ template_fields = ('image', 'cmds', 'arguments', 'env_vars', 'config_file', 'pod_template_file') - + @apply_defaults def __init__(self, + optimus_hostname, + optimus_projectname, + optimus_namespacename, + optimus_jobname, + optimus_jobtype, *args, **kwargs): super(SuperKubernetesPodOperator, self).__init__(*args, **kwargs) - self.do_xcom_push = kwargs.get('do_xcom_push') self.namespace = kwargs.get('namespace') self.in_cluster = kwargs.get('in_cluster') self.cluster_context = kwargs.get('cluster_context') self.reattach_on_restart = kwargs.get('reattach_on_restart') self.config_file = kwargs.get('config_file') + + # used to fetch job env from optimus for adding to k8s pod + self.optimus_hostname = optimus_hostname + self.optimus_namespacename = optimus_namespacename + self.optimus_jobname = optimus_jobname + self.optimus_projectname = optimus_projectname + self.optimus_jobtype = optimus_jobtype + self._optimus_client = OptimusAPIClient(optimus_hostname) - def execute(self, context): + def render_init_containers(self, context): + for ic in self.init_containers: + env = getattr(ic, 'env') + if env: + self.render_template(env, context) + + def fetch_env_from_optimus(self, context): + scheduled_at = context["next_execution_date"].strftime(TIMESTAMP_FORMAT) + job_meta = self._optimus_client.get_job_run_input(scheduled_at, self.optimus_projectname, self.optimus_jobname, self.optimus_jobtype) + return [ + k8s.V1EnvVar(name=key,value=val) for key, val in job_meta["envs"].items() + ] + [ + k8s.V1EnvVar(name=key,value=val) for key, val in job_meta["secrets"].items() + ] + + def _dry_run(self, pod): + def prune_dict(val: Any, mode='strict'): + """ + Given dict ``val``, returns new dict based on ``val`` with all + empty elements removed. + What constitutes "empty" is controlled by the ``mode`` parameter. 
If mode is 'strict' + then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x`` + will be removed if ``bool(x) is False``. + """ + + def is_empty(x): + if mode == 'strict': + return x is None + elif mode == 'truthy': + return bool(x) is False + raise ValueError("allowable values for `mode` include 'truthy' and 'strict'") + + if isinstance(val, dict): + new_dict = {} + for k, v in val.items(): + if is_empty(v): + continue + elif isinstance(v, (list, dict)): + new_val = prune_dict(v, mode=mode) + if new_val: + new_dict[k] = new_val + else: + new_dict[k] = v + return new_dict + elif isinstance(val, list): + new_list = [] + for v in val: + if is_empty(v): + continue + elif isinstance(v, (list, dict)): + new_val = prune_dict(v, mode=mode) + if new_val: + new_list.append(new_val) + else: + new_list.append(v) + return new_list + else: + return val + log.info(prune_dict(pod.to_dict(), mode='strict')) + log.info(yaml.dump(prune_dict(pod.to_dict(), mode='strict'))) + def execute(self, context): log_start_event(context, EVENT_NAMES.get("TASK_START_EVENT")) # to be done async + self.env_vars += self.fetch_env_from_optimus(context) + # init-container is not considered for rendering in airflow + self.render_init_containers(context) log.info('Task image version: %s', self.image) try: @@ -100,6 +177,8 @@ def execute(self, context): config_file=self.config_file) self.pod = self.create_pod_request_obj() + # self._dry_run(self.pod) # logs the yaml file for the pod [not compatible with future versions of the implementation] + self.namespace = self.pod.metadata.namespace self.client = client @@ -165,6 +244,16 @@ def get_task_window(self, scheduled_at: str, version: int, window_size: str, win self._raise_error_if_request_failed(response) return response.json() + + def get_job_run_input(self, execution_date: str, project_name: str, job_name: str, job_type: str) -> dict: + response = requests.post(url="{}/api/v1beta1/project/{}/job/{}/run_input".format(self.host,
project_name, job_name), + json={'scheduled_at': execution_date, + 'instance_name': job_name, + 'instance_type': "TYPE_" + job_type.upper()}) + + self._raise_error_if_request_failed(response) + return response.json() + def get_job_metadata(self, execution_date, namespace, project, job) -> dict: url = '{optimus_host}/api/v1beta1/project/{project_name}/namespace/{namespace_name}/job/{job_name}'.format(optimus_host=self.host, namespace_name=namespace, diff --git a/ext/scheduler/airflow2/resources/base_dag.py b/ext/scheduler/airflow2/resources/base_dag.py index 52a05f6685..95bd8596da 100644 --- a/ext/scheduler/airflow2/resources/base_dag.py +++ b/ext/scheduler/airflow2/resources/base_dag.py @@ -4,7 +4,6 @@ from datetime import datetime, timedelta, timezone from airflow.models import DAG, Variable, DagRun, DagModel, TaskInstance, BaseOperator, XCom, XCOM_RETURN_KEY -from airflow.kubernetes.secret import Secret from airflow.configuration import conf from airflow.operators.python_operator import PythonOperator from airflow.utils.weight_rule import WeightRule @@ -86,15 +85,6 @@ ) {{$baseTaskSchema := .Job.Task.Unit.Info -}} -{{ if ne $baseTaskSchema.SecretPath "" -}} -transformation_secret = Secret( - "volume", - {{ dir $baseTaskSchema.SecretPath | quote }}, - "optimus-task-{{ $baseTaskSchema.Name }}", - {{ base $baseTaskSchema.SecretPath | quote }} -) -{{- end }} - {{- $setCPURequest := not (empty .Metadata.Resource.Request.CPU) -}} {{- $setMemoryRequest := not (empty .Metadata.Resource.Request.Memory) -}} {{- $setCPULimit := not (empty .Metadata.Resource.Limit.CPU) -}} @@ -125,9 +115,51 @@ {{- end }} ) {{- end }} +JOB_DIR = "/data" +IMAGE_PULL_POLICY="Always" +INIT_CONTAINER_IMAGE="odpf/optimus:{{.Version}}" +INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh" + +volume = k8s.V1Volume( + name='asset-volume', + empty_dir=k8s.V1EmptyDirVolumeSource() +) +asset_volume_mounts = [ + k8s.V1VolumeMount(mount_path=JOB_DIR, name='asset-volume', sub_path=None, 
read_only=False) +] +executor_env_vars = [ + k8s.V1EnvVar(name="JOB_LABELS",value='{{.Job.GetLabelsAsString}}'), + k8s.V1EnvVar(name="JOB_DIR",value=JOB_DIR), +] + +init_env_vars = [ + k8s.V1EnvVar(name="JOB_DIR",value=JOB_DIR), + k8s.V1EnvVar(name="JOB_NAME",value='{{$.Job.Name}}'), + k8s.V1EnvVar(name="OPTIMUS_HOST",value='{{$.Hostname}}'), + k8s.V1EnvVar(name="PROJECT",value='{{$.Namespace.ProjectSpec.Name}}'), + k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ "{{ next_execution_date }}" }}'), +] + +init_container = k8s.V1Container( + name="init-container", + image=INIT_CONTAINER_IMAGE, + image_pull_policy=IMAGE_PULL_POLICY, + env=init_env_vars + [ + k8s.V1EnvVar(name="INSTANCE_TYPE",value='{{$.InstanceTypeTask}}'), + k8s.V1EnvVar(name="INSTANCE_NAME",value='{{$baseTaskSchema.Name}}'), + ], + security_context=k8s.V1PodSecurityContext(run_as_user=0), + volume_mounts=asset_volume_mounts, + command=["/bin/sh", INIT_CONTAINER_ENTRYPOINT], +) transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." 
"__dot__"}} = SuperKubernetesPodOperator( - image_pull_policy="IfNotPresent", + optimus_hostname="{{$.Hostname}}", + optimus_projectname="{{$.Namespace.ProjectSpec.Name}}", + optimus_namespacename="{{$.Namespace.Name}}", + optimus_jobname="{{.Job.Name}}", + optimus_jobtype="{{$.InstanceTypeTask}}", + image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = {{ $baseTaskSchema.Image | quote}}, cmds=[], @@ -139,42 +171,43 @@ in_cluster=True, is_delete_operator_pod=True, do_xcom_push=False, - secrets=[{{ if ne $baseTaskSchema.SecretPath "" -}} transformation_secret {{- end }}], - env_vars = [ - k8s.V1EnvVar(name="JOB_NAME",value='{{.Job.Name}}'), - k8s.V1EnvVar(name="OPTIMUS_HOST",value='{{.Hostname}}'), - k8s.V1EnvVar(name="JOB_LABELS",value='{{.Job.GetLabelsAsString}}'), - k8s.V1EnvVar(name="JOB_DIR",value='/data'), - k8s.V1EnvVar(name="PROJECT",value='{{.Namespace.ProjectSpec.Name}}'), - k8s.V1EnvVar(name="NAMESPACE",value='{{.Namespace.Name}}'), - k8s.V1EnvVar(name="INSTANCE_TYPE",value='{{$.InstanceTypeTask}}'), - k8s.V1EnvVar(name="INSTANCE_NAME",value='{{$baseTaskSchema.Name}}'), - k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ "{{ next_execution_date }}" }}'), - ], + env_vars=executor_env_vars, {{- if gt .SLAMissDurationInSec 0 }} sla=timedelta(seconds={{ .SLAMissDurationInSec }}), {{- end }} {{- if $setResourceConfig }} resources = resources, {{- end }} - reattach_on_restart=True + reattach_on_restart=True, + volume_mounts=asset_volume_mounts, + volumes=[volume], + init_containers=[init_container], ) # hooks loop start {{ range $_, $t := .Job.Hooks }} {{ $hookSchema := $t.Unit.Info -}} -{{ if ne $hookSchema.SecretPath "" -}} -hook_{{$hookSchema.Name | replace "-" "_"}}_secret = Secret( - "volume", - {{ dir $hookSchema.SecretPath | quote }}, - "optimus-hook-{{ $hookSchema.Name }}", - {{ base $hookSchema.SecretPath | quote }} +init_container_{{$hookSchema.Name | replace "-" "__dash__"}} = k8s.V1Container( + 
name="init-container", + image=INIT_CONTAINER_IMAGE, + image_pull_policy=IMAGE_PULL_POLICY, + env= init_env_vars + [ + k8s.V1EnvVar(name="INSTANCE_TYPE",value='{{$.InstanceTypeHook}}'), + k8s.V1EnvVar(name="INSTANCE_NAME",value='{{$hookSchema.Name}}'), + ], + security_context=k8s.V1PodSecurityContext(run_as_user=0), + volume_mounts=asset_volume_mounts, + command=["/bin/sh", INIT_CONTAINER_ENTRYPOINT], ) -{{- end }} hook_{{$hookSchema.Name | replace "-" "__dash__"}} = SuperKubernetesPodOperator( - image_pull_policy="IfNotPresent", + optimus_hostname="{{$.Hostname}}", + optimus_projectname="{{$.Namespace.ProjectSpec.Name}}", + optimus_namespacename="{{$.Namespace.Name}}", + optimus_jobname="{{$.Job.Name}}", + optimus_jobtype="{{$.InstanceTypeHook}}", + image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "{{ $hookSchema.Image }}", cmds=[], @@ -185,26 +218,17 @@ in_cluster=True, is_delete_operator_pod=True, do_xcom_push=False, - secrets=[{{ if ne $hookSchema.SecretPath "" -}} hook_{{$hookSchema.Name | replace "-" "_"}}_secret {{- end }}], - env_vars = [ - k8s.V1EnvVar(name="JOB_NAME",value='{{$.Job.Name}}'), - k8s.V1EnvVar(name="OPTIMUS_HOST",value='{{$.Hostname}}'), - k8s.V1EnvVar(name="JOB_LABELS",value='{{$.Job.GetLabelsAsString}}'), - k8s.V1EnvVar(name="JOB_DIR",value='/data'), - k8s.V1EnvVar(name="PROJECT",value='{{$.Namespace.ProjectSpec.Name}}'), - k8s.V1EnvVar(name="NAMESPACE",value='{{$.Namespace.Name}}'), - k8s.V1EnvVar(name="INSTANCE_TYPE",value='{{$.InstanceTypeHook}}'), - k8s.V1EnvVar(name="INSTANCE_NAME",value='{{$hookSchema.Name}}'), - k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ "{{ next_execution_date }}" }}'), - # rest of the env vars are pulled from the container by making a GRPC call to optimus - ], + env_vars=executor_env_vars, {{- if eq $hookSchema.HookType $.HookTypeFail }} trigger_rule="one_failed", {{- end }} {{- if $setResourceConfig }} resources = resources, {{- end }} - 
reattach_on_restart=True + reattach_on_restart=True, + volume_mounts=asset_volume_mounts, + volumes=[volume], + init_containers=[init_container_{{$hookSchema.Name | replace "-" "__dash__"}}], ) {{- end }} # hooks loop ends diff --git a/ext/scheduler/airflow2/resources/expected_compiled_template.py b/ext/scheduler/airflow2/resources/expected_compiled_template.py index 235be17ef7..f9ddbb2cce 100644 --- a/ext/scheduler/airflow2/resources/expected_compiled_template.py +++ b/ext/scheduler/airflow2/resources/expected_compiled_template.py @@ -4,7 +4,6 @@ from datetime import datetime, timedelta, timezone from airflow.models import DAG, Variable, DagRun, DagModel, TaskInstance, BaseOperator, XCom, XCOM_RETURN_KEY -from airflow.kubernetes.secret import Secret from airflow.configuration import conf from airflow.operators.python_operator import PythonOperator from airflow.utils.weight_rule import WeightRule @@ -77,15 +76,52 @@ dag=dag ) -transformation_secret = Secret( - "volume", - "/opt/optimus/secrets", - "optimus-task-bq", - "auth.json" + +JOB_DIR = "/data" +IMAGE_PULL_POLICY="Always" +INIT_CONTAINER_IMAGE="odpf/optimus:dev" +INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh" + +volume = k8s.V1Volume( + name='asset-volume', + empty_dir=k8s.V1EmptyDirVolumeSource() +) +asset_volume_mounts = [ + k8s.V1VolumeMount(mount_path=JOB_DIR, name='asset-volume', sub_path=None, read_only=False) +] +executor_env_vars = [ + k8s.V1EnvVar(name="JOB_LABELS",value='orchestrator=optimus'), + k8s.V1EnvVar(name="JOB_DIR",value=JOB_DIR), +] + +init_env_vars = [ + k8s.V1EnvVar(name="JOB_DIR",value=JOB_DIR), + k8s.V1EnvVar(name="JOB_NAME",value='foo'), + k8s.V1EnvVar(name="OPTIMUS_HOST",value='http://airflow.example.io'), + k8s.V1EnvVar(name="PROJECT",value='foo-project'), + k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ next_execution_date }}'), +] + +init_container = k8s.V1Container( + name="init-container", + image=INIT_CONTAINER_IMAGE, + image_pull_policy=IMAGE_PULL_POLICY, + 
env=init_env_vars + [ + k8s.V1EnvVar(name="INSTANCE_TYPE",value='task'), + k8s.V1EnvVar(name="INSTANCE_NAME",value='bq'), + ], + security_context=k8s.V1PodSecurityContext(run_as_user=0), + volume_mounts=asset_volume_mounts, + command=["/bin/sh", INIT_CONTAINER_ENTRYPOINT], ) transformation_bq = SuperKubernetesPodOperator( - image_pull_policy="IfNotPresent", + optimus_hostname="http://airflow.example.io", + optimus_projectname="foo-project", + optimus_namespacename="bar-namespace", + optimus_jobname="foo", + optimus_jobtype="task", + image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "example.io/namespace/image:latest", cmds=[], @@ -97,33 +133,36 @@ in_cluster=True, is_delete_operator_pod=True, do_xcom_push=False, - secrets=[transformation_secret], - env_vars = [ - k8s.V1EnvVar(name="JOB_NAME",value='foo'), - k8s.V1EnvVar(name="OPTIMUS_HOST",value='http://airflow.example.io'), - k8s.V1EnvVar(name="JOB_LABELS",value='orchestrator=optimus'), - k8s.V1EnvVar(name="JOB_DIR",value='/data'), - k8s.V1EnvVar(name="PROJECT",value='foo-project'), - k8s.V1EnvVar(name="NAMESPACE",value='bar-namespace'), - k8s.V1EnvVar(name="INSTANCE_TYPE",value='task'), - k8s.V1EnvVar(name="INSTANCE_NAME",value='bq'), - k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ next_execution_date }}'), - ], + env_vars=executor_env_vars, sla=timedelta(seconds=7200), - reattach_on_restart=True + reattach_on_restart=True, + volume_mounts=asset_volume_mounts, + volumes=[volume], + init_containers=[init_container], ) # hooks loop start -hook_transporter_secret = Secret( - "volume", - "/opt/optimus/secrets", - "optimus-hook-transporter", - "auth.json" +init_container_transporter = k8s.V1Container( + name="init-container", + image=INIT_CONTAINER_IMAGE, + image_pull_policy=IMAGE_PULL_POLICY, + env= init_env_vars + [ + k8s.V1EnvVar(name="INSTANCE_TYPE",value='hook'), + k8s.V1EnvVar(name="INSTANCE_NAME",value='transporter'), + ], + 
security_context=k8s.V1PodSecurityContext(run_as_user=0), + volume_mounts=asset_volume_mounts, + command=["/bin/sh", INIT_CONTAINER_ENTRYPOINT], ) hook_transporter = SuperKubernetesPodOperator( - image_pull_policy="IfNotPresent", + optimus_hostname="http://airflow.example.io", + optimus_projectname="foo-project", + optimus_namespacename="bar-namespace", + optimus_jobname="foo", + optimus_jobtype="hook", + image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "example.io/namespace/hook-image:latest", cmds=[], @@ -134,25 +173,32 @@ in_cluster=True, is_delete_operator_pod=True, do_xcom_push=False, - secrets=[hook_transporter_secret], - env_vars = [ - k8s.V1EnvVar(name="JOB_NAME",value='foo'), - k8s.V1EnvVar(name="OPTIMUS_HOST",value='http://airflow.example.io'), - k8s.V1EnvVar(name="JOB_LABELS",value='orchestrator=optimus'), - k8s.V1EnvVar(name="JOB_DIR",value='/data'), - k8s.V1EnvVar(name="PROJECT",value='foo-project'), - k8s.V1EnvVar(name="NAMESPACE",value='bar-namespace'), + env_vars=executor_env_vars, + reattach_on_restart=True, + volume_mounts=asset_volume_mounts, + volumes=[volume], + init_containers=[init_container_transporter], +) +init_container_predator = k8s.V1Container( + name="init-container", + image=INIT_CONTAINER_IMAGE, + image_pull_policy=IMAGE_PULL_POLICY, + env= init_env_vars + [ k8s.V1EnvVar(name="INSTANCE_TYPE",value='hook'), - k8s.V1EnvVar(name="INSTANCE_NAME",value='transporter'), - k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ next_execution_date }}'), - # rest of the env vars are pulled from the container by making a GRPC call to optimus + k8s.V1EnvVar(name="INSTANCE_NAME",value='predator'), ], - reattach_on_restart=True + security_context=k8s.V1PodSecurityContext(run_as_user=0), + volume_mounts=asset_volume_mounts, + command=["/bin/sh", INIT_CONTAINER_ENTRYPOINT], ) - hook_predator = SuperKubernetesPodOperator( - image_pull_policy="IfNotPresent", + 
optimus_hostname="http://airflow.example.io", + optimus_projectname="foo-project", + optimus_namespacename="bar-namespace", + optimus_jobname="foo", + optimus_jobtype="hook", + image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "example.io/namespace/predator-image:latest", cmds=[], @@ -163,25 +209,32 @@ in_cluster=True, is_delete_operator_pod=True, do_xcom_push=False, - secrets=[], - env_vars = [ - k8s.V1EnvVar(name="JOB_NAME",value='foo'), - k8s.V1EnvVar(name="OPTIMUS_HOST",value='http://airflow.example.io'), - k8s.V1EnvVar(name="JOB_LABELS",value='orchestrator=optimus'), - k8s.V1EnvVar(name="JOB_DIR",value='/data'), - k8s.V1EnvVar(name="PROJECT",value='foo-project'), - k8s.V1EnvVar(name="NAMESPACE",value='bar-namespace'), + env_vars=executor_env_vars, + reattach_on_restart=True, + volume_mounts=asset_volume_mounts, + volumes=[volume], + init_containers=[init_container_predator], +) +init_container_hook__dash__for__dash__fail = k8s.V1Container( + name="init-container", + image=INIT_CONTAINER_IMAGE, + image_pull_policy=IMAGE_PULL_POLICY, + env= init_env_vars + [ k8s.V1EnvVar(name="INSTANCE_TYPE",value='hook'), - k8s.V1EnvVar(name="INSTANCE_NAME",value='predator'), - k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ next_execution_date }}'), - # rest of the env vars are pulled from the container by making a GRPC call to optimus + k8s.V1EnvVar(name="INSTANCE_NAME",value='hook-for-fail'), ], - reattach_on_restart=True + security_context=k8s.V1PodSecurityContext(run_as_user=0), + volume_mounts=asset_volume_mounts, + command=["/bin/sh", INIT_CONTAINER_ENTRYPOINT], ) - hook_hook__dash__for__dash__fail = SuperKubernetesPodOperator( - image_pull_policy="IfNotPresent", + optimus_hostname="http://airflow.example.io", + optimus_projectname="foo-project", + optimus_namespacename="bar-namespace", + optimus_jobname="foo", + optimus_jobtype="hook", + image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 
'namespace', fallback="default"), image = "example.io/namespace/fail-image:latest", cmds=[], @@ -192,21 +245,12 @@ in_cluster=True, is_delete_operator_pod=True, do_xcom_push=False, - secrets=[], - env_vars = [ - k8s.V1EnvVar(name="JOB_NAME",value='foo'), - k8s.V1EnvVar(name="OPTIMUS_HOST",value='http://airflow.example.io'), - k8s.V1EnvVar(name="JOB_LABELS",value='orchestrator=optimus'), - k8s.V1EnvVar(name="JOB_DIR",value='/data'), - k8s.V1EnvVar(name="PROJECT",value='foo-project'), - k8s.V1EnvVar(name="NAMESPACE",value='bar-namespace'), - k8s.V1EnvVar(name="INSTANCE_TYPE",value='hook'), - k8s.V1EnvVar(name="INSTANCE_NAME",value='hook-for-fail'), - k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ next_execution_date }}'), - # rest of the env vars are pulled from the container by making a GRPC call to optimus - ], + env_vars=executor_env_vars, trigger_rule="one_failed", - reattach_on_restart=True + reattach_on_restart=True, + volume_mounts=asset_volume_mounts, + volumes=[volume], + init_containers=[init_container_hook__dash__for__dash__fail], ) # hooks loop ends diff --git a/plugin/yaml/plugin.go b/plugin/yaml/plugin.go index 1a0e014f10..f6a8ee8af7 100644 --- a/plugin/yaml/plugin.go +++ b/plugin/yaml/plugin.go @@ -29,7 +29,7 @@ func (p *PluginSpec) PluginInfo() *models.PluginInfoResponse { return &models.PluginInfoResponse{ Name: p.Name, Description: p.Description, - Image: fmt.Sprintf("%s:%s", p.Image, p.PluginVersion), + Image: p.Image, SecretPath: p.SecretPath, PluginType: p.PluginType, PluginMods: []models.PluginMod{models.ModTypeCLI}, diff --git a/plugin/yaml/tests/sample_plugin.yaml b/plugin/yaml/tests/sample_plugin.yaml index 10bfefcd89..aa18fc9ddc 100644 --- a/plugin/yaml/tests/sample_plugin.yaml +++ b/plugin/yaml/tests/sample_plugin.yaml @@ -5,7 +5,7 @@ pluginmods: - cli - dependencyresolver pluginversion: latest -image: docker.io/odpf/optimus-task-bq2bq-executor +image: docker.io/odpf/optimus-task-bq2bq-executor:latest secretpath: /tmp/auth.json questions: