gcp_spark_airflow.py
# STEP 1 : Libraries needed
from datetime import timedelta, datetime
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
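# NOTE: these import paths target Airflow 1.x (Cloud Composer 1); in Airflow 2
# the bash and Dataproc operators live under airflow.operators.bash and
# airflow.providers.google.cloud.operators.dataproc instead.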
# STEP 2 : Define a start date
# Here a fixed date in the past
yesterday = datetime(2020, 2, 29)
# Spark references
SPARK_CODE = 'gs://us-central1-cl-composer-tes-fa29d311-bucket/spark_files/transformation.py'
dataproc_job_name = 'spark_job_dataproc'
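# The transformation.py script is assumed to already be uploaded to the
# spark_files/ folder of the Composer bucket; the job name is reused by the
# PySpark task below.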
# STEP 3 : Set default arguments for DAG
default_dag_args = {
    'start_date': yesterday,
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'project_id': models.Variable.get('project_id')
}
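# 'project_id' (and 'dataproc_zone', used below) are Airflow Variables that must
# be defined in the environment (Admin > Variables) before the DAG is parsed.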
# STEP 4 : Define DAG
# Set the DAG name and description, define the schedule interval, and pass the default arguments defined above
with models.DAG(
        'spark_workflow',
        description='DAG for deploying a Dataproc Cluster',
        schedule_interval=timedelta(days=1),
        default_args=default_dag_args) as dag:
    # STEP 5 : Set Operators
    # BashOperator
    # A simple print date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )
    # dataproc_operator
    # Create a small Dataproc cluster
    create_dataproc = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('dataproc_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')
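    # {{ ds_nodash }} renders the execution date as YYYYMMDD, so each daily run
    # creates (and later deletes) its own date-stamped cluster.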
    # Run the PySpark job
    run_spark = dataproc_operator.DataProcPySparkOperator(
        task_id='run_spark',
        main=SPARK_CODE,
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        job_name=dataproc_job_name
    )
    # dataproc_operator
    # Delete the Cloud Dataproc cluster
    delete_dataproc = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
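    # ALL_DONE makes the delete task run even if the Spark job fails, so the
    # cluster is never left running.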
    # STEP 6 : Set DAG dependencies
    # Each task runs only after the previous one has finished
    print_date >> create_dataproc >> run_spark >> delete_dataproc
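# Deployment (assumed workflow): copy this file into the Composer environment's
# dags/ folder and transformation.py into spark_files/ in the same bucket.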