Skip to content

Commit

Permalink
fix unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
ErickSeo committed Dec 19, 2024
1 parent 84c3172 commit 08d1fa7
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ def process_file_with_datasets(
:returns: The result of the condition evaluation if `condition_string` is provided, otherwise a list of `Dataset` objects.
:rtype: Any
"""
if condition_string:
is_airflow_version_at_least_2_9 = version.parse(AIRFLOW_VERSION) >= version.parse("2.9.0")
if condition_string and is_airflow_version_at_least_2_9:
map_datasets = utils.get_datasets_map_uri_yaml_file(file, datasets_filter)
dataset_map = {alias_dataset: Dataset(uri) for alias_dataset, uri in map_datasets.items()}
return eval(condition_string, {}, dataset_map)
Expand Down Expand Up @@ -692,14 +693,10 @@ def configure_schedule(dag_params: Dict[str, Any], dag_kwargs: Dict[str, Any]) -

dag_kwargs["schedule"] = DagBuilder.process_file_with_datasets(file, datasets_filter, condition_string)

elif has_conditions_attr and has_datasets_attr:
elif has_conditions_attr and has_datasets_attr and is_airflow_version_at_least_2_9:
datasets_filter = schedule["datasets"]
condition_string = schedule["conditions"]

if is_airflow_version_at_least_2_9:
dag_kwargs["schedule"] = DagBuilder.evaluate_condition_with_datasets(
condition_string, datasets_filter
)
dag_kwargs["schedule"] = DagBuilder.evaluate_condition_with_datasets(condition_string, datasets_filter)

else:
dag_kwargs["schedule"] = [Dataset(uri) for uri in schedule]
Expand Down

0 comments on commit 08d1fa7

Please sign in to comment.