Batch tasks using Dask in Argo #120

Merged: 16 commits, Jan 6, 2023
44 changes: 44 additions & 0 deletions src/python-scripts/compute-base-flow.py
@@ -0,0 +1,44 @@
import logging
import os

import dask
import dask_gateway
import numpy as np
import pandas as pd
import xarray as xr


logger = logging.getLogger("DaskWorkflow")

def client_code():
    with dask.config.set(**{'array.slicing.split_large_chunks': False}):
        nwm_uri = 's3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr'
        ds = xr.open_zarr(nwm_uri)
        recent = ds.where(ds['time'] >= np.datetime64('2010-01-01'), drop=True)
        weekly_avg = recent.streamflow.groupby('time.week').mean().rename('mean')
        weekly_std = recent.streamflow.groupby('time.week').std().rename('std')
        base_flow = xr.merge([weekly_avg, weekly_std])

        base_flow.to_zarr('s3://azavea-noaa-hydro-data-public/nwm-base-flow.zarr', mode='w')

def main():
    gw = dask_gateway.Gateway(auth="jupyterhub")
    logger.warning(f"Using auth of type {type(gw.auth)}")

    cluster = None
    try:
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
        opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
        cluster = gw.new_cluster(opts)
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        client = cluster.get_client()

        logger.warning(f"Client dashboard: {client.dashboard_link}")

        client_code()
    finally:
        # Tear down the cluster even if client_code() raises; guard against
        # failures that occur before the cluster exists.
        if cluster is not None:
            gw.stop_cluster(cluster.name)

if __name__ == "__main__":
    main()
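The weekly base-flow statistics above follow a common pattern: group a time series by week number, take the mean and standard deviation, and combine the renamed aggregates into one table. A minimal sketch of that pattern on synthetic pandas data (the `flow` series and date range here are illustrative stand-ins, not NWM data):

```python
import numpy as np
import pandas as pd

# Synthetic daily "streamflow" series standing in for the NWM record.
times = pd.date_range("2010-01-01", "2011-12-31", freq="D")
flow = pd.Series(np.sin(np.arange(len(times)) / 7.0), index=times, name="streamflow")

# Group by ISO week number, mirroring groupby('time.week') in the script,
# then combine the renamed aggregates into one table.
week = flow.index.isocalendar().week
base_flow = pd.concat(
    [flow.groupby(week).mean().rename("mean"),
     flow.groupby(week).std().rename("std")],
    axis=1,
)
print(base_flow.columns.tolist())
```

The real script does the same thing lazily over a chunked zarr store, which is why it needs the Dask cluster at all.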
36 changes: 36 additions & 0 deletions workflows/README.md
@@ -1,3 +1,39 @@
# Argo Workflow Archive

This directory contains a collection of `Workflow` definitions for Argo covering tasks we may want to repeat. As we become more familiar with Argo, we should be able to put these resources to better use for more complex tasks.

## Dask task runner
The provided `run-dask-job.yaml` accepts an HTTP(S) URL to a Python script, downloads it, and runs it against a Dask cluster scaled according to the other job parameters.

Scripts that wish to use this framework should use the following as the starting point for their code:
```python
import logging
import os

import dask_gateway

logger = logging.getLogger("DaskWorkflow")

def client_code():
    pass

def main():
    gw = dask_gateway.Gateway(auth="jupyterhub")

    cluster = None
    try:
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
        opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
        cluster = gw.new_cluster(opts)
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        client = cluster.get_client()

        logger.warning(f"Client dashboard: {client.dashboard_link}")

        client_code()
    finally:
        if cluster is not None:
            gw.stop_cluster(cluster.name)

if __name__ == "__main__":
    main()
```
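Assuming the Argo CLI is available and run from this directory, a job can be launched by pointing `script-location` at a hosted script; the URL below is a placeholder, and the other parameters fall back to the defaults in the workflow when omitted:

```shell
argo submit run-dask-job.yaml \
  --parameter script-location="https://example.com/compute-base-flow.py" \
  --parameter n-workers="4" \
  --parameter worker-cores="2" \
  --parameter worker-mem-gb="8" \
  --watch
```

The `--watch` flag keeps the CLI attached so the job's progress is visible in the terminal.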
81 changes: 81 additions & 0 deletions workflows/run-dask-job.yaml
@@ -0,0 +1,81 @@
# inspired by https://raw.githubusercontent.com/pipekit/talk-demos/main/argocon-demos/2021-processing-petabytes-with-dask/dask_standard_cluster_workflow_template.yaml
apiVersion: argoproj.io/v1alpha1
#kind: ClusterWorkflowTemplate
kind: Workflow
metadata:
  generateName: dask-distributed-task-workflow-
spec:
  entrypoint: execute-dask-job
  securityContext:
    fsGroup: 1000
  volumeClaimTemplates:
    - metadata:
        name: scratch
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 128Gi
  arguments:
    parameters:
      - name: script-location
      - name: image
        value: "pangeo/pangeo-notebook:2022.05.18"
      - name: n-workers
        value: "1"
      - name: worker-cores
        value: "1"
      - name: worker-mem-gb
        value: "2"
      - name: scheduler-cores
        value: "1"
      - name: scheduler-mem-gb
        value: "4"
  templates:
    - name: execute-dask-job
      inputs:
        parameters:
          - name: script-location
          - name: image
          - name: n-workers
          - name: worker-cores
          - name: worker-mem-gb
          - name: scheduler-cores
          - name: scheduler-mem-gb
      script:
        image: "{{inputs.parameters.image}}"
        imagePullPolicy: Always
        command: [bash]
        env:
          - name: DASK_GATEWAY__ADDRESS
            value: http://traefik-dask-gateway/services/dask-gateway
          - name: DASK_GATEWAY__PROXY_ADDRESS
            value: gateway://traefik-dask-gateway:80
          - name: DASK_GATEWAY__PUBLIC_ADDRESS
            value: /services/dask-gateway
          - name: DASK_GATEWAY__AUTH__TYPE
            value: jupyterhub
          - name: JUPYTERHUB_API_TOKEN
            valueFrom:
              secretKeyRef:
                name: auth-token
                key: token
          - name: DASK_OPTS__WORKER_MEMORY
            value: "{{inputs.parameters.worker-mem-gb}}"
          - name: DASK_OPTS__WORKER_CORES
            value: "{{inputs.parameters.worker-cores}}"
          - name: DASK_OPTS__N_WORKERS
            value: "{{inputs.parameters.n-workers}}"
          - name: DASK_OPTS__SCHEDULER_MEMORY
            value: "{{inputs.parameters.scheduler-mem-gb}}"
          - name: DASK_OPTS__SCHEDULER_CORES
            value: "{{inputs.parameters.scheduler-cores}}"
        volumeMounts:
          - name: scratch
            mountPath: /scratch
        workingDir: "/scratch"
        source: |
          pwd
          export
          wget -q --output-document=source.py "{{inputs.parameters.script-location}}"
          python source.py
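The `DASK_GATEWAY__*` entries in the `env:` block work because dask's configuration system maps environment variables with the `DASK_` prefix onto nested config keys, splitting segments on `__`; that is why the workflow can configure the gateway client without shipping a config file. A pure-Python imitation of that convention (the helper `env_to_config` is illustrative, not dask's actual loader, which also parses values):

```python
def env_to_config(environ, prefix="DASK_"):
    """Imitate dask's convention: DASK_GATEWAY__AUTH__TYPE=jupyterhub
    becomes {"gateway": {"auth": {"type": "jupyterhub"}}}."""
    config = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue
        path = key[len(prefix):].lower().split("__")
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return config

env = {
    "DASK_GATEWAY__ADDRESS": "http://traefik-dask-gateway/services/dask-gateway",
    "DASK_GATEWAY__AUTH__TYPE": "jupyterhub",
}
print(env_to_config(env)["gateway"]["auth"]["type"])
```

The `DASK_OPTS__*` variables, by contrast, are a naming convention private to this workflow: the downloaded script reads them directly with `os.environ`.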