
Commit

Merge pull request #1515 from astronomer/jc_fixes_dag-parsing-uses-task-sdk

Fix standalone dag processor startup
ashb authored Dec 10, 2024
2 parents 7940e39 + ef0ee70 commit b0730e6
Showing 2 changed files with 3 additions and 23 deletions.
22 changes: 0 additions & 22 deletions airflow/cli/commands/local_commands/dag_processor_command.py
@@ -19,39 +19,19 @@
from __future__ import annotations

import logging
from datetime import timedelta
from typing import Any

from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.dag_processing.manager import (
DagFileProcessorManager,
TaskSDKBasedDagCollector,
reload_configuration_for_dag_processing,
)
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.job import Job
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

log = logging.getLogger(__name__)


def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
"""Create DagFileProcessorProcess instance."""
processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)
return DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
processor_timeout=processor_timeout,
dag_directory=args.subdir,
max_runs=args.num_runs,
dag_ids=[],
),
)


@cli_utils.action_cli
@providers_configuration_loaded
def dag_processor(args):
@@ -63,9 +43,7 @@ def dag_processor(args):
# if sql_conn.startswith("sqlite"):
# raise SystemExit("Standalone DagProcessor is not supported when using sqlite.")

# job_runner = _create_dag_processor_job_runner(args)
collector = TaskSDKBasedDagCollector(dag_directory=args.subdir, max_runs=args.num_runs, parallelism=1)
collector.run()

reload_configuration_for_dag_processing()
run_command_with_daemon_option(
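For orientation, the surviving code path in dag_processor_command.py is small enough to sketch from the lines visible above. The sketch below is a reconstruction, not the exact file: it assumes only the TaskSDKBasedDagCollector construction shown in the diff, and it leaves out the run_command_with_daemon_option arguments, which are cut off in this view; the helper name run_collector and the ordering of the calls are illustrative assumptions.

    # Hedged sketch of the simplified standalone dag-processor entrypoint,
    # reconstructed from the visible diff. The deleted _create_dag_processor_job_runner()
    # helper (DagProcessorJobRunner wrapping DagFileProcessorManager) is gone;
    # the command now builds the Task SDK based collector directly.
    from argparse import Namespace

    from airflow.dag_processing.manager import (
        TaskSDKBasedDagCollector,
        reload_configuration_for_dag_processing,
    )


    def run_collector(args: Namespace) -> None:
        # dag_directory and max_runs come straight from the CLI args, mirroring the diff.
        collector = TaskSDKBasedDagCollector(
            dag_directory=args.subdir,
            max_runs=args.num_runs,
            parallelism=1,
        )
        reload_configuration_for_dag_processing()
        # In the real command the collector is handed to run_command_with_daemon_option;
        # calling run() directly keeps this sketch self-contained.
        collector.run()


    # Example invocation with hypothetical values (the CLI normally supplies these):
    # run_collector(Namespace(subdir="/opt/airflow/dags", num_runs=-1))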
4 changes: 3 additions & 1 deletion airflow/dag_processing/manager.py
@@ -396,7 +396,9 @@ def _scan_stale_dags(self):
      now = time.monotonic()
      elapsed_time_since_refresh = now - self._last_deactivate_stale_dags_time
      if elapsed_time_since_refresh > self.parsing_cleanup_interval:
-         last_parsed = {fp: stat.last_finish_time for fp, stat in self._file_stats.items()}
+         last_parsed = {
+             fp: stat.last_finish_time for fp, stat in self._file_stats.items() if stat.last_finish_time
+         }
          self.deactivate_stale_dags(
              last_parsed=last_parsed,
              dag_directory=self.get_dag_directory(),
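The manager.py change is narrow: files that have a stat entry but have never finished parsing (last_finish_time is still None) are now filtered out before deactivate_stale_dags() compares timestamps, which the deactivation logic evidently did not expect. A small self-contained illustration of that guard, using a hypothetical stand-in type rather than the real Airflow stat class:

    # Stand-alone illustration of the filter added to _scan_stale_dags();
    # FileStat here is a hypothetical stand-in for Airflow's internal stat object.
    from __future__ import annotations

    from dataclasses import dataclass
    from datetime import datetime, timezone


    @dataclass
    class FileStat:
        last_finish_time: datetime | None = None


    file_stats = {
        "dags/parsed_ok.py": FileStat(last_finish_time=datetime.now(timezone.utc)),
        "dags/never_finished.py": FileStat(),  # parsing has not completed yet
    }

    # Old behaviour: the None finish time of a never-parsed file leaked into last_parsed.
    # New behaviour: entries without a finish time are skipped entirely.
    last_parsed = {
        fp: stat.last_finish_time for fp, stat in file_stats.items() if stat.last_finish_time
    }

    assert "dags/never_finished.py" not in last_parsed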
