diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index 30a64d97cfc9c..222525e9279c2 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -31,6 +31,7 @@ from airflow.decorators.setup_teardown import setup_task, teardown_task from airflow.decorators.short_circuit import short_circuit_task from airflow.decorators.task_group import task_group +from airflow.decorators.empty import empty_task from airflow.models.dag import dag from airflow.providers_manager import ProvidersManager @@ -49,9 +50,10 @@ "branch_external_python_task", "short_circuit_task", "sensor_task", - "bash_task", + "empty_task", "setup", "teardown", + "bash_task", ] @@ -69,6 +71,7 @@ class TaskDecoratorCollection: bash = staticmethod(bash_task) run_if = staticmethod(run_if) skip_if = staticmethod(skip_if) + empty = staticmethod(empty_task) __call__: Any = python # Alias '@task' to '@task.python'. diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 7dd887431c6f5..cc3971f0f562f 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -60,6 +60,7 @@ __all__ = [ "bash_task", "setup", "teardown", + "empty_task", ] _T = TypeVar("_T", bound=Task[..., Any] | _TaskDecorator[..., Any, Any]) @@ -791,6 +792,17 @@ class TaskDecoratorCollection: """ @overload def bash(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... + @overload + def empty(self, *, task_id: str, **kwargs) -> TaskDecorator: + """ + Wraps a Python callable into an EmptyOperator task. + + :param task_id: Task ID. + :param kwargs: Additional keyword arguments passed to the EmptyOperator. + """ + + @overload + def empty(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... def run_if(self, condition: AnyConditionFunc, skip_message: str | None = None) -> Callable[[_T], _T]: """ Decorate a task to run only if a condition is met. diff --git a/airflow/decorators/empty.py b/airflow/decorators/empty.py new file mode 100644 index 0000000000000..0a0c48cb87d74 --- /dev/null +++ b/airflow/decorators/empty.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.operators.empty import EmptyOperator +from airflow.decorators.base import task_decorator_factory +from typing import Callable, Any + + +class _EmptyDecoratedOperator(EmptyOperator): + """ + Wraps a Python callable and captures args/kwargs when called for execution. + + :param python_callable: A reference to an object that is callable. If provided, it will be executed + when the task runs. + :param task_id: Task ID. + :param op_args: A list of positional arguments that will be passed to the callable (templated). + :param op_kwargs: A dictionary of keyword arguments that will be passed to the callable (templated). + :param kwargs: Additional keyword arguments that will be passed to the base EmptyOperator. + + Example Usage: + @task.empty + def start_task(): + print("Starting the workflow.") + + @task.empty + def end_task(): + print("Workflow complete.") + """ + custom_operator_name: str = "@task.empty" + + def __init__( + self, + *, + python_callable: Callable | None = None, + op_args: list[Any] | None = None, + op_kwargs: dict[str, Any] | None = None, + **kwargs, + ) -> None: + # Remove arguments that are not valid for EmptyOperator + kwargs.pop("op_args", None) + kwargs.pop("op_kwargs", None) + + super().__init__(**kwargs) + self.python_callable = python_callable + + def execute(self, context: Any) -> None: + """Executes the Python callable if provided, otherwise does nothing.""" + if self.python_callable: + self.python_callable() + self.log.info("Executing a custom empty task.") + +def empty_task( + python_callable: Callable | None = None, + **kwargs, +) -> Callable: + """ + Wrap a function into a EmptyOperator. + + :param python_callable: Function to decorate. + """ + return task_decorator_factory( + python_callable=python_callable, + decorated_operator_class=_EmptyDecoratedOperator, + **kwargs, + )