Fix standalone dag processor startup #1515

Merged · 1 commit · Dec 10, 2024
airflow/cli/commands/local_commands/dag_processor_command.py (0 additions, 22 deletions)

@@ -19,39 +19,19 @@
 from __future__ import annotations
 
 import logging
-from datetime import timedelta
-from typing import Any
 
 from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
 from airflow.dag_processing.manager import (
-    DagFileProcessorManager,
     TaskSDKBasedDagCollector,
     reload_configuration_for_dag_processing,
 )
-from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
-from airflow.jobs.job import Job
 from airflow.utils import cli as cli_utils
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 
 log = logging.getLogger(__name__)
-
-
-def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
-    """Create DagFileProcessorProcess instance."""
-    processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
-    processor_timeout = timedelta(seconds=processor_timeout_seconds)
-    return DagProcessorJobRunner(
-        job=Job(),
-        processor=DagFileProcessorManager(
-            processor_timeout=processor_timeout,
-            dag_directory=args.subdir,
-            max_runs=args.num_runs,
-            dag_ids=[],
-        ),
-    )
 
 
 @cli_utils.action_cli
 @providers_configuration_loaded
 def dag_processor(args):
@@ -63,9 +43,7 @@ def dag_processor(args):
     # if sql_conn.startswith("sqlite"):
     #     raise SystemExit("Standalone DagProcessor is not supported when using sqlite.")
 
-    # job_runner = _create_dag_processor_job_runner(args)
     collector = TaskSDKBasedDagCollector(dag_directory=args.subdir, max_runs=args.num_runs, parallelism=1)
-    collector.run()
 
     reload_configuration_for_dag_processing()
     run_command_with_daemon_option(
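
Note on the startup fix: the visible hunks delete the dead _create_dag_processor_job_runner helper with its now-unused imports, and drop the eager collector.run() call. Since run() is a long-running parsing loop, calling it here in the foreground meant control never reached run_command_with_daemon_option, so the standalone processor never daemonized. A toy reproduction with stand-in names (not Airflow's real API; the real callback wiring sits below the visible hunk):

import time


class Collector:
    def run(self) -> None:  # stands in for TaskSDKBasedDagCollector.run
        while True:  # long-running parsing loop, never returns
            time.sleep(1)


def run_with_daemon(callback) -> None:  # stands in for run_command_with_daemon_option
    print("daemonized; entering parsing loop")
    callback()


collector = Collector()
# collector.run()   # the deleted line: blocks here forever, daemon setup never runs
run_with_daemon(collector.run)  # surviving path: the loop runs inside the managed process
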
airflow/dag_processing/manager.py (3 additions, 1 deletion)

@@ -396,7 +396,9 @@ def _scan_stale_dags(self):
         now = time.monotonic()
         elapsed_time_since_refresh = now - self._last_deactivate_stale_dags_time
         if elapsed_time_since_refresh > self.parsing_cleanup_interval:
-            last_parsed = {fp: stat.last_finish_time for fp, stat in self._file_stats.items()}
+            last_parsed = {
+                fp: stat.last_finish_time for fp, stat in self._file_stats.items() if stat.last_finish_time
+            }
             self.deactivate_stale_dags(
                 last_parsed=last_parsed,
                 dag_directory=self.get_dag_directory(),
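
Note on the filter: files that have been discovered but never completed a parse still have last_finish_time=None, and passing None timestamps into deactivate_stale_dags would either raise when compared against a datetime or wrongly flag freshly added DAG files as stale. A self-contained illustration, using a simplified stand-in for the real per-file stat object:

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class DagFileStat:  # simplified stand-in for Airflow's per-file stat record
    last_finish_time: datetime | None = None


file_stats = {
    "dags/parsed.py": DagFileStat(last_finish_time=datetime.now(timezone.utc)),
    "dags/never_finished.py": DagFileStat(),  # discovered, never finished parsing
}

# Old: the None entry was kept, so downstream datetime comparisons could fail.
# New: entries without a finish time are simply skipped.
last_parsed = {
    fp: stat.last_finish_time for fp, stat in file_stats.items() if stat.last_finish_time
}
assert "dags/never_finished.py" not in last_parsed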