Add fetch job and update stage_ic to work with fetched ICs #3141

Draft
wants to merge 14 commits into
base: develop
23 changes: 23 additions & 0 deletions jobs/JGLOBAL_FETCH
@@ -0,0 +1,23 @@
#! /usr/bin/env bash

source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "fetch" -c "base fetch"

# Execute fetching
"${SCRgfs}/exglobal_fetch.py"
err=$?

###############################################################
# Check for errors and exit if any of the above failed
if [[ "${err}" -ne 0 ]]; then
    echo "FATAL ERROR: Unable to fetch ICs to ${ROTDIR}; ABORT!"
    exit "${err}"
fi

##########################################
# Remove the Temporary working directory
##########################################
cd "${DATAROOT}" || { echo "${DATAROOT} does not exist. ABORT!"; exit 1; }
[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}"

exit 0
38 changes: 38 additions & 0 deletions parm/config/gefs/config.fetch
@@ -0,0 +1,38 @@
#! /usr/bin/env bash

########## config.fetch ##########

echo "BEGIN: config.fetch"

# Get task specific resources
source "${EXPDIR}/config.resources" fetch

export ICSDIR="@ICSDIR@" # User provided ICSDIR; blank if not provided
export BASE_IC="@BASE_IC@" # Platform home for staged ICs

export FETCH_YAML_TMPL="${PARMgfs}/stage/master_gefs.yaml.j2"
export fetch_yaml="stage_atm_cold.yaml"

# Set ICSDIR (if not defined)
if [[ -z "${ICSDIR}" ]] ; then

    ic_ver="20240610"

    if (( NMEM_ENS > 0 )) ; then
        ensic="${CASE_ENS}"
    fi

    if [[ "${DO_OCN:-NO}" == "YES" ]] ; then
        ocnic="mx${OCNRES}"
    fi

    export ICSDIR="${BASE_IC}/${CASE}${ensic:-}${ocnic:-}/${ic_ver}"

fi

# Use of perturbation files for ensembles
export USE_OCN_ENS_PERTURB_FILES="NO"
export USE_ATM_ENS_PERTURB_FILES="NO"

echo "END: config.fetch"
37 changes: 37 additions & 0 deletions parm/config/gfs/config.fetch
@@ -0,0 +1,37 @@
#! /usr/bin/env bash

########## config.fetch ##########

echo "BEGIN: config.fetch"

# Get task specific resources
source "${EXPDIR}/config.resources" fetch

export ICSDIR="@ICSDIR@" # User provided ICSDIR; blank if not provided
export BASE_IC="@BASE_IC@" # Platform home for staged ICs

export FETCH_YAML_TMPL="${PARMgfs}/stage/master_gfs.yaml.j2"
export fetch_yaml="stage_atm_cold.yaml"

# Set ICSDIR (if not defined)
if [[ -z "${ICSDIR}" ]] ; then

    ic_ver="20240610"

    if (( NMEM_ENS > 0 )) ; then
        ensic="${CASE_ENS}"
    fi

    if [[ "${DO_OCN:-NO}" == "YES" ]] ; then
        ocnic="mx${OCNRES}"
    fi

    export ICSDIR="${BASE_IC}/${CASE}${ensic:-}${ocnic:-}/${ic_ver}"

fi

# Use of perturbation files for ensembles
export USE_OCN_ENS_PERTURB_FILES="NO"
export USE_ATM_ENS_PERTURB_FILES="NO"

echo "END: config.fetch"
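
The default `ICSDIR` logic in both `config.fetch` files can be hard to follow in shell form. The following is a minimal Python sketch of the same path construction, written only for illustration. The function name `default_icsdir` and the argument values in the usage note are hypothetical and are not part of this PR.

```python
def default_icsdir(base_ic: str, case: str, ic_ver: str,
                   nmem_ens: int = 0, case_ens: str = "",
                   do_ocn: bool = False, ocnres: str = "") -> str:
    """Mirror the shell logic: BASE_IC/CASE[CASE_ENS][mxOCNRES]/ic_ver."""
    # The ensemble resolution is appended only when there are ensemble members
    ensic = case_ens if nmem_ens > 0 else ""
    # The ocean resolution is appended only when the ocean component is on
    ocnic = f"mx{ocnres}" if do_ocn else ""
    return f"{base_ic}/{case}{ensic}{ocnic}/{ic_ver}"
```

With hypothetical inputs, `default_icsdir("/ics", "C48", "20240610")` gives `/ics/C48/20240610`, and turning on both ensemble and ocean options appends the two suffixes directly to the `CASE` component with no separator, as in the shell version.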
19 changes: 19 additions & 0 deletions parm/fetch/stage_atm_cold.yaml
@@ -0,0 +1,19 @@
untar:
    tarball: "{{ ATARDIR }}/{{ cycle_YMDH }}/atm_cold.tar"
    on_hpss: True
    contents:
        - gfs_ctrl.nc
        {% for ftype in ["gfs_data", "sfc_data"] %}
        {% for ntile in range(1, ntiles + 1) %}
        - {{ ftype }}.tile{{ ntile }}.nc
        {% endfor %} # ntile
        {% endfor %} # ftype
    destination: "{{ DATA }}"
atmosphere_cold:
    copy:
        - ["{{ DATA }}/gfs_ctrl.nc", "{{ COMOUT_ATMOS_INPUT }}"]
        {% for ftype in ["gfs_data", "sfc_data"] %}
        {% for ntile in range(1, ntiles + 1) %}
        - ["{{ DATA }}/{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_INPUT }}"]
        {% endfor %} # ntile
        {% endfor %} # ftype
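
To make the nested Jinja2 loops above concrete, here is a small plain-Python sketch of the file list they should expand to under `contents`. The helper name `tarball_members` is hypothetical. It only reproduces the loop order of the template (file type outside, tile inside) and does not call Jinja2 or wxflow.

```python
from typing import List, Sequence


def tarball_members(ntiles: int,
                    ftypes: Sequence[str] = ("gfs_data", "sfc_data")) -> List[str]:
    """List the tarball members named by the template for a given tile count."""
    members = ["gfs_ctrl.nc"]  # the control file is always listed first
    for ftype in ftypes:
        # Tiles are numbered from 1, matching range(1, ntiles + 1) in the template
        for ntile in range(1, ntiles + 1):
            members.append(f"{ftype}.tile{ntile}.nc")
    return members
```

For a six-tile grid this yields 13 names: the control file plus six `gfs_data` and six `sfc_data` tiles.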
45 changes: 45 additions & 0 deletions scripts/exglobal_fetch.py
@@ -0,0 +1,45 @@
#!/usr/bin/env python3

import os

from pygfs.task.fetch import Fetch
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit

# initialize root logger
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)


@logit(logger)
def main():

    config = cast_strdict_as_dtypedict(os.environ)

    # Instantiate the Fetch object
    fetch = Fetch(config)

    # Pull out all the configuration keys needed to run the fetch step
    keys = ['current_cycle', 'RUN', 'PDY', 'PARMgfs', 'PSLOT', 'ROTDIR', 'fetch_yaml', 'ATARDIR', 'ntiles']

    fetch_dict = AttrDict()
    for key in keys:
        fetch_dict[key] = fetch.task_config.get(key)
        if fetch_dict[key] is None:
            print(f"Warning: key ({key}) not found in task_config!")

    # Also import all COMOUT* directory and template variables
    for key in fetch.task_config.keys():
        if key.startswith("COMOUT_"):
            fetch_dict[key] = fetch.task_config.get(key)
            if fetch_dict[key] is None:
                print(f"Warning: key ({key}) not found in task_config!")

Contributor


Thinking about it, the fetch task doesn't interact with the COM directories, so I think this can be deleted.

Suggested change
# Also import all COMOUT* directory and template variables
for key in fetch.task_config.keys():
if key.startswith("COMOUT_"):
fetch_dict[key] = fetch.task_config.get(key)
if fetch_dict[key] is None:
print(f"Warning: key ({key}) not found in task_config!")

Author


Done

    # Determine which archives to retrieve from HPSS
    # Read the input YAML file to get the list of tarballs on tape
    atardir_set = fetch.configure(fetch_dict)

    # Pull the data from tape or locally and store it at the specified destination
    fetch.execute_pull_data(atardir_set)


if __name__ == '__main__':
    main()
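
The key-collection loop in `main()` could be factored into a helper that reports missing keys through a logger rather than `print`. The sketch below uses only the standard library. The names `collect_keys` and `fetch_sketch` are hypothetical, and a plain `dict` stands in for `fetch.task_config`, so this shows the pattern rather than the wxflow API.

```python
import logging
from typing import Any, Dict, Iterable, Mapping

logger = logging.getLogger("fetch_sketch")  # hypothetical logger name


def collect_keys(task_config: Mapping[str, Any],
                 keys: Iterable[str]) -> Dict[str, Any]:
    """Copy the requested keys out of task_config, warning about missing ones."""
    selected: Dict[str, Any] = {}
    for key in keys:
        value = task_config.get(key)
        if value is None:
            # Same message as the script, routed through logging instead of print
            logger.warning("key (%s) not found in task_config!", key)
        selected[key] = value
    return selected
```

As in the original loop, a missing key is still stored with the value `None`, so downstream template rendering sees the same dictionary shape either way.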
98 changes: 98 additions & 0 deletions ush/python/pygfs/task/fetch.py
@@ -0,0 +1,98 @@
#!/usr/bin/env python3

import os
import tarfile
from logging import getLogger
from typing import Any, Dict, List

from wxflow import (AttrDict, FileHandler, Hsi, Task,
                    logit, parse_j2yaml)
from wxflow import htar as Htar


logger = getLogger(__name__.split('.')[-1])


class Fetch(Task):
    """Task to pull ROTDIR data from HPSS (or locally)
    """

    @logit(logger, name="Fetch")
    def __init__(self, config: Dict[str, Any]) -> None:
        """Constructor for the Fetch task
        The constructor is responsible for collecting necessary yamls based on
        the runtime options and RUN.

        Parameters
        ----------
        config : Dict[str, Any]
            Incoming configuration for the task from the environment

        Returns
        -------
        None
        """
        super().__init__(config)

# Perhaps add other stuff to self.

    @logit(logger)
    def configure(self, fetch_dict: Dict[str, Any]):
        """Determine which tarballs will need to be extracted

        Parameters
        ----------
        fetch_dict : Dict[str, Any]
            Task specific keys, e.g. COM directories, etc

        Returns
        -------
        parsed_fetch: Dict[str, Any]
            Dictionary derived from the yaml file with necessary HPSS info.
        """

        self.hsi = Hsi()

        fetch_yaml = fetch_dict.fetch_yaml
Contributor


This should be FETCH_YAML_TMPL:

Suggested change
- fetch_yaml = fetch_dict.fetch_yaml
+ fetch_yaml = fetch_dict.FETCH_YAML_TMPL

Author


Done

        fetch_parm = os.path.join(fetch_dict.PARMgfs, "fetch")

        parsed_fetch = parse_j2yaml(os.path.join(fetch_parm, fetch_yaml),
                                    fetch_dict)

        return parsed_fetch

    @logit(logger)
    def execute_pull_data(self, atardir_set: Dict[str, Any]) -> None:
        """Pull data from HPSS based on a yaml dictionary and store at the
        specified destination.

        Parameters
        ----------
        atardir_set : Dict[str, Any]
            Dict defining set of tarballs to pull and where to put them.

        Returns
        -------
        None
        """
        f_names = atardir_set.untar.contents
        on_hpss = atardir_set.untar.on_hpss
        dest = atardir_set.untar.destination

        # Abort if the YAML lists no files to pull
        if len(f_names) <= 0:
            raise FileNotFoundError("FATAL ERROR: The tar ball has no files")  # DG - add name

        # Select the action based on whether on_hpss is True, then pull the
        # data from tape or locally and place it where it needs to go
        # DG - these need testing
        if on_hpss is True:  # htar all files in f_names
            htar_obj = Htar.Htar()
            htar_obj.cvf(dest, f_names)

        else:  # tar all files in f_names
            with tarfile.open(dest, "w") as tar:
                for filename in f_names:
                    tar.add(filename)


# Other helper methods...
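
The docstring of `execute_pull_data` describes pulling files out of a tarball, while the local branch above opens the archive in write mode. Assuming the intent is extraction, the local (non-HPSS) case might look like the standard-library sketch below. The function `extract_members` is hypothetical and is not part of wxflow or this PR. It writes each member under its base name, which is a simplification chosen for the example.

```python
import tarfile
from pathlib import Path
from typing import Iterable, List


def extract_members(tarball: str, members: Iterable[str],
                    destination: str) -> List[str]:
    """Extract the named members of a local tarball into destination."""
    wanted = list(members)
    if not wanted:  # abort if no files were requested, naming the tarball
        raise FileNotFoundError(f"FATAL ERROR: no files requested from {tarball}")

    dest = Path(destination)
    dest.mkdir(parents=True, exist_ok=True)

    extracted: List[str] = []
    with tarfile.open(tarball, "r") as tar:
        # Check every requested member before writing anything
        available = set(tar.getnames())
        missing = [name for name in wanted if name not in available]
        if missing:
            raise FileNotFoundError(f"FATAL ERROR: {tarball} is missing {missing}")
        for name in wanted:
            source = tar.extractfile(name)
            if source is None:  # directories and special files carry no data
                continue
            # Write under the base name only, so members cannot escape dest
            target = dest / Path(name).name
            target.write_bytes(source.read())
            extracted.append(str(target))
    return extracted
```

Reading members through `extractfile` and writing them explicitly avoids depending on the extraction-filter behaviour of `extractall`, which differs between Python versions.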
