Orchestrator to run Notebooks on an Ephemeral Dataproc cluster via Cloud Composer
├── composer_input
│   ├── initialization_scripts/init_pip_gcsfuse.sh
│   ├── jobs/wrapper_papermill.py
│   └── DAGs/composer_pyspark_notebook.py
└── notebooks
    ├── python/sample.py
    ├── jupyter/sample_notebook.ipynb
    └── jupyter/output/
- init_pip_gcsfuse.sh: this script completes the following two tasks:
  - installs the desired Python packages
  - installs gcsfuse and mounts the desired GCS bucket at the given path
- wrapper_papermill.py: runs a papermill execution of the input notebook and writes the output file to the assigned location (a minimal sketch follows this list)
- composer_pyspark_notebook.py: orchestrates the workflow (a condensed DAG sketch follows this list)
- Dataproc Cluster Creation
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator()
- Dataproc PySpark Job submission
pyspark_task = DataprocSubmitJobOperator()
- Dataproc Cluster Deletion
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator()
- sample.py: a sample Python file in the GCS bucket which is invoked directly by the notebook (this is possible because gcsfuse mounts the GCS bucket as a file system)
- sample_notebook.ipynb:
  - verify that GCS buckets are mounted at the working directory as a file system
  - verify that Python files in the mounted GCS buckets are executable via the !python and %run commands:
    !ls /path-1
    !sudo python /path-1/sample.py
    %run /path-1/sample.py
- jupyter/output: this is where papermill notebook execution outputs will be stored
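
Below is a minimal sketch of what a papermill wrapper along the lines of wrapper_papermill.py typically does. The argument handling, function name, and option choices here are illustrative assumptions; refer to the file in this repository for the actual interface.

```python
# Minimal papermill wrapper sketch (illustrative only; see wrapper_papermill.py
# in this repository for the actual implementation).
import sys

import papermill as pm


def run_notebook(input_path, output_path):
    # Execute the input notebook and write the executed copy, including cell
    # outputs, to the output location (e.g. the jupyter/output folder, reachable
    # via a gcsfuse mount or a gs:// URI).
    pm.execute_notebook(
        input_path,
        output_path,
        log_output=True,  # stream cell output into the job log
    )


if __name__ == "__main__":
    # Hypothetical usage: python wrapper_papermill.py <input_notebook> <output_notebook>
    run_notebook(sys.argv[1], sys.argv[2])
```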
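And a condensed sketch of the DAG structure that composer_pyspark_notebook.py implements (create cluster, submit the PySpark papermill job, delete cluster). The operator classes below come from the current airflow.providers.google package, whereas the repository's DAG may use the older contrib operator names shown above; the project, region, bucket, and file names are placeholders.

```python
# Sketch of the create -> submit -> delete DAG (placeholder names throughout;
# see composer_pyspark_notebook.py for the repository's actual DAG).
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

PROJECT_ID = "my-project"      # placeholder; e.g. the gcp_project Airflow variable
REGION = "us-central1"         # placeholder; derived from the gce_zone Airflow variable
CLUSTER_NAME = "ephemeral-notebook-cluster"

CLUSTER_CONFIG = {
    "gce_cluster_config": {"zone_uri": "us-central1-a"},
    # Run the init script so the cluster has the Python packages and the
    # gcsfuse mount that the notebook expects.
    "initialization_actions": [
        {"executable_file": "gs://my-bucket/initialization_scripts/init_pip_gcsfuse.sh"}
    ],
}

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": "gs://my-bucket/jobs/wrapper_papermill.py",
        "args": [
            "gs://my-bucket/notebooks/jupyter/sample_notebook.ipynb",
            "gs://my-bucket/notebooks/jupyter/output/sample_notebook_out.ipynb",
        ],
    },
}

with models.DAG(
    "composer_pyspark_notebook",
    start_date=datetime.datetime(2024, 1, 1),
    schedule_interval=None,  # trigger manually or set a cron expression
) as dag:
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config=CLUSTER_CONFIG,
    )

    pyspark_task = DataprocSubmitJobOperator(
        task_id="run_notebook_via_papermill",
        project_id=PROJECT_ID,
        region=REGION,
        job=PYSPARK_JOB,
    )

    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule=TriggerRule.ALL_DONE,  # tear the cluster down even if the job fails
    )

    create_dataproc_cluster >> pyspark_task >> delete_dataproc_cluster
```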
Make sure to Ctrl+F "EDIT:" in each file and modify the details for your environment, then:
a. Create a Cloud Composer environment.
b. Find the DAGs folder of your Composer environment (shown in the Cloud Composer console) and add composer_pyspark_notebook.py (the DAG file) to it in order to trigger DAG execution.
c. Have all the other files available in a GCS bucket; only the DAG file goes into your Composer DAGs folder.
d. From the Composer environment, create two Airflow variables (see composer_pyspark_notebook.py for the usage, and the short illustration after this list): https://airflow.apache.org/concepts.html#variables
   - gcp_project - Google Cloud project to use for the Cloud Dataproc cluster.
   - gce_zone - Google Compute Engine zone where the Cloud Dataproc cluster should be created.
e. Open the Airflow UI to monitor DAG executions, runs, and logs.
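
As referenced in step d, the DAG reads these variables through Airflow's Variable API. A short illustration follows; the variable names are the ones created above, while the region derivation is just an assumption about how a DAG might use gce_zone.

```python
# Reading the two Airflow variables inside a DAG file such as
# composer_pyspark_notebook.py (sketch; see that file for the actual usage).
from airflow import models

project_id = models.Variable.get("gcp_project")  # GCP project for the Dataproc cluster
zone = models.Variable.get("gce_zone")           # GCE zone in which to create the cluster
region = "-".join(zone.split("-")[:2])           # e.g. "us-central1-a" -> "us-central1"
```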
(Note: use the initialization script from this repository to install Python packages and mount the GCS bucket.)
Refer to this GCP tutorial to
- Install the Dataproc Jupyter component on a new cluster
- Connect to the Jupyter notebook UI running on the cluster from your local browser using the Dataproc Component Gateway
If you're adapting this example for your own use, consider the following:
- Setting an appropriate input path within your environment (GCS bucket, gcsfuse mount point, DAGs folder, etc.)
- Setting more appropriate configurations (DAGs, Dataproc cluster, initialization script for additional Python packages, etc.)
- Kristin Kim (Google)
- Anu Venkataraman (Google)
- Jerry Ding (Google) - wrapper_papermill.py