Skip to content

Commit

Permalink
fix(provider/edge): add back mising method map (apache#44468)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W authored Nov 29, 2024
1 parent 79a9c82 commit eee6919
Showing 1 changed file with 104 additions and 5 deletions.
109 changes: 104 additions & 5 deletions providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
InvalidSignatureError,
)

from airflow.api_internal.endpoints.rpc_api_endpoint import (
initialize_method_map,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.edge.worker_api.datamodels import JsonRpcRequest
Expand All @@ -59,13 +56,115 @@

@cache
def _initialize_method_map() -> dict[str, Callable]:
from airflow.api.common.trigger_dag import trigger_dag
from airflow.assets.manager import AssetManager
from airflow.cli.commands.task_command import _get_ti_db_access
from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.dag_processing.processor import DagFileProcessor
from airflow.jobs.job import Job, most_recent_job
from airflow.models import Trigger, Variable, XCom
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import (
TaskInstance,
_add_log,
_defer_task,
_get_template_context,
_handle_failure,
_handle_reschedule,
_record_task_map_for_downstreams,
_update_rtif,
_update_ti_heartbeat,
_xcom_pull,
)
from airflow.models.xcom_arg import _get_task_map_length
from airflow.providers.edge.models.edge_job import EdgeJob
from airflow.providers.edge.models.edge_logs import EdgeLogs
from airflow.providers.edge.models.edge_worker import EdgeWorker
from airflow.sdk.definitions.asset import expand_alias_to_assets
from airflow.secrets.metastore import MetastoreBackend
from airflow.sensors.base import _orig_start_date
from airflow.utils.cli_action_loggers import _default_action_log_internal
from airflow.utils.log.file_task_handler import FileTaskHandler

internal_api_functions = initialize_method_map().values()
functions: list[Callable] = [
*internal_api_functions,
_default_action_log_internal,
_defer_task,
_get_template_context,
_get_ti_db_access,
_get_task_map_length,
_update_rtif,
_update_ti_heartbeat,
_orig_start_date,
_handle_failure,
_handle_reschedule,
_add_log,
_xcom_pull,
_record_task_map_for_downstreams,
trigger_dag,
DagModel.deactivate_deleted_dags,
DagModel.get_paused_dag_ids,
DagModel.get_current,
DagFileProcessor._execute_task_callbacks,
DagFileProcessor.execute_callbacks,
DagFileProcessor.execute_callbacks_without_dag,
DagFileProcessor.save_dag_to_db,
DagFileProcessor.update_import_errors,
DagFileProcessor._validate_task_pools_and_update_dag_warnings,
DagFileProcessorManager._fetch_callbacks,
DagFileProcessorManager._get_priority_filelocs,
DagFileProcessorManager.clear_nonexistent_import_errors,
DagFileProcessorManager.deactivate_stale_dags,
DagWarning.purge_inactive_dag_warnings,
expand_alias_to_assets,
AssetManager.register_asset_change,
FileTaskHandler._render_filename_db_access,
Job._add_to_db,
Job._fetch_from_db,
Job._kill,
Job._update_heartbeat,
Job._update_in_db,
most_recent_job,
MetastoreBackend._fetch_connection,
MetastoreBackend._fetch_variable,
XCom.get_value,
XCom.get_one,
# XCom.get_many, # Not supported because it returns query
XCom.clear,
XCom.set,
Variable._set,
Variable._update,
Variable._delete,
DAG.fetch_callback,
DAG.fetch_dagrun,
DagRun.fetch_task_instances,
DagRun.get_previous_dagrun,
DagRun.get_previous_scheduled_dagrun,
DagRun.get_task_instances,
DagRun.fetch_task_instance,
DagRun._get_log_template,
RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
SerializedDagModel.get_serialized_dag,
SkipMixin._skip,
SkipMixin._skip_all_except,
TaskInstance._check_and_change_state_before_execution,
TaskInstance.get_task_instance,
TaskInstance._get_dagrun,
TaskInstance._set_state,
TaskInstance.save_to_db,
TaskInstance._clear_xcom_data,
TaskInstance._register_asset_changes_int,
Trigger.from_object,
Trigger.bulk_fetch,
Trigger.clean_unused,
Trigger.submit_event,
Trigger.submit_failure,
Trigger.ids_for_triggerer,
Trigger.assign_unassigned,
# Additional things from EdgeExecutor
EdgeJob.reserve_task,
EdgeJob.set_state,
Expand Down

0 comments on commit eee6919

Please sign in to comment.