dispatch job #28
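
# Manually dispatched workflow that bakes a pangeo-forge recipe feedstock on the
# "pangeo-forge-v3" EKS cluster via Apache Flink. Flow: name-job derives a short
# repo name for display, run-job submits the recipe with "pangeo-forge-runner bake"
# and captures the Flink job name/id from the runner logs, and monitor-job tails
# the JobManager logs until the job leaves the RUNNING state, cleaning up the
# FlinkDeployment on failure.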
name: dispatch job

on:
  workflow_dispatch:
    inputs:
      repo:
        description: 'The HTTPS GitHub URL for the recipe feedstock'
        required: true
      ref:
        description: 'The tag or branch to target in your recipe repo'
        required: true
        default: 'main'
      feedstock_subdir:
        description: 'The subdir of the feedstock directory in the repo'
        required: true
        default: 'feedstock'
      bucket:
        description: 'This job runner leverages s3fs.S3FileSystem for your recipe cache and output. Choices currently are: "default"'
        required: true
        default: 'default'
      prune:
        description: 'Only run the first two time steps'
        required: true
        default: 'False'
      parallelism:
        description: 'Number of task managers to spin up'
        required: true
        default: '1'
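
# All of the workflow_dispatch inputs above arrive as strings; prune and
# parallelism are forwarded unchanged to the bake step as the PRUNE_OPTION and
# PARALLELISM_OPTION environment variables, presumably parsed there by the
# .github/workflows/config.py passed to pangeo-forge-runner.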
jobs:
  name-job:
    runs-on: ubuntu-latest
    outputs:
      repo_name: ${{ steps.string_manipulation.outputs.result }}
    steps:
      - name: manipulate strings
        id: string_manipulation
        run: |
          # strip the path and trailing ".git", e.g. https://github.com/<org>/<name>.git -> <name>
          repo_name=$(basename -s .git "${{ github.event.inputs.repo }}")
          echo "result=$repo_name" >> $GITHUB_OUTPUT
  run-job:
    name: kickoff job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
    needs: name-job
    outputs:
      job_name: ${{ steps.report_ids.outputs.job_name }}
      job_id: ${{ steps.report_ids.outputs.job_id }}
    runs-on: ubuntu-latest
    steps:
      - name: checkout repository
        uses: actions/checkout@v3
      - name: set up python 3.10
        uses: actions/setup-python@v3
        with:
          python-version: '3.10'
      - name: echo workflow inputs
        run: |
          echo "Manually triggered workflow: \
            ${{ github.event.inputs.repo }} \
            ${{ github.event.inputs.ref }} \
            ${{ github.event.inputs.bucket }} \
            ${{ github.event.inputs.parallelism }} \
            ${{ github.event.inputs.prune }}"
      - name: install deps
        run: |
          # TODO: move to requirements file
          python -m pip install --upgrade pip
          pip install \
            fsspec \
            s3fs \
            apache-beam==2.52.0 \
            "pangeo-forge-recipes>=0.10.0" \
            "pangeo-forge-runner>=0.9.1"
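      # Two credential sets are in play: the GH_ACTIONS_AWS_* secrets below give this
      # runner access to the EKS cluster, while the S3_DEFAULT_AWS_* secrets (passed
      # as env vars to the bake step) are the credentials the recipe's s3fs
      # cache/output bucket is expected to use.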
      - name: set up aws credentials for job runner user
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: install kubectl
        run: |
          curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
          chmod +x ./kubectl
          sudo mv ./kubectl /usr/local/bin/kubectl
      - name: update kubeconfig with cluster
        run: |
          aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }}
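      # "bake" checks out the feedstock repo at the given ref and submits it to the
      # Flink cluster. The runner is configured by .github/workflows/config.py (not
      # shown here), which presumably reads the REPO/REF/FEEDSTOCK_SUBDIR/
      # PRUNE_OPTION/PARALLELISM_OPTION/S3_* environment variables set on this step.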
      - name: execute recipe on k8s cluster
        id: executejob
        continue-on-error: true
        run: |
          pangeo-forge-runner \
            bake \
            --repo=${{ github.event.inputs.repo }} \
            --ref=${{ github.event.inputs.ref }} \
            -f .github/workflows/config.py > execute.log
          # show all logs
          cat execute.log
          # export all the valuable information from the logs
          JOB_NAME=$(grep -oP 'flinkdeployment\.flink\.apache\.org/\K[^ ]+' execute.log | head -n1)
          echo "JOB_NAME=$JOB_NAME" >> $GITHUB_ENV
          JOB_ID=$(grep -oP 'Started Flink job as \K[^ ]+' execute.log)
          echo "JOB_ID=$JOB_ID" >> $GITHUB_ENV
          FLINK_DASH=$(grep -oP "You can run '\K[^']+(?=')" execute.log)
          echo "FLINK_DASH=$FLINK_DASH" >> $GITHUB_ENV
        env:
          REPO: ${{ github.event.inputs.repo }}
          REF: ${{ github.event.inputs.ref }}
          FEEDSTOCK_SUBDIR: ${{ github.event.inputs.feedstock_subdir }}
          PRUNE_OPTION: ${{ github.event.inputs.prune }}
          PARALLELISM_OPTION: ${{ github.event.inputs.parallelism }}
          S3_BUCKET: ${{ github.event.inputs.bucket }}
          S3_DEFAULT_AWS_ACCESS_KEY_ID: ${{ secrets.S3_DEFAULT_AWS_ACCESS_KEY_ID }}
          S3_DEFAULT_AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_DEFAULT_AWS_SECRET_ACCESS_KEY }}
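      # continue-on-error above keeps the workflow alive when bake fails so this
      # step can dump the operator/JobManager logs and delete the FlinkDeployment;
      # its exit 128 then surfaces the failure in the Actions UI.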
      - name: cleanup if "pangeo-forge-runner bake" failed
        if: steps.executejob.outcome == 'failure'
        run: |
          echo "The previous 'bake' command failed or timed out. Running cleanup logic..."
          # much easier to do in bash than in Python via subprocess
          echo "##################### OPERATOR ######################"
          kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
          echo "##################### JOB MANAGER ######################"
          kubectl get pod | grep -v manager | grep $JOB_NAME | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
          # delete the flinkdeployment so we don't have old failures hanging around
          kubectl get flinkdeployment --no-headers | grep $JOB_NAME | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
          # force GH action to show failed result
          exit 128
      - name: report running job id for user
        id: report_ids
        run: |
          # TODO: we also need to report the historyserver URL and flink dashboard URL,
          # but this also requires us to think about how we're going to put a thin
          # layer of authentication around these services so they aren't totally public
          echo '############ JOB NAME ################'
          echo $JOB_NAME
          echo "job_name=$JOB_NAME" >> $GITHUB_OUTPUT
          echo '############ JOB ID ################'
          echo $JOB_ID
          echo "job_id=$JOB_ID" >> $GITHUB_OUTPUT
          echo '############ FLINK DASHBOARD ################'
          echo $FLINK_DASH
          echo "flink_dash=$FLINK_DASH" >> $GITHUB_OUTPUT
  monitor-job:
    runs-on: ubuntu-latest
    name: monitor job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
    needs: [name-job, run-job]
    steps:
      - name: set up aws credentials for job runner user
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: install kubectl
        run: |
          curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
          chmod +x ./kubectl
          sudo mv ./kubectl /usr/local/bin/kubectl
      - name: update kubeconfig with cluster
        run: |
          aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: monitor logs of job manager
        id: monitorjob
        timeout-minutes: 120
        continue-on-error: true
        run: |
          # TODO: this needs to check the historyserver status rather than the logs,
          # but first we need to think about authentication and a reverse proxy
          echo "find job status on the job manager logs..."
          while [[ -z "$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)" ]]; do
            echo "still waiting for a status on the job manager logs..."
            sleep 1
          done
          input_status=$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)
          echo "##### INPUT STATUS #####"
          echo $input_status
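          # the matched ExecutionGraph line typically looks like
          # "... Job BeamApp-... switched from state RUNNING to FINISHED.",
          # so grab the last word before the trailing period as the new state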
          status=$(echo "$input_status" | grep -oP '\b\w+(?=\.$)')
          echo "##### STATUS #####"
          echo $status
          if [[ "$status" == "FAILING" || "$status" == "FAILED" ]]; then
            echo "job failed with '$status', will dump the logs now..."
            # force exit so we can move to next step
            exit 128
          fi
      - name: cleanup if monitor job fails
        if: steps.monitorjob.outcome == 'failure'
        run: |
          # much easier to do in bash than in Python via subprocess
          echo "##################### OPERATOR ######################"
          kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
          echo "##################### JOB MANAGER ######################"
          kubectl get pod | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
          # delete the flinkdeployment so we don't have old failures hanging around
          kubectl get flinkdeployment --no-headers | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
          # force GH action to show failed result
          exit 128