# dispatch job #237
# NOTE(review): the original capture included GitHub blob-view boilerplate
# ("bidirectional Unicode" warning) that is not part of the workflow file.
# Manually-dispatched workflow that bakes a pangeo-forge recipe feedstock
# on an EKS/Flink cluster. All inputs arrive as strings (workflow_dispatch
# inputs are always strings), hence the quoted numeric defaults.
name: dispatch job

on:
  workflow_dispatch:
    inputs:
      repo:
        description: 'The https github url for the recipe feedstock'
        required: true
      ref:
        description: 'The tag or branch to target in your recipe repo'
        required: true
        default: 'main'
      feedstock_subdir:
        description: 'The subdir of the feedstock directory in the repo'
        required: true
        default: 'feedstock'
      bucket:
        description: 'This job runner leverages s3fs.S3FileSystem for your recipe cache and output. Choices currently are: "default"'
        required: true
        default: 'default'
      prune:
        # '0' means run the full recipe; '1' prunes to the first two time steps
        description: 'Only run the first two time steps'
        required: true
        default: '0'
      parallelism:
        description: 'Number of task managers to spin up'
        required: true
        default: '1'
      protocol:
        description: 'What protocol to use when accessing files (s3 or https).'
        required: false
        default: 's3'
      resource_profile:
        description: 'jobs have different memory requirements so choose (small[7168M], medium[10240M], large[15360M], xlarge[20480M])'
        required: false
        default: 'small'
jobs:
  # Derives a short repo name (URL basename minus any .git suffix) from the
  # input repo URL so later jobs can use it in their display names.
  name-job:
    runs-on: ubuntu-latest
    outputs:
      repo_name: ${{ steps.string_manipulation.outputs.result }}
    steps:
      - name: manipulate strings
        id: string_manipulation
        run: |
          repo_name=$(basename -s .git "${{ github.event.inputs.repo }}")
          echo "result=$repo_name" >> $GITHUB_OUTPUT
run-job: | |
if: contains('["ranchodeluxe","abarciauskas-bgse", "norlandrhagen", "sharkinsspatial", "moradology", "thodson-usgs"]', github.actor) | |
name: kickoff job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }} | |
needs: name-job | |
outputs: | |
job_name: ${{ steps.report_ids.outputs.job_name }} | |
job_id: ${{ steps.report_ids.outputs.job_id }} | |
runs-on: ubuntu-latest | |
steps: | |
- name: checkout repository | |
uses: actions/checkout@v3 | |
- name: set up python 3.10 | |
uses: actions/setup-python@v3 | |
with: | |
python-version: '3.10' | |
- name: echo inputs to user | |
run: | | |
echo "Manually triggered workflow: \ | |
${{ github.event.inputs.repo }} \ | |
${{ github.event.inputs.ref }} \ | |
${{ github.event.inputs.bucket }} \ | |
${{ github.event.inputs.parallelism }} \ | |
${{ github.event.inputs.prune }}" | |
- name: install deps | |
run: | | |
python -m pip install --upgrade pip | |
pip install pangeo-forge-runner>=0.10.0 | |
- name: set up aws credentials for job runner user | |
uses: aws-actions/configure-aws-credentials@v2 | |
with: | |
aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }} | |
aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }} | |
aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }} | |
- name: install kubectl | |
run: | | |
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" | |
chmod +x ./kubectl | |
sudo mv ./kubectl /usr/local/bin/kubectl | |
- name: update kubeconfig with cluster | |
run: | | |
aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }} | |
- name: execute recipe on k8s cluster | |
id: executejob | |
continue-on-error: true | |
run: | | |
# NOTE: we can't use `2>&1 | tee execute.log` b/c it hangs forever | |
# so if the command fails (for example b/c it doesn't have the right requirements) | |
# then we wont' be able to see the errors until we run it without redirecting output | |
pangeo-forge-runner \ | |
bake \ | |
--repo=${{ github.event.inputs.repo }} \ | |
--ref=${{ github.event.inputs.ref }} \ | |
-f .github/workflows/config.py | |
# we just use the `inspect` branch to dump runner errors and fail fast | |
exit 666 | |
# export all the valuable information from the logs | |
JOB_NAME=$(cat execute.log | grep -oP 'flinkdeployment\.flink\.apache\.org/\K[^ ]+' | head -n1) | |
echo "JOB_NAME=$JOB_NAME" >> $GITHUB_ENV | |
JOB_ID=$(cat execute.log | grep -oP 'Started Flink job as \K[^ ]+') | |
echo "JOB_ID=$JOB_ID" >> $GITHUB_ENV | |
FLINK_DASH=$(cat execute.log | grep -oP "You can run '\K[^']+(?=')") | |
echo "FLINK_DASH=$FLINK_DASH" >> $GITHUB_ENV | |
env: | |
EARTHDATA_TOKEN: ${{ secrets.EARTHDATA_TOKEN }} | |
EARTHDATA_USERNAME: ${{ secrets.EARTHDATA_USERNAME }} | |
EARTHDATA_PASSWORD: ${{ secrets.EARTHDATA_PASSWORD }} | |
REPO: ${{ github.event.inputs.repo }} | |
REF: ${{ github.event.inputs.ref }} | |
FEEDSTOCK_SUBDIR: ${{ github.event.inputs.feedstock_subdir }} | |
PRUNE_OPTION: ${{ github.event.inputs.prune }} | |
PARALLELISM_OPTION: ${{ github.event.inputs.parallelism }} | |
S3_BUCKET: ${{ github.event.inputs.bucket }} | |
S3_DEFAULT_AWS_ACCESS_KEY_ID: ${{ secrets.S3_DEFAULT_AWS_ACCESS_KEY_ID }} | |
S3_DEFAULT_AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_DEFAULT_AWS_SECRET_ACCESS_KEY }} | |
PROTOCOL: ${{ github.event.inputs.protocol }} | |
RESOURCE_PROFILE: ${{ github.event.inputs.resource_profile }} | |
- name: cleanup if "pangeo-forge-runner bake" failed | |
if: steps.executejob.outcome == 'failure' | |
run: | | |
echo "The previous 'bake' command failed or timed out. Running cleanup logic..." | |
# much easier to do in bash than in Python via subprocess | |
echo "##################### OPERATOR ######################" | |
kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/operator.log | |
cat /tmp/operator.log | |
echo "##################### JOB MANAGER ######################" | |
kubectl get pod | grep -v manager | grep $JOB_NAME | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log | |
cat /tmp/jobmanager.log | |
################################################################# | |
# provide feedback about OOM errors where we've seen them before | |
################################################################# | |
RED='\033[0;31m' | |
GREEN='\033[0;32m' | |
NOCOLOR='\033[0m' # To reset the color | |
# grok if operator logs produced a error that makes things unable to schedule | |
error=$(cat /tmp/operator.log | grep "ReconciliationException") | |
if [[ "$error" ]]; then | |
echo -e "${RED}################### ERROR ###########################${NOCOLOR}" | |
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a ReconciliationException in the operator logs...${NOCOLOR}" | |
dump_error=$(cat /tmp/operator.log | grep -a20 "ReconciliationException") | |
echo "$dump_error" | |
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}" | |
fi | |
################################################################# | |
# end | |
################################################################# | |
# delete the flinkdeployment so we don't have old failures hanging around | |
kubectl get flinkdeployment --no-headers | grep $JOB_NAME | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{} | |
# force GH action to show failed result | |
exit 128 | |
- name: echo JobID, JobName, FlinkDashboard to user | |
id: report_ids | |
run: | | |
# TODO: we also need to report historyserver URL and flink dashboard URL | |
# but this also requires us to think how we're going to have a thin | |
# layer of authentication around these services so they aren't totally public | |
echo '############ JOB NAME ################' | |
echo $JOB_NAME | |
echo "job_name=$JOB_NAME" >> $GITHUB_OUTPUT | |
echo '############ JOB ID ################' | |
echo $JOB_ID | |
echo "job_id=$JOB_ID" >> $GITHUB_OUTPUT | |
echo '############ FLINK DASHBOARD ################' | |
echo $FLINK_DASH | |
echo "flink_dash=$FLINK_DASH" >> $GITHUB_OUTPUT | |
monitor-job: | |
runs-on: ubuntu-latest | |
name: monitor job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }} | |
needs: [name-job, run-job] | |
steps: | |
- name: set up aws credentials for job runner user | |
uses: aws-actions/configure-aws-credentials@v2 | |
with: | |
aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }} | |
aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }} | |
aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }} | |
- name: install kubectl | |
run: | | |
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" | |
chmod +x ./kubectl | |
sudo mv ./kubectl /usr/local/bin/kubectl | |
- name: update kubeconfig with cluster | |
run: | | |
aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }} | |
# - name: Setup upterm session | |
# uses: lhotari/action-upterm@v1 | |
# | |
- name: monitor logs of job manager and report final status | |
id: monitorjob | |
timeout-minutes: 240 | |
continue-on-error: true | |
run: | | |
# TODO: this needs to not check the logs but the historyserver status | |
# but first we need think about authentication and a reverse proxy | |
echo "find job status on the job manager logs..." | |
while [[ -z "$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)" ]]; do | |
echo "still waiting for a status on the job manager logs..." | |
sleep 1 | |
done | |
input_status=$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1) | |
echo "##### INPUT STATUS #####" | |
echo $input_status | |
status=$(echo "$input_status" | grep -oP '\b\w+(?=\.$)') | |
echo "##### STATUS #####" | |
echo $status | |
if [[ "$status" == "FAILING" || "$status" == "FAILED" ]]; then | |
echo "job failed with '$status', will dump the logs now..." | |
# force exit so we can move to next step | |
exit 128 | |
fi | |
- name: dump logs | |
if: steps.monitorjob.outcome == 'failure' | |
run: | | |
# much easier to do in bash than in Python via subprocess | |
echo "##################### OPERATOR ######################" | |
kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000 | |
echo "##################### JOB MANAGER ######################" | |
kubectl get pod | grep -v taskmanager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log | |
cat /tmp/jobmanager.log | |
echo "##################### TASK MANAGER ######################" | |
# depending on the `inputs.parallism` we can have more than one taskmanager | |
parallelism_input="${{ github.event.inputs.parallelism }}" | |
iterations=$(expr $parallelism_input + 0) # cast to integer | |
for (( i = 1; i <= iterations; i++ )); do | |
echo "echo #### taskmanager-$i ####" | |
kubectl get pod | grep ${{ needs.run-job.outputs.job_name }} | grep taskmanager-1-$i | cut -d' ' -f1 | head -n1 | xargs -I{} kubectl logs pod/{} -c flink-main-container > /tmp/taskmanager.log | |
cat /tmp/taskmanager.log | |
done | |
# NOTE: we actually want the failed flink deployments to stick around b/c we might want to inspect the flink dashboard | |
# kubectl get flinkdeployment --no-headers | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{} | |
################################################################# | |
# provide feedback about OOM errors where we've seen them before | |
################################################################# | |
RED='\033[0;31m' | |
GREEN='\033[0;32m' | |
NOCOLOR='\033[0m' # To reset the color | |
# grok if taskmanager produced a JVM OOM error | |
error=$(cat /tmp/taskmanager.log | grep "java.lang.OutOfMemoryError") | |
if [[ "$error" ]]; then | |
echo -e "${RED}################### ERROR ###########################${NOCOLOR}" | |
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a JVM OOM error in the taskmanager logs...${NOCOLOR}" | |
dump_error=$(cat /tmp/taskmanager.log | grep -a20 "java.lang.OutOfMemoryError") | |
echo "$dump_error" | |
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}" | |
fi | |
# grok if this was OOM killed | |
error=$(cat /tmp/jobmanager.log | grep "reason=OOMKilled") | |
if [[ "$error" ]]; then | |
echo -e "${RED}################### ERROR ###########################${NOCOLOR}" | |
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be an OOMKilled error in the jobmanager logs...${NOCOLOR}" | |
dump_error=$(cat /tmp/jobmanager.log | grep -a20 "reason=OOMKilled") | |
echo "$dump_error" | |
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}" | |
fi | |
################################################################# | |
# end | |
################################################################# | |
# force GH action to show failed result | |
exit 128 |