diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index eabb177d..c5b1f573 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -651,7 +651,8 @@ def process_file_with_datasets( :returns: The result of the condition evaluation if `condition_string` is provided, otherwise a list of `Dataset` objects. :rtype: Any """ - if condition_string: + is_airflow_version_at_least_2_9 = version.parse(AIRFLOW_VERSION) >= version.parse("2.9.0") + if condition_string and is_airflow_version_at_least_2_9: map_datasets = utils.get_datasets_map_uri_yaml_file(file, datasets_filter) dataset_map = {alias_dataset: Dataset(uri) for alias_dataset, uri in map_datasets.items()} return eval(condition_string, {}, dataset_map) @@ -692,14 +693,10 @@ def configure_schedule(dag_params: Dict[str, Any], dag_kwargs: Dict[str, Any]) - dag_kwargs["schedule"] = DagBuilder.process_file_with_datasets(file, datasets_filter, condition_string) - elif has_conditions_attr and has_datasets_attr: + elif has_conditions_attr and has_datasets_attr and is_airflow_version_at_least_2_9: datasets_filter = schedule["datasets"] condition_string = schedule["conditions"] - - if is_airflow_version_at_least_2_9: - dag_kwargs["schedule"] = DagBuilder.evaluate_condition_with_datasets( - condition_string, datasets_filter - ) + dag_kwargs["schedule"] = DagBuilder.evaluate_condition_with_datasets(condition_string, datasets_filter) else: dag_kwargs["schedule"] = [Dataset(uri) for uri in schedule]