Skip to content

Commit

Permalink
feat : simplify plugin ecosystem (init-container) (#609)
Browse files Browse the repository at this point in the history
* feat: executor boot process via init-container (#425)

* update job boot process for transformer

* fix : hooks, google cred env

* fix hook compilation issue : job.name

* fix hook launch issue : list

* fix : unit test for dag compilation

* add entrypoint for init_container

* add dry_run for skpo

* update job boot process for transformer

* fix : hooks, google cred env

* fix hook compilation issue : job.name

* fix hook launch issue : list

* fix : unit test for dag compilation

* add entrypoint for init_container

* add dry_run for skpo

* fix init_container_imagename

* fix tests and init_container image

* fix: k8s secret migration

* remove comments (secrets)

* fix: missing namespace name (#532)

* feat: call job run input to compile + fetch assets (#529)

* fix : image tag is not plugin version

* fix tests

* fix : remove job-run input from init entrypoint

* fix : remove admin build instance from init-container

* fix : fetch env + secret api & minor changes

* fix base_dag test

* fix : init-container file write permission issue

Co-authored-by: Dery Rahman Ahaddienata <[email protected]>
  • Loading branch information
smarchint and deryrahman authored Nov 21, 2022
1 parent 930a8e1 commit 34ec9ce
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 119 deletions.
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ WORKDIR /app
RUN adduser -D $USER
RUN chown -R $USER:$USER /app

# use this part on airflow task to fetch and compile assets by optimus client
COPY ./entrypoint_init_container.sh /opt/entrypoint_init_container.sh
RUN chmod +x /opt/entrypoint_init_container.sh

USER $USER

EXPOSE 8080
CMD ["optimus"]
24 changes: 24 additions & 0 deletions entrypoint_init_container.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#! /bin/sh

# Init-container entrypoint: prints the environment variables it depends on,
# then asks the Optimus host to compile this job instance's run input
# (assets, env, secrets) into $JOB_DIR for the main task container to consume.

# Fail fast: abort on the first failing command so a failed asset fetch
# fails the init container instead of letting the task start without assets.
set -e

# Log the inputs (no secret values) to make failed fetches debuggable.
echo "-- print env variables (required for fetching assets)"
echo "JOB_NAME:$JOB_NAME"
echo "OPTIMUS_PROJECT:$PROJECT"
echo "JOB_DIR:$JOB_DIR"
echo "INSTANCE_TYPE:$INSTANCE_TYPE"
echo "INSTANCE_NAME:$INSTANCE_NAME"
echo "SCHEDULED_AT:$SCHEDULED_AT"
echo "OPTIMUS_HOST:$OPTIMUS_HOST"
echo ""

echo "-- initializing optimus assets"
optimus job run-input "$JOB_NAME" --project-name \
    "$PROJECT" --output-dir "$JOB_DIR" \
    --type "$INSTANCE_TYPE" --name "$INSTANCE_NAME" \
    --scheduled-at "$SCHEDULED_AT" --host "$OPTIMUS_HOST"
95 changes: 92 additions & 3 deletions ext/scheduler/airflow2/resources/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime, timedelta
from time import sleep
from typing import Any, Dict, List, Optional
from kubernetes.client import models as k8s

import json

Expand All @@ -25,6 +26,7 @@
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import yaml
from croniter import croniter

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -70,24 +72,99 @@ class SuperKubernetesPodOperator(KubernetesPodOperator):
.. note: keep this up to date if there is any change in KubernetesPodOperator execute method
"""
template_fields = ('image', 'cmds', 'arguments', 'env_vars', 'config_file', 'pod_template_file')

@apply_defaults
def __init__(self,
             optimus_hostname,
             optimus_projectname,
             optimus_namespacename,
             optimus_jobname,
             optimus_jobtype,
             *args,
             **kwargs):
    """Pod operator that injects Optimus-provided env/secrets before launch.

    Args:
        optimus_hostname: Optimus service host used for API calls.
        optimus_projectname: Optimus project the job belongs to.
        optimus_namespacename: Optimus namespace of the job.
        optimus_jobname: name of the Optimus job.
        optimus_jobtype: instance type of the job (task or hook).
        *args/**kwargs: forwarded unchanged to KubernetesPodOperator.
    """
    super(SuperKubernetesPodOperator, self).__init__(*args, **kwargs)

    # Re-read selected operator options, but fall back to the value the
    # parent __init__ already resolved: the previous bare `kwargs.get(...)`
    # clobbered parent defaults with None whenever the caller omitted the
    # keyword (e.g. config_file).
    self.do_xcom_push = kwargs.get('do_xcom_push', self.do_xcom_push)
    self.namespace = kwargs.get('namespace', self.namespace)
    self.in_cluster = kwargs.get('in_cluster', self.in_cluster)
    self.cluster_context = kwargs.get('cluster_context', self.cluster_context)
    self.reattach_on_restart = kwargs.get('reattach_on_restart', self.reattach_on_restart)
    self.config_file = kwargs.get('config_file', self.config_file)

    # used to fetch job env from optimus for adding to k8s pod
    self.optimus_hostname = optimus_hostname
    self.optimus_namespacename = optimus_namespacename
    self.optimus_jobname = optimus_jobname
    self.optimus_projectname = optimus_projectname
    self.optimus_jobtype = optimus_jobtype
    self._optimus_client = OptimusAPIClient(optimus_hostname)

def execute(self, context):
def render_init_containers(self, context):
    """Render Jinja templates inside each init container's env var values.

    Airflow only templates the operator's own ``template_fields``; init
    containers are not rendered automatically, so their env is walked here.
    """
    for ic in self.init_containers:
        env = getattr(ic, 'env')
        if env:
            # NOTE(review): render_template's return value is discarded —
            # this assumes env entries are rendered in place; confirm against
            # the Airflow version in use.
            self.render_template(env, context)

def fetch_env_from_optimus(self, context):
    """Fetch this job run's env vars and secrets from Optimus.

    Asks the Optimus API for the run input of the upcoming schedule instant
    and returns it as a flat list of ``k8s.V1EnvVar`` (envs first, then
    secrets).
    """
    scheduled_at = context["next_execution_date"].strftime(TIMESTAMP_FORMAT)
    job_meta = self._optimus_client.get_job_run_input(
        scheduled_at,
        self.optimus_projectname,
        self.optimus_jobname,
        self.optimus_jobtype,
    )
    env_vars = []
    for section in ("envs", "secrets"):
        for key, val in job_meta[section].items():
            env_vars.append(k8s.V1EnvVar(name=key, value=val))
    return env_vars

def _dry_run(self, pod):
    """Log the pod spec as YAML with empty (None) fields pruned.

    Debug helper only — it does not create the pod.
    """
    def prune_dict(val: Any, mode='strict'):
        """
        Given dict ``val``, returns new dict based on ``val`` with all
        empty elements removed.
        What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
        then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
        will be removed if ``bool(x) is False``.
        """

        def is_empty(x):
            if mode == 'strict':
                return x is None
            elif mode == 'truthy':
                return bool(x) is False
            raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")

        if isinstance(val, dict):
            new_dict = {}
            for k, v in val.items():
                if is_empty(v):
                    continue
                elif isinstance(v, (list, dict)):
                    new_val = prune_dict(v, mode=mode)
                    if new_val:
                        new_dict[k] = new_val
                else:
                    new_dict[k] = v
            return new_dict
        elif isinstance(val, list):
            new_list = []
            for v in val:
                if is_empty(v):
                    continue
                elif isinstance(v, (list, dict)):
                    new_val = prune_dict(v, mode=mode)
                    if new_val:
                        new_list.append(new_val)
                else:
                    new_list.append(v)
            return new_list
        else:
            return val

    # Prune once and emit a single YAML log line; previously the same pod
    # dict was converted and pruned twice and logged both raw and as YAML.
    pruned = prune_dict(pod.to_dict(), mode='strict')
    log.info(yaml.dump(pruned))

def execute(self, context):
log_start_event(context, EVENT_NAMES.get("TASK_START_EVENT"))
# to be done async
self.env_vars += self.fetch_env_from_optimus(context)
# init-container is not considered for rendering in airflow
self.render_init_containers(context)

log.info('Task image version: %s', self.image)
try:
Expand All @@ -100,6 +177,8 @@ def execute(self, context):
config_file=self.config_file)

self.pod = self.create_pod_request_obj()
# self._dry_run(self.pod) # logs the yaml file for the pod [not compatible for future verison of implementation]

self.namespace = self.pod.metadata.namespace
self.client = client

Expand Down Expand Up @@ -165,6 +244,16 @@ def get_task_window(self, scheduled_at: str, version: int, window_size: str, win
self._raise_error_if_request_failed(response)
return response.json()


def get_job_run_input(self, execution_date: str, project_name: str, job_name: str, job_type: str) -> dict:
    """Request compiled run input (envs/secrets) for a job instance.

    Args:
        execution_date: schedule instant string sent as ``scheduled_at``.
        project_name: Optimus project the job belongs to.
        job_name: Optimus job name (also sent as the instance name).
        job_type: instance type; sent upper-cased with a ``TYPE_`` prefix.

    Returns:
        Parsed JSON body of the run-input response.

    Raises:
        Whatever ``_raise_error_if_request_failed`` raises on a failed
        response, plus ``requests`` connection/timeout errors.
    """
    url = "{}/api/v1beta1/project/{}/job/{}/run_input".format(self.host, project_name, job_name)
    # requests has no default timeout; without one a stuck Optimus host
    # would hang the calling worker indefinitely.
    response = requests.post(url=url,
                             json={'scheduled_at': execution_date,
                                   'instance_name': job_name,
                                   'instance_type': "TYPE_" + job_type.upper()},
                             timeout=60)

    self._raise_error_if_request_failed(response)
    return response.json()

def get_job_metadata(self, execution_date, namespace, project, job) -> dict:
url = '{optimus_host}/api/v1beta1/project/{project_name}/namespace/{namespace_name}/job/{job_name}'.format(optimus_host=self.host,
namespace_name=namespace,
Expand Down
116 changes: 70 additions & 46 deletions ext/scheduler/airflow2/resources/base_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from datetime import datetime, timedelta, timezone

from airflow.models import DAG, Variable, DagRun, DagModel, TaskInstance, BaseOperator, XCom, XCOM_RETURN_KEY
from airflow.kubernetes.secret import Secret
from airflow.configuration import conf
from airflow.operators.python_operator import PythonOperator
from airflow.utils.weight_rule import WeightRule
Expand Down Expand Up @@ -86,15 +85,6 @@
)

{{$baseTaskSchema := .Job.Task.Unit.Info -}}
{{ if ne $baseTaskSchema.SecretPath "" -}}
transformation_secret = Secret(
"volume",
{{ dir $baseTaskSchema.SecretPath | quote }},
"optimus-task-{{ $baseTaskSchema.Name }}",
{{ base $baseTaskSchema.SecretPath | quote }}
)
{{- end }}

{{- $setCPURequest := not (empty .Metadata.Resource.Request.CPU) -}}
{{- $setMemoryRequest := not (empty .Metadata.Resource.Request.Memory) -}}
{{- $setCPULimit := not (empty .Metadata.Resource.Limit.CPU) -}}
Expand Down Expand Up @@ -125,9 +115,51 @@
{{- end }}
)
{{- end }}
# Shared constants for the executor pod and its init container.
JOB_DIR = "/data"
IMAGE_PULL_POLICY="Always"
# Init container runs the Optimus client image at the templated server version.
INIT_CONTAINER_IMAGE="odpf/optimus:{{.Version}}"
INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh"

# Ephemeral volume shared between the init container (which writes compiled
# job assets) and the task container (which reads them from JOB_DIR).
volume = k8s.V1Volume(
    name='asset-volume',
    empty_dir=k8s.V1EmptyDirVolumeSource()
)
asset_volume_mounts = [
    k8s.V1VolumeMount(mount_path=JOB_DIR, name='asset-volume', sub_path=None, read_only=False)
]
# Env vars for the main task container; the remaining job env/secrets are
# injected at runtime by SuperKubernetesPodOperator.
executor_env_vars = [
    k8s.V1EnvVar(name="JOB_LABELS",value='{{.Job.GetLabelsAsString}}'),
    k8s.V1EnvVar(name="JOB_DIR",value=JOB_DIR),
]

# Env vars common to every init container; instance-specific vars
# (INSTANCE_TYPE / INSTANCE_NAME) are appended per container below.
init_env_vars = [
    k8s.V1EnvVar(name="JOB_DIR",value=JOB_DIR),
    k8s.V1EnvVar(name="JOB_NAME",value='{{$.Job.Name}}'),
    k8s.V1EnvVar(name="OPTIMUS_HOST",value='{{$.Hostname}}'),
    k8s.V1EnvVar(name="PROJECT",value='{{$.Namespace.ProjectSpec.Name}}'),
    k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ "{{ next_execution_date }}" }}'),
]

# Init container that fetches and compiles assets for the transformation task
# (via the entrypoint script) before the main container starts.
init_container = k8s.V1Container(
    name="init-container",
    image=INIT_CONTAINER_IMAGE,
    image_pull_policy=IMAGE_PULL_POLICY,
    env=init_env_vars + [
        k8s.V1EnvVar(name="INSTANCE_TYPE",value='{{$.InstanceTypeTask}}'),
        k8s.V1EnvVar(name="INSTANCE_NAME",value='{{$baseTaskSchema.Name}}'),
    ],
    # Runs as root so it can write into the shared volume.
    # NOTE(review): V1PodSecurityContext is used where the container-level
    # V1SecurityContext would normally be expected — confirm this is accepted
    # by the kubernetes client version in use.
    security_context=k8s.V1PodSecurityContext(run_as_user=0),
    volume_mounts=asset_volume_mounts,
    command=["/bin/sh", INIT_CONTAINER_ENTRYPOINT],
)

transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} = SuperKubernetesPodOperator(
image_pull_policy="IfNotPresent",
optimus_hostname="{{$.Hostname}}",
optimus_projectname="{{$.Namespace.ProjectSpec.Name}}",
optimus_namespacename="{{$.Namespace.Name}}",
optimus_jobname="{{.Job.Name}}",
optimus_jobtype="{{$.InstanceTypeTask}}",
image_pull_policy=IMAGE_PULL_POLICY,
namespace = conf.get('kubernetes', 'namespace', fallback="default"),
image = {{ $baseTaskSchema.Image | quote}},
cmds=[],
Expand All @@ -139,42 +171,43 @@
in_cluster=True,
is_delete_operator_pod=True,
do_xcom_push=False,
secrets=[{{ if ne $baseTaskSchema.SecretPath "" -}} transformation_secret {{- end }}],
env_vars = [
k8s.V1EnvVar(name="JOB_NAME",value='{{.Job.Name}}'),
k8s.V1EnvVar(name="OPTIMUS_HOST",value='{{.Hostname}}'),
k8s.V1EnvVar(name="JOB_LABELS",value='{{.Job.GetLabelsAsString}}'),
k8s.V1EnvVar(name="JOB_DIR",value='/data'),
k8s.V1EnvVar(name="PROJECT",value='{{.Namespace.ProjectSpec.Name}}'),
k8s.V1EnvVar(name="NAMESPACE",value='{{.Namespace.Name}}'),
k8s.V1EnvVar(name="INSTANCE_TYPE",value='{{$.InstanceTypeTask}}'),
k8s.V1EnvVar(name="INSTANCE_NAME",value='{{$baseTaskSchema.Name}}'),
k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ "{{ next_execution_date }}" }}'),
],
env_vars=executor_env_vars,
{{- if gt .SLAMissDurationInSec 0 }}
sla=timedelta(seconds={{ .SLAMissDurationInSec }}),
{{- end }}
{{- if $setResourceConfig }}
resources = resources,
{{- end }}
reattach_on_restart=True
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
init_containers=[init_container],
)

# hooks loop start
{{ range $_, $t := .Job.Hooks }}
{{ $hookSchema := $t.Unit.Info -}}

{{ if ne $hookSchema.SecretPath "" -}}
hook_{{$hookSchema.Name | replace "-" "_"}}_secret = Secret(
"volume",
{{ dir $hookSchema.SecretPath | quote }},
"optimus-hook-{{ $hookSchema.Name }}",
{{ base $hookSchema.SecretPath | quote }}
init_container_{{$hookSchema.Name | replace "-" "__dash__"}} = k8s.V1Container(
name="init-container",
image=INIT_CONTAINER_IMAGE,
image_pull_policy=IMAGE_PULL_POLICY,
env= init_env_vars + [
k8s.V1EnvVar(name="INSTANCE_TYPE",value='{{$.InstanceTypeHook}}'),
k8s.V1EnvVar(name="INSTANCE_NAME",value='{{$hookSchema.Name}}'),
],
security_context=k8s.V1PodSecurityContext(run_as_user=0),
volume_mounts=asset_volume_mounts,
command=["/bin/sh", INIT_CONTAINER_ENTRYPOINT],
)
{{- end }}

hook_{{$hookSchema.Name | replace "-" "__dash__"}} = SuperKubernetesPodOperator(
image_pull_policy="IfNotPresent",
optimus_hostname="{{$.Hostname}}",
optimus_projectname="{{$.Namespace.ProjectSpec.Name}}",
optimus_namespacename="{{$.Namespace.Name}}",
optimus_jobname="{{$.Job.Name}}",
optimus_jobtype="{{$.InstanceTypeHook}}",
image_pull_policy=IMAGE_PULL_POLICY,
namespace = conf.get('kubernetes', 'namespace', fallback="default"),
image = "{{ $hookSchema.Image }}",
cmds=[],
Expand All @@ -185,26 +218,17 @@
in_cluster=True,
is_delete_operator_pod=True,
do_xcom_push=False,
secrets=[{{ if ne $hookSchema.SecretPath "" -}} hook_{{$hookSchema.Name | replace "-" "_"}}_secret {{- end }}],
env_vars = [
k8s.V1EnvVar(name="JOB_NAME",value='{{$.Job.Name}}'),
k8s.V1EnvVar(name="OPTIMUS_HOST",value='{{$.Hostname}}'),
k8s.V1EnvVar(name="JOB_LABELS",value='{{$.Job.GetLabelsAsString}}'),
k8s.V1EnvVar(name="JOB_DIR",value='/data'),
k8s.V1EnvVar(name="PROJECT",value='{{$.Namespace.ProjectSpec.Name}}'),
k8s.V1EnvVar(name="NAMESPACE",value='{{$.Namespace.Name}}'),
k8s.V1EnvVar(name="INSTANCE_TYPE",value='{{$.InstanceTypeHook}}'),
k8s.V1EnvVar(name="INSTANCE_NAME",value='{{$hookSchema.Name}}'),
k8s.V1EnvVar(name="SCHEDULED_AT",value='{{ "{{ next_execution_date }}" }}'),
# rest of the env vars are pulled from the container by making a GRPC call to optimus
],
env_vars=executor_env_vars,
{{- if eq $hookSchema.HookType $.HookTypeFail }}
trigger_rule="one_failed",
{{- end }}
{{- if $setResourceConfig }}
resources = resources,
{{- end }}
reattach_on_restart=True
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
init_containers=[init_container_{{$hookSchema.Name | replace "-" "__dash__"}}],
)
{{- end }}
# hooks loop ends
Expand Down
Loading

0 comments on commit 34ec9ce

Please sign in to comment.