This fork of Apache Airflow executes DAGs similarly to the Kubernetes Executor shipped with stock Apache Airflow. However, instead of creating and deleting Kubernetes pods to execute tasks, it uses Knative services.
These Knative services must be created ahead of time. However, due to how Knative works, this does not mean that a worker pod is running constantly. Instead, Knative creates, scales, and deletes the pods automatically as needed. Moreover, instead of looking up task arguments in the database used by Airflow, this fork sends them directly in the RPC to the Knative services. Likewise, the HTTP response from the Knative services includes the return value of the task.
First, set up a cluster of one or more nodes with the stock-only configuration as described in the vHive Quickstart. Do not start vHive, as it is not needed. The relevant commands for a single-node cluster are reproduced here.
```bash
git clone --depth=1 https://github.com/vhive-serverless/vhive.git
cd vhive
mkdir -p /tmp/vhive-logs
./scripts/cloudlab/setup_node.sh stock-only # this might print errors, ignore them
```
The `setup_node.sh` script might print some errors; don't worry about them and continue.
```bash
sudo screen -d -m containerd
./scripts/cluster/create_one_node_cluster.sh stock-only
cd ..
```
Now the Kubernetes cluster and Knative should be ready. It is time to deploy this fork of Airflow with the following commands:
```bash
git clone --single-branch --branch integrate-knative --depth 1 git@github.com:eth-easl/airflow.git
cd airflow
./scripts/setup_airflow.sh
```
The script will create the namespace `airflow` and deploy all resources to that namespace.
After running the setup script, Airflow should be up and running. Verify by running `kubectl -n airflow get pods`; the output should look similar to what is shown below.
```
airflow-create-user-jw7t8              0/1   Completed   1   2m23s
airflow-postgresql-0                   1/1   Running     0   2m23s
airflow-run-airflow-migrations-xldsv   0/1   Completed   0   2m23s
airflow-scheduler-cdcc9b98b-dqkrn      2/2   Running     0   2m23s
airflow-statsd-59895f6c69-p4rbg        1/1   Running     0   2m23s
airflow-triggerer-7d5f6d85b8-6pptm     1/1   Running     0   2m23s
airflow-webserver-5c58849cd9-mvkgx     1/1   Running     0   2m23s
```
You can also check that the Knative services are ready with `kn service list -n airflow`. Again, the output should look similar to this.
```
NAME URL LATEST AGE CONDITIONS READY REASON
airflow-avg-worker-distributed-compute-count http://airflow-avg-worker-distributed-compute-count.airflow.192.168.1.240.sslip.io airflow-avg-worker-distributed-compute-count-00001 5m58s 3 OK / 3 True
airflow-avg-worker-distributed-compute-sum http://airflow-avg-worker-distributed-compute-sum.airflow.192.168.1.240.sslip.io airflow-avg-worker-distributed-compute-sum-00001 5m50s 3 OK / 3 True
airflow-avg-worker-distributed-do-avg http://airflow-avg-worker-distributed-do-avg.airflow.192.168.1.240.sslip.io airflow-avg-worker-distributed-do-avg-00001 5m39s 3 OK / 3 True
airflow-avg-worker-distributed-extract http://airflow-avg-worker-distributed-extract.airflow.192.168.1.240.sslip.io airflow-avg-worker-distributed-extract-00001 5m28s 3 OK / 3 True
airflow-workflow-gateway http://airflow-workflow-gateway.airflow.192.168.1.240.sslip.io airflow-workflow-gateway-00001 5m23s 3 OK / 3 True
```
Now all that's left to do is to deploy and run a workflow.
```bash
GATEWAY_URL="$(kn service list -o json -n airflow | jq -r '.items[] | select(.metadata.name=="airflow-workflow-gateway").status.url')"
curl -u admin:admin -X POST -H 'Content-Type: application/json' --data '{"input": [1,2,3,4]}' "$GATEWAY_URL"/runWorkflow/compute_avg_distributed
```
Running the workflow will take some time (~20s), but if all went well, it should return `{"output":2.5}`.
The reason it takes so long for a simple computation is that the
workflow is artificially split into small steps, and each of them must
start a Knative service.
A workflow (Apache Airflow calls these DAGs) consists of the following files:
- A Python file that defines the DAG using the Airflow TaskFlow API.
- YAML files that define the Knative services for each function in the workflow.
Examples can be found in the `workflows` directory. For instance, `avg_distributed.py` contains a workflow that computes the average of its inputs. The corresponding YAML files that define the Knative services for each function in the workflow can be found in `workflows/avg_distributed`.
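For orientation, a Python workflow file built with the TaskFlow API might look roughly like the sketch below. The DAG id, task names, and logic are made up for illustration; see `avg_distributed.py` for a real example.

```python
# Hypothetical TaskFlow DAG, for illustration only; not part of this repository.
from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="compute_sum_example", start_date=datetime(2023, 1, 1), schedule_interval=None)
def compute_sum_example():
    @task
    def extract():
        # Produce the values the rest of the workflow operates on.
        return [1, 2, 3, 4]

    @task
    def do_sum(values):
        # Each task runs as an invocation of its own Knative worker service.
        return sum(values)

    do_sum(extract())


compute_sum_example()
```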
Since the DAGs are baked into the function and Airflow images, it is a bit tedious to deploy new DAGs. However, the step-by-step guide below should make it easier.
- Place your Python workflow file in `workflows/image/airflow-dags`.
- Run `scripts/update_images.sh`. This will build two images: `airflow` and `airflow-worker`.
- Tag and push these images, e.g. as shown below. Don't forget to adjust the registry.

  ```bash
  docker tag airflow:latest ghcr.io/jonamuen/airflow:latest
  docker tag airflow-worker:latest ghcr.io/jonamuen/airflow-worker:latest
  docker push ghcr.io/jonamuen/airflow:latest
  docker push ghcr.io/jonamuen/airflow-worker:latest
  ```

- If you previously followed the setup guide above, run `kubectl delete namespace airflow`. This removes the namespace `airflow` and all resources in that namespace. It might take a while.
- Modify `configs/values.yaml` to point to your new image, i.e. replace references to `ghcr.io/jonamuen/airflow` with the name of your `airflow` image. Also adjust the tag if needed.
- Adjust the template for Knative services in `workflows/knative_yaml_builder/knative_service_template.yaml` to point to your `airflow-worker` image. Then run `scripts/build_knative_yamls.sh`. This will generate Knative service definitions in `workflows/knative_yamls` for all DAGs in `workflows/image/airflow-dags`.
- Run `scripts/setup_airflow.sh`.
- Run `scripts/deploy_workflow.sh dag_id`, replacing `dag_id` with the id of your DAG. Look in `workflows/knative_yamls` if you are not sure what the id of your DAG is.
- Airflow should now be up and running (check with `kubectl -n airflow get pods`), and a Knative service for each function of your workflow should be available, which can be verified with `kn service list -n airflow`.
- Execute your DAG with the commands below. Make sure to replace `<dag_id>` with the id of your DAG, and modify the `--data` parameter as needed.

  ```bash
  DAG_ID="<dag_id>"
  GATEWAY_URL="$(kn service list -o json -n airflow | jq -r '.items[] | select(.metadata.name=="airflow-workflow-gateway").status.url')"
  curl -u admin:admin -X POST -H 'Content-Type: application/json' --data '{"input": [1,2,3,4]}' "$GATEWAY_URL"/runWorkflow/"$DAG_ID"
  ```
To send input (i.e. arguments of the root function) to a workflow, you must send it under the `"input"` key to the `/runWorkflow/<dag_id>` endpoint.
Set up the root function to accept a single argument `params`. The input data you sent will then be accessible via `params["data"]`.
Example: if you send `{"input": "foo"}` to `/runWorkflow/<dag_id>`, then `params["data"] == "foo"` in the first function of your workflow.
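As a sketch (the task name here is hypothetical), the root function could therefore be declared like this:

```python
from airflow.decorators import task


@task
def extract(params=None):
    # For a request sent with --data '{"input": "foo"}',
    # params["data"] is "foo" at this point.
    return params["data"]
```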
If you started a workflow, but it is crashing or does not terminate, you might need to inspect the logs. The most likely place to find useful information is the logs of Airflow's scheduler, which can be accessed as shown below.

```bash
scheduler="$(kubectl -n airflow get pods | grep scheduler | awk '{print $1}')"
kubectl -n airflow logs "$scheduler" scheduler
```
If you need access to the logs of a function, you will need to find its pod id with `kubectl -n airflow get pods`. Then `kubectl -n airflow logs <pod_id> user-container` will give you the logs of the webserver that handles function invocations.
To get the logs of the function execution, you can open a shell in the pod with `kubectl -n airflow exec -it <pod_id> -- bash` and then navigate to the `./logs` directory.
Airflow's web interface might also be helpful. You can expose it at `http://localhost:8080` with the below command. Log in with username `admin` and password `admin`.

```bash
screen -d -m kubectl -n airflow port-forward deployment/airflow-webserver 8080:8080
```
The last component that might be worth checking is the `workflow-gateway`. Since it is a Knative service, there will only be a pod if there were recent requests to it. Find its pod id with `kubectl -n airflow get pods`, then run `kubectl -n airflow logs <pod_id>`.