(Se recomienda instalarlo en una máquina virtual de Linux, en docker o en Kubernetes, más información en la documentación oficial)
- Creamos un entorno de Python con virtualenv y lo activamos
>!python -m venv airflow_env
>source airflow_env/bin/ativate
- Definimos AIRFLOW_HOME
>export AIRFLOW_HOME=/home/airflow
- Instalamos airflow desde Pypi
!pip install "apache-airflow[amazon]"
- Modificar la configuración de airflow
>nano /root/airflow/airflow.cfg
- load_examples=False
- sql_alchemy_conn = postgresql+psycopg2://user:pass@localhost:5432/airflow_db (recomendado en producción)
- Comprobamos que está bien instalado
airflow version
- Arrancamos airflow manualmente
-
su - postgres; psql; \l (show databases); \du (show users)
-
CREATE DATABASE airflow_db;
-
CREATE USER airflow_user WITH PASSWORD 'XXX';
-
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
-
ALTER ROLE airflow_user SET search_path = public;
Comprobar que un fichero existe en la ruta dada. Para ello usamos un BashOperator que ejecuta un script de bash.
Con este DAG aprenderemos:
-
Configurar Dags: parámetro, intervalos, programación...
-
Visualización de Dags en la web: Ver programación, estado del DAG, historial, logs...
-
Añadir markdown en DAG y las tareas (instance details).
-
BashOperator.
BashOperator(
task_id='check_file',
bash_command='sh ' + absolute_bash_file_path + ' ' + absolute_file_path
)
- Default args:
Hay muchos más argumentos que se pueden ver en https://airflow.apache.org/docs/apache-airflow/1.10.2/code.html#airflow.models.BaseOperator
- Schedule times
Más información en https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timetable.html
Subir un fichero (en este caso csv) a un bucket de AWS S3.
Con este DAG aprenderemos:
-
S3CreateObjectOperator.
-
Configuración de conexiones.
create_s3_object = S3CreateObjectOperator(
task_id="upload_to_s3",
aws_conn_id='aws_default', # The AWS connection set up in Airflow
s3_bucket='airflow-bucket-s3-fps', # Your target S3 bucket
s3_key='hello_world.txt', # S3 key (file path in the bucket)
data=read_local_file('/home/hello_world.txt'), # Local file path
)
Más información sobre dependencia de tareas https://docs.astronomer.io/learn/managing-dependencies
Acceder a la API de los juegos olímpicos https://apis.codante.io/olympic-games/events, transformar los datos para generar un diccionario de países, deporte y categoría, y finalmente escribir los datos en bucket de S3 y en local.
-
PythonOperator.
-
Configuración de variables.
-
Dependencia de tareas.
-
Empleo de Hooks.
create_json = PythonOperator(
task_id='create_json',
python_callable=create_json_olympic_games,
provide_context=True
)
Más información sobre el operador de Amazon https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/index.html
Comando de bash para matar los procesos arrancados usando FastAPI y otro para arranacar la API que usa los datos generados en el DAG anterior.
- BashOperator.
launch_fastapi = BashOperator(
task_id='launch_fastapi',
bash_command='cd /home/ec2-user/fast_api && nohup uvicorn main:app --host 0.0.0.0 --port 8000 --reload &',
execution_timeout=None # Disable timeout to keep FastAPI running
)
Más información sobre qué es FastAPI https://fastapi.tiangolo.com/
Enviar un correo
- EmailOperator.
Guía para configurar el envío de correos con una cuenta de GMAIL
EmailOperator(
task_id='send_email',
to=dest_email,
subject='La ejecución del dag ' + dag_args['dag_id'] +' correcta',
html_content=f'''<h3>ÉXITO EN LA EJECUCIÓN!!</h3> <p>La ejecución del dag {dag_args['dag_id']} ha acabado correctamente :)</p> ''',
dag=dag
)