From d55914d9890117086073d7861ebc7b6aaae1e10d Mon Sep 17 00:00:00 2001 From: Pankaj Date: Fri, 3 May 2024 15:05:53 +0530 Subject: [PATCH] Adjust test for Airflow-2.9 --- python-sdk/tests/airflow_tests/test_datasets.py | 4 +--- python-sdk/tests/sql/operators/test_cleanup.py | 10 +++++----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/python-sdk/tests/airflow_tests/test_datasets.py b/python-sdk/tests/airflow_tests/test_datasets.py index dc21c1a96..5ecde4707 100644 --- a/python-sdk/tests/airflow_tests/test_datasets.py +++ b/python-sdk/tests/airflow_tests/test_datasets.py @@ -104,7 +104,6 @@ def test_kwargs_with_temp_table(): @pytest.mark.skipif(airflow.__version__ < "2.4.0", reason="Require Airflow version >= 2.4.0") def test_example_dataset_dag(): from airflow.datasets import Dataset - from airflow.models.dataset import DatasetModel dir_path = os.path.dirname(os.path.realpath(__file__)) db = DagBag(dir_path + "/../../example_dags/example_datasets.py") @@ -115,9 +114,8 @@ def test_example_dataset_dag(): outlets = producer_dag.tasks[-1].outlets assert isinstance(outlets[0], Dataset) # Test that dataset_triggers is only set if all the instances passed to the DAG object are Datasets - assert consumer_dag.dataset_triggers == outlets + assert consumer_dag.dataset_triggers.objects[0] == outlets[0] assert outlets[0].uri == "astro://postgres_conn@?table=imdb_movies" - assert DatasetModel.from_public(outlets[0]) == Dataset("astro://postgres_conn@?table=imdb_movies") def test_disable_auto_inlets_outlets(): diff --git a/python-sdk/tests/sql/operators/test_cleanup.py b/python-sdk/tests/sql/operators/test_cleanup.py index dde4ac724..4f4940be2 100644 --- a/python-sdk/tests/sql/operators/test_cleanup.py +++ b/python-sdk/tests/sql/operators/test_cleanup.py @@ -105,12 +105,12 @@ def test_error_raised_with_blocking_op_executors( reason="BackfillJobRunner and Job classes are only available in airflow >= 2.6", ) @pytest.mark.parametrize( - "executor_in_job,executor_in_cfg,expected_val", + "executor_in_job, executor_in_cfg, expected_val", [ (SequentialExecutor(), "LocalExecutor", True), (LocalExecutor(), "LocalExecutor", False), - (None, "LocalExecutor", False), - (None, "SequentialExecutor", True), + (LocalExecutor(), "LocalExecutor", False), + (SequentialExecutor(), "SequentialExecutor", True), ], ) def test_single_worker_mode_backfill(executor_in_job, executor_in_cfg, expected_val): @@ -177,8 +177,8 @@ def test_single_worker_mode_backfill_airflow_2_5(executor_in_job, executor_in_cf [ (SequentialExecutor(), "LocalExecutor", True), (LocalExecutor(), "LocalExecutor", False), - (None, "LocalExecutor", False), - (None, "SequentialExecutor", True), + (LocalExecutor(), "LocalExecutor", False), + (SequentialExecutor(), "SequentialExecutor", True), ], ) def test_single_worker_mode_scheduler_job(executor_in_job, executor_in_cfg, expected_val):