From 43ad2ca6edd11c7305d72f3e17a2535f6c94325d Mon Sep 17 00:00:00 2001 From: Pratiksha <128999446+Prab-27@users.noreply.github.com> Date: Tue, 10 Dec 2024 13:00:56 +0530 Subject: [PATCH] Remove Provider Deprecations in Trino (#44717) * remove provider deprecations in trino * remove trino.py and test_trino.py and fix mypy test * add trino.py and test_trino.py to fix static checks * fix static checks --------- Co-authored-by: pratiksha rajendrabhai badheka --- .../src/airflow/providers/trino/CHANGELOG.rst | 9 ++ .../providers/trino/operators/__init__.py | 16 --- .../providers/trino/operators/trino.py | 80 ------------- .../src/airflow/providers/trino/provider.yaml | 5 - .../integration/trino/hooks/test_trino.py | 4 +- providers/tests/trino/operators/__init__.py | 16 --- providers/tests/trino/operators/test_trino.py | 113 ------------------ 7 files changed, 11 insertions(+), 232 deletions(-) delete mode 100644 providers/src/airflow/providers/trino/operators/__init__.py delete mode 100644 providers/src/airflow/providers/trino/operators/trino.py delete mode 100644 providers/tests/trino/operators/__init__.py delete mode 100644 providers/tests/trino/operators/test_trino.py diff --git a/providers/src/airflow/providers/trino/CHANGELOG.rst b/providers/src/airflow/providers/trino/CHANGELOG.rst index 42bf0211e1c93..c7bb3d14ef166 100644 --- a/providers/src/airflow/providers/trino/CHANGELOG.rst +++ b/providers/src/airflow/providers/trino/CHANGELOG.rst @@ -27,6 +27,15 @@ Changelog --------- +main +.... + +.. warning:: + All deprecated classes, parameters and features have been removed from the {provider_name} provider package. + The following breaking changes were introduced: + + * Remove ``TrinoOperator`` class from airflow.providers.trino.operators.trino. Please use ``airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator``` + 5.9.0 ..... diff --git a/providers/src/airflow/providers/trino/operators/__init__.py b/providers/src/airflow/providers/trino/operators/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/providers/src/airflow/providers/trino/operators/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/src/airflow/providers/trino/operators/trino.py b/providers/src/airflow/providers/trino/operators/trino.py deleted file mode 100644 index 5a3858342a523..0000000000000 --- a/providers/src/airflow/providers/trino/operators/trino.py +++ /dev/null @@ -1,80 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""This module contains the Trino operator.""" - -from __future__ import annotations - -from collections.abc import Sequence -from typing import Any, ClassVar - -from deprecated import deprecated -from trino.exceptions import TrinoQueryError - -from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator -from airflow.providers.trino.hooks.trino import TrinoHook - - -@deprecated( - reason="Please use `airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`.", - category=AirflowProviderDeprecationWarning, -) -class TrinoOperator(SQLExecuteQueryOperator): - """ - Executes sql code using a specific Trino query Engine. - - This class is deprecated. - - Please use :class:`airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:TrinoOperator` - - :param sql: the SQL code to be executed as a single string, or - a list of str (sql statements), or a reference to a template file. - :param trino_conn_id: id of the connection config for the target Trino - environment - :param autocommit: What to set the connection's autocommit setting to - before executing the query - :param handler: The result handler which is called with the result of each statement. - :param parameters: (optional) the parameters to render the SQL query with. - """ - - template_fields: Sequence[str] = ("sql",) - template_fields_renderers: ClassVar[dict] = {"sql": "sql"} - template_ext: Sequence[str] = (".sql",) - ui_color = "#ededed" - - def __init__(self, *, trino_conn_id: str = "trino_default", **kwargs: Any) -> None: - super().__init__(conn_id=trino_conn_id, **kwargs) - - def on_kill(self) -> None: - if self._hook is not None and isinstance(self._hook, TrinoHook): # type: ignore[attr-defined] - query_id = "'" + self._hook.query_id + "'" # type: ignore[attr-defined] - try: - self.log.info("Stopping query run with queryId - %s", self._hook.query_id) # type: ignore[attr-defined] - self._hook.run( # type: ignore[attr-defined] - sql=f"CALL system.runtime.kill_query(query_id => {query_id},message => 'Job " - f"killed by " - f"user');", - handler=list, - ) - except TrinoQueryError as e: - self.log.info(str(e)) - self.log.info("Trino query (%s) terminated", query_id) diff --git a/providers/src/airflow/providers/trino/provider.yaml b/providers/src/airflow/providers/trino/provider.yaml index d94001f6d2b2e..960ce1c6c7102 100644 --- a/providers/src/airflow/providers/trino/provider.yaml +++ b/providers/src/airflow/providers/trino/provider.yaml @@ -83,11 +83,6 @@ integrations: - /docs/apache-airflow-providers-trino/operators/trino.rst tags: [software] -operators: - - integration-name: Trino - python-modules: - - airflow.providers.trino.operators.trino - asset-uris: - schemes: [trino] handler: airflow.providers.trino.assets.trino.sanitize_uri diff --git a/providers/tests/integration/trino/hooks/test_trino.py b/providers/tests/integration/trino/hooks/test_trino.py index 023989605f0b1..1faf183117b76 100644 --- a/providers/tests/integration/trino/hooks/test_trino.py +++ b/providers/tests/integration/trino/hooks/test_trino.py @@ -21,8 +21,8 @@ import pytest +from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.providers.trino.hooks.trino import TrinoHook -from airflow.providers.trino.operators.trino import TrinoOperator @pytest.mark.integration("trino") @@ -50,7 +50,7 @@ def test_should_record_records_with_kerberos_auth(self): @mock.patch.dict("os.environ", AIRFLOW_CONN_TRINO_DEFAULT="trino://airflow@trino:8080/") def test_openlineage_methods(self): - op = TrinoOperator(task_id="trino_test", sql="SELECT name FROM tpch.sf1.customer LIMIT 3") + op = SQLExecuteQueryOperator(task_id="trino_test", sql="SELECT name FROM tpch.sf1.customer LIMIT 3") op.execute({}) lineage = op.get_openlineage_facets_on_start() assert lineage.inputs[0].namespace == "trino://trino:8080" diff --git a/providers/tests/trino/operators/__init__.py b/providers/tests/trino/operators/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/providers/tests/trino/operators/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/tests/trino/operators/test_trino.py b/providers/tests/trino/operators/test_trino.py deleted file mode 100644 index 24d933bf0d360..0000000000000 --- a/providers/tests/trino/operators/test_trino.py +++ /dev/null @@ -1,113 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from unittest import mock - -from airflow.models.connection import Connection -from airflow.providers.common.compat.openlineage.facet import ( - Dataset, - SchemaDatasetFacet, - SchemaDatasetFacetFields, - SQLJobFacet, -) -from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator -from airflow.providers.trino.hooks.trino import TrinoHook - -TRINO_CONN_ID = "test_trino" -TASK_ID = "test_trino_task" -TRINO_DEFAULT = "trino_default" - - -class TestTrinoOperator: - @mock.patch("airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator.get_db_hook") - def test_execute(self, mock_get_db_hook): - """Asserts that the run method is called when a TrinoOperator task is executed""" - - op = SQLExecuteQueryOperator( - task_id=TASK_ID, - sql="SELECT 1;", - conn_id=TRINO_CONN_ID, - handler=list, - ) - op.execute(None) - - mock_get_db_hook.return_value.run.assert_called_once_with( - sql="SELECT 1;", - autocommit=False, - handler=list, - parameters=None, - return_last=True, - ) - - -def test_execute_openlineage_events(): - DB_NAME = "tpch" - DB_SCHEMA_NAME = "sf1" - - class TrinoHookForTests(TrinoHook): - get_conn = mock.MagicMock(name="conn") - get_connection = mock.MagicMock() - - def get_first(self, *_): - return [f"{DB_NAME}.{DB_SCHEMA_NAME}"] - - dbapi_hook = TrinoHookForTests() - - sql = "SELECT name FROM tpch.sf1.customer LIMIT 3" - op = SQLExecuteQueryOperator(task_id="trino-operator", sql=sql, conn_id=TRINO_DEFAULT) - op._hook = dbapi_hook - rows = [ - (DB_SCHEMA_NAME, "customer", "custkey", 1, "bigint", DB_NAME), - (DB_SCHEMA_NAME, "customer", "name", 2, "varchar(25)", DB_NAME), - (DB_SCHEMA_NAME, "customer", "address", 3, "varchar(40)", DB_NAME), - (DB_SCHEMA_NAME, "customer", "nationkey", 4, "bigint", DB_NAME), - (DB_SCHEMA_NAME, "customer", "phone", 5, "varchar(15)", DB_NAME), - (DB_SCHEMA_NAME, "customer", "acctbal", 6, "double", DB_NAME), - ] - dbapi_hook.get_connection.return_value = Connection( - conn_id=TRINO_DEFAULT, - conn_type="trino", - host="trino", - port=8080, - ) - dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, []] - - lineage = op.get_openlineage_facets_on_start() - assert lineage.inputs == [ - Dataset( - namespace="trino://trino:8080", - name=f"{DB_NAME}.{DB_SCHEMA_NAME}.customer", - facets={ - "schema": SchemaDatasetFacet( - fields=[ - SchemaDatasetFacetFields(name="custkey", type="bigint"), - SchemaDatasetFacetFields(name="name", type="varchar(25)"), - SchemaDatasetFacetFields(name="address", type="varchar(40)"), - SchemaDatasetFacetFields(name="nationkey", type="bigint"), - SchemaDatasetFacetFields(name="phone", type="varchar(15)"), - SchemaDatasetFacetFields(name="acctbal", type="double"), - ] - ) - }, - ) - ] - - assert len(lineage.outputs) == 0 - - assert lineage.job_facets == {"sql": SQLJobFacet(query=sql)}