# job-runner.yaml
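#
# Dispatches a pangeo-forge recipe bake onto an EKS-hosted Apache Flink
# cluster via pangeo-forge-runner, then monitors the Flink job and dumps
# operator/jobmanager/taskmanager logs if it fails.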
name: dispatch job
on:
workflow_dispatch:
inputs:
repo:
description: 'The https github url for the recipe feedstock'
required: true
ref:
description: 'The tag or branch to target in your recipe repo'
required: true
default: 'main'
feedstock_subdir:
description: 'The subdir of the feedstock directory in the repo'
required: true
default: 'feedstock'
bucket:
description: 'This job runner uses s3fs.S3FileSystem for your recipe cache and output. The only current choice is "default"'
required: true
default: 'default'
prune:
description: 'Set to "1" to prune the recipe and only run the first two time steps'
required: true
default: '0'
parallelism:
description: 'Number of task managers to spin up'
required: true
default: '1'
protocol:
description: 'What protocol to use when accessing files (s3 or https).'
required: false
default: 's3'
resource_profile:
description: 'Jobs have different memory requirements, so choose one of: small[7168M], medium[10240M], large[15360M], xlarge[20480M]'
required: false
default: 'small'
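# A minimal sketch of triggering this workflow from the GitHub CLI; the
# feedstock URL below is a hypothetical placeholder, and the field names
# mirror the workflow_dispatch inputs above:
#
#   gh workflow run job-runner.yaml \
#     -f repo=https://github.com/example-org/example-feedstock \
#     -f ref=main \
#     -f feedstock_subdir=feedstock \
#     -f bucket=default \
#     -f prune=1 \
#     -f parallelism=2 \
#     -f protocol=s3 \
#     -f resource_profile=medium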
jobs:
name-job:
runs-on: ubuntu-latest
outputs:
repo_name: ${{ steps.string_manipulation.outputs.result }}
steps:
- name: manipulate strings
id: string_manipulation
run: |
repo_name=$(basename -s .git "${{ github.event.inputs.repo }}")
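# e.g. a hypothetical https://github.com/example-org/example-feedstock.git -> example-feedstock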
echo "result=$repo_name" >> $GITHUB_OUTPUT
run-job:
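# NOTE: contains() does a substring check of github.actor against the
# allowlist string below, so only the listed usernames can kick off this job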
if: contains('["ranchodeluxe","abarciauskas-bgse", "norlandrhagen", "sharkinsspatial", "moradology", "thodson-usgs"]', github.actor)
name: kickoff job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
needs: name-job
outputs:
job_name: ${{ steps.report_ids.outputs.job_name }}
job_id: ${{ steps.report_ids.outputs.job_id }}
runs-on: ubuntu-latest
steps:
- name: checkout repository
uses: actions/checkout@v3
- name: set up python 3.10
uses: actions/setup-python@v3
with:
python-version: '3.10'
- name: echo inputs to user
run: |
echo "Manually triggered workflow: \
${{ github.event.inputs.repo }} \
${{ github.event.inputs.ref }} \
${{ github.event.inputs.bucket }} \
${{ github.event.inputs.parallelism }} \
${{ github.event.inputs.prune }}"
- name: install deps
run: |
python -m pip install --upgrade pip
# quote the requirement so the ">" isn't treated as a shell redirect
pip install "pangeo-forge-runner>=0.10.0"
- name: set up aws credentials for job runner user
uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }}
- name: install kubectl
run: |
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x ./kubectl
sudo mv ./kubectl /usr/local/bin/kubectl
- name: update kubeconfig with cluster
run: |
aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }}
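# (aws eks update-kubeconfig writes a kubeconfig entry for the named
# cluster, which is what lets the kubectl calls below reach it)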
- name: execute recipe on k8s cluster
id: executejob
continue-on-error: true
run: |
# NOTE: we can't use `2>&1 | tee execute.log` b/c it hangs forever,
# so if the command fails (for example b/c it doesn't have the right requirements)
# then we won't be able to see the errors until we rerun it without redirecting output
pangeo-forge-runner \
bake \
--repo=${{ github.event.inputs.repo }} \
--ref=${{ github.event.inputs.ref }} \
-f .github/workflows/config.py > execute.log
# export all the valuable information from the logs
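# (grep -P's "\K" resets the start of the reported match, so each pattern
# below captures only the text that follows the literal prefix)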
RECIPE_JOB_NAME=$(cat execute.log | grep -oP 'Job name is \K[^ ]+' | head -n1)
echo "RECIPE_JOB_NAME=$RECIPE_JOB_NAME" >> $GITHUB_ENV
JOB_NAME=$(cat execute.log | grep -oP 'flinkdeployment\.flink\.apache\.org/\K[^ ]+' | head -n1)
echo "JOB_NAME=$JOB_NAME" >> $GITHUB_ENV
JOB_ID=$(cat execute.log | grep -oP 'Started Flink job as \K[^ ]+')
echo "JOB_ID=$JOB_ID" >> $GITHUB_ENV
FLINK_DASH=$(cat execute.log | grep -oP "You can run '\K[^']+(?=')")
echo "FLINK_DASH=$FLINK_DASH" >> $GITHUB_ENV
env:
EARTHDATA_TOKEN: ${{ secrets.EARTHDATA_TOKEN }}
EARTHDATA_USERNAME: ${{ secrets.EARTHDATA_USERNAME }}
EARTHDATA_PASSWORD: ${{ secrets.EARTHDATA_PASSWORD }}
REPO: ${{ github.event.inputs.repo }}
REF: ${{ github.event.inputs.ref }}
FEEDSTOCK_SUBDIR: ${{ github.event.inputs.feedstock_subdir }}
PRUNE_OPTION: ${{ github.event.inputs.prune }}
PARALLELISM_OPTION: ${{ github.event.inputs.parallelism }}
S3_BUCKET: ${{ github.event.inputs.bucket }}
S3_DEFAULT_AWS_ACCESS_KEY_ID: ${{ secrets.S3_DEFAULT_AWS_ACCESS_KEY_ID }}
S3_DEFAULT_AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_DEFAULT_AWS_SECRET_ACCESS_KEY }}
PROTOCOL: ${{ github.event.inputs.protocol }}
RESOURCE_PROFILE: ${{ github.event.inputs.resource_profile }}
- name: cleanup if "pangeo-forge-runner bake" failed
if: steps.executejob.outcome == 'failure'
run: |
echo "The previous 'bake' command failed or timed out. Running cleanup logic..."
# much easier to do in bash than in Python via subprocess
echo "##################### OPERATOR ######################"
kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/operator.log
cat /tmp/operator.log
echo "##################### JOB MANAGER ######################"
kubectl get pod | grep -v manager | grep $JOB_NAME | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log
cat /tmp/jobmanager.log
#################################################################
# provide feedback about OOM errors where we've seen them before
#################################################################
RED='\033[0;31m'
GREEN='\033[0;32m'
NOCOLOR='\033[0m' # To reset the color
# grok if the operator logs produced an error that makes things unable to schedule
error=$(cat /tmp/operator.log | grep "ReconciliationException")
if [[ "$error" ]]; then
echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a ReconciliationException in the operator logs...${NOCOLOR}"
dump_error=$(cat /tmp/operator.log | grep -a20 "ReconciliationException")
echo "$dump_error"
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
fi
#################################################################
# end
#################################################################
# delete the flinkdeployment so we don't have old failures hanging around
kubectl get flinkdeployment --no-headers | grep $JOB_NAME | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
# force GH action to show failed result
exit 128
- name: echo JobID, JobName, FlinkDashboard to user
id: report_ids
run: |
# TODO: we also need to report the historyserver URL and flink dashboard URL,
# but this also requires us to think about how we're going to have a thin
# layer of authentication around these services so they aren't totally public
echo '############ RECIPE JOB NAME ################'
echo $RECIPE_JOB_NAME
echo '############ FLINK JOB NAME ################'
echo $JOB_NAME
echo "job_name=$JOB_NAME" >> $GITHUB_OUTPUT
echo '############ JOB ID ################'
echo $JOB_ID
echo "job_id=$JOB_ID" >> $GITHUB_OUTPUT
echo '############ FLINK DASHBOARD ################'
echo $FLINK_DASH
echo "flink_dash=$FLINK_DASH" >> $GITHUB_OUTPUT
monitor-job:
runs-on: ubuntu-latest
name: monitor job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
needs: [name-job, run-job]
steps:
- name: set up aws credentials for job runner user
uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }}
- name: install kubectl
run: |
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x ./kubectl
sudo mv ./kubectl /usr/local/bin/kubectl
- name: update kubeconfig with cluster
run: |
aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }}
# - name: Setup upterm session
# uses: lhotari/action-upterm@v1
#
- name: monitor logs of job manager and report final status
id: monitorjob
timeout-minutes: 240
continue-on-error: true
run: |
# TODO: this should check the historyserver status instead of the logs,
# but first we need to think about authentication and a reverse proxy
echo "find job status on the job manager logs..."
while [[ -z "$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)" ]]; do
echo "still waiting for a status on the job manager logs..."
sleep 1
done
input_status=$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)
echo "##### INPUT STATUS #####"
echo $input_status
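# the regex grabs the last word before the trailing period, e.g.
# "... switched from state RUNNING to FINISHED." -> FINISHED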
status=$(echo "$input_status" | grep -oP '\b\w+(?=\.$)')
echo "##### STATUS #####"
echo $status
if [[ "$status" == "FAILING" || "$status" == "FAILED" ]]; then
echo "job failed with '$status', will dump the logs now..."
# force exit so we can move to next step
exit 128
fi
- name: dump logs
if: steps.monitorjob.outcome == 'failure'
run: |
# much easier to do in bash than in Python via subprocess
echo "##################### OPERATOR ######################"
kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
echo "##################### JOB MANAGER ######################"
kubectl get pod | grep -v taskmanager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log
cat /tmp/jobmanager.log
echo "##################### TASK MANAGER ######################"
# depending on the `inputs.parallelism` we can have more than one taskmanager
parallelism_input="${{ github.event.inputs.parallelism }}"
iterations=$(expr $parallelism_input + 0) # cast to integer
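# (the grep below assumes taskmanager pods are named
# <job_name>-taskmanager-1-<i>, matching how the Flink operator names them)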
for (( i = 1; i <= iterations; i++ )); do
echo "echo #### taskmanager-$i ####"
kubectl get pod | grep ${{ needs.run-job.outputs.job_name }} | grep taskmanager-1-$i | cut -d' ' -f1 | head -n1 | xargs -I{} kubectl logs pod/{} -c flink-main-container > /tmp/taskmanager.log
cat /tmp/taskmanager.log
done
# NOTE: we actually want the failed flink deployments to stick around b/c we might want to inspect the flink dashboard
# kubectl get flinkdeployment --no-headers | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
#################################################################
# provide feedback about OOM errors where we've seen them before
#################################################################
RED='\033[0;31m'
GREEN='\033[0;32m'
NOCOLOR='\033[0m' # To reset the color
# grok if taskmanager produced a JVM OOM error
error=$(cat /tmp/taskmanager.log | grep "java.lang.OutOfMemoryError")
if [[ "$error" ]]; then
echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a JVM OOM error in the taskmanager logs...${NOCOLOR}"
dump_error=$(cat /tmp/taskmanager.log | grep -a20 "java.lang.OutOfMemoryError")
echo "$dump_error"
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
fi
# grok if this was OOM killed
error=$(cat /tmp/jobmanager.log | grep "reason=OOMKilled")
if [[ "$error" ]]; then
echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be an OOMKilled error in the jobmanager logs...${NOCOLOR}"
dump_error=$(cat /tmp/jobmanager.log | grep -a20 "reason=OOMKilled")
echo "$dump_error"
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
fi
#################################################################
# end
#################################################################
# force GH action to show failed result
exit 128