Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Oct 17, 2024
1 parent c9de757 commit 0b34ea7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
1 change: 1 addition & 0 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ def make_task(operator: str, task_params: Dict[str, Any]) -> BaseOperator:
else operator_obj.partial(**task_params).expand(**expand_kwargs)
)
except Exception as err:
raise err
raise DagFactoryException(f"Failed to create {operator_obj} task") from err
return task

Expand Down
12 changes: 7 additions & 5 deletions tests/test_dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,9 @@ def test_replace_expand_string_with_xcom():
assert updated_task_conf_xcomarg["expand"]["key_1"] == XComArg(tasks_dict["task_1"])


@pytest.mark.skipif(
version.parse(AIRFLOW_VERSION) <= version.parse("2.4.0"), reason="Requires Airflow version greater than 2.4.0"
)
@pytest.mark.parametrize(
"outlets,output",
[
Expand All @@ -898,12 +901,11 @@ def test_make_task_outlets(mock_read_file, outlets, output):
td = dagbuilder.DagBuilder("test_dag", DAG_CONFIG, DEFAULT_CONFIG)
task_params = {
"task_id": "process",
"python_callable_name": "dataset_task",
"python_callable_name": "expand_task",
"python_callable_file": os.path.realpath(__file__),
"outlets": outlets,
}
mock_read_file.return_value = output
if version.parse(AIRFLOW_VERSION) > version.parse("2.4.0"):
operator = "airflow.operators.python_operator.PythonOperator"
actual = td.make_task(operator, task_params)
assert actual.outlets == [Dataset(uri) for uri in output]
operator = "airflow.operators.python_operator.PythonOperator"
actual = td.make_task(operator, task_params)
assert actual.outlets == [Dataset(uri) for uri in output]

0 comments on commit 0b34ea7

Please sign in to comment.