-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_kubernetes_executor_config.py
176 lines (161 loc) · 6.53 KB
/
example_kubernetes_executor_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using a Kubernetes Executor Configuration.
"""
import logging
import os
from airflow import DAG
from airflow.example_dags.libs.helper import print_stuff
from airflow.operators.python import PythonOperator
from airflow.settings import AIRFLOW_HOME
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
log = logging.getLogger(__name__)
try:
from kubernetes.client import models as k8s
with DAG(
dag_id='example_kubernetes_executor_config',
default_args=default_args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example3'],
) as dag:
def test_sharedvolume_mount():
"""
Tests whether the volume has been mounted.
"""
for i in range(5):
try:
return_code = os.system("cat /shared/test.txt")
if return_code != 0:
raise ValueError(f"Error when checking volume mount. Return code {return_code}")
except ValueError as e:
if i > 4:
raise e
def test_volume_mount():
"""
Tests whether the volume has been mounted.
"""
with open('/foo/volume_mount_test.txt', 'w') as foo:
foo.write('Hello')
return_code = os.system("cat /foo/volume_mount_test.txt")
if return_code != 0:
raise ValueError(f"Error when checking volume mount. Return code {return_code}")
# You can use annotations on your kubernetes pods!
start_task = PythonOperator(
task_id="start_task",
python_callable=print_stuff,
executor_config={
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
},
)
# [START task_with_volume]
volume_task = PythonOperator(
task_id="task_with_volume",
python_callable=test_volume_mount,
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/foo/", name="example-kubernetes-test-volume"
)
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
),
},
)
# [END task_with_volume]
# [START task_with_template]
task_with_template = PythonOperator(
task_id="task_with_template",
python_callable=print_stuff,
executor_config={
"pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
},
)
# [END task_with_template]
# [START task_with_sidecar]
sidecar_task = PythonOperator(
task_id="task_with_sidecar",
python_callable=test_sharedvolume_mount,
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
],
),
k8s.V1Container(
name="sidecar",
image="ubuntu",
args=["echo \"retrieved from mount\" > /shared/test.txt"],
command=["bash", "-cx"],
volume_mounts=[
k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
],
),
],
volumes=[
k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
],
)
),
},
)
# [END task_with_sidecar]
# Test that we can add labels to pods
third_task = PythonOperator(
task_id="non_root_task",
python_callable=print_stuff,
executor_config={
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
},
)
other_ns_task = PythonOperator(
task_id="other_namespace_task",
python_callable=print_stuff,
executor_config={
"KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
},
)
start_task >> volume_task >> third_task
start_task >> other_ns_task
start_task >> sidecar_task
start_task >> task_with_template
except ImportError as e:
log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")