Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 253 #322

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open

Conversation

jroach-astronomer
Copy link
Member

@jroach-astronomer jroach-astronomer commented Dec 17, 2024

Pull request to address #253, which aims to provide enhanced support for callback types at the DAG, default_args, Task and TaskGroup level. There are now four different ways that callbacks can be defined via dag-factory. Below, you'll also see an example of each.

  1. By passing a string that points to a callable.
  2. By passing a file path and file name that points to a callable, similar to above.
  3. By providing a string that points to a callable, but with some parameters being provided at runtime.
  4. Using callbacks from providers (like Slack).

Passing a string that points to a callable

...
  task_groups:
    task_group_1:
      default_args:
        on_success_callback: print_hello.print_hello_from_callback
      dependencies: [task_1, task_2]
...

Passing a file path and file name that points to a callable

...
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      on_success_callback_name: print_hello_from_callback
      on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py
      dependencies: [start]
...

By providing a string that points to a callable, but with some parameters being provided at runtime

...
  on_failure_callback:
    callback: customized.callbacks.custom_callbacks.output_message
    param1: param1
    param2: param2
...

Using callbacks from providers

...
    on_failure_callback:
      callback: airflow.providers.slack.notifications.slack.send_slack_notification
      slack_conn_id: slack_conn_id
      text: |
        :red_circle: Task Failed.
        This task has failed and needs to be addressed.
        Please remediate this issue ASAP.
      channel: "#channel"
...

Unit-tests have been added and updated appropriately.

@jroach-astronomer jroach-astronomer requested a review from a team as a code owner December 17, 2024 04:21
@codecov-commenter
Copy link

codecov-commenter commented Dec 18, 2024

Codecov Report

Attention: Patch coverage is 93.10345% with 2 lines in your changes missing coverage. Please review.

Project coverage is 93.69%. Comparing base (2045b2f) to head (060b96a).

Files with missing lines Patch % Lines
dagfactory/dagbuilder.py 93.10% 2 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #322   +/-   ##
=======================================
  Coverage   93.68%   93.69%           
=======================================
  Files          10       10           
  Lines         792      777   -15     
=======================================
- Hits          742      728   -14     
+ Misses         50       49    -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@jroach-astronomer
Copy link
Member Author

@pankajastro, do you mind reviewing this, open to feedback?

cc: @tatiana

@tatiana tatiana added this to the DAG Factory 0.22.0 milestone Dec 30, 2024
@pankajkoti pankajkoti self-requested a review January 2, 2025 09:50
@pankajkoti
Copy link
Contributor

pankajkoti commented Jan 6, 2025

@jroach-astronomer please let me know once you'd like me to re-review/test the pull request. Could you please also update the PR description after the changes especially with the example for on_success_callback: $CONFIG_ROOT_DIR.print_hello.print_hello_from_callback

There seem to be a couple of conflicts, would be nice if you could resolve them too. We're planning on releasing DAG Factory on 10th Jan, so would be nice if we could get this change in by then.

@jroach-astronomer
Copy link
Member Author

@pankajkoti, resolved MC's, updated PR docs.

task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 1"
on_success_callback: customized.callbacks.custom_callbacks.output_message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently this callback is failing as I'm testing the DAG. Can we correct this callback execution by passing the missing params, please

Screenshot 2025-01-07 at 5 55 10 PM

The callbacks logs are unfortunately wrapped under Post task execution logs, so you would have to expand to check on any possible failures.

[2025-01-07, 12:22:15 UTC] {taskinstance.py:340} ▼ Post task execution logs
[2025-01-07, 12:22:15 UTC] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=example_callbacks, task_id=task_1, run_id=scheduled__2025-01-06T00:00:00+00:00, execution_date=20250106T000000, start_date=20250107T122215, end_date=20250107T122215
[2025-01-07, 12:22:15 UTC] {taskinstance.py:1563} INFO - Executing callback at index 0: output_message
[2025-01-07, 12:22:15 UTC] {taskinstance.py:1567} ERROR - Error in callback at index 0: output_message
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 1565, in _run_finished_callback
    callback(context)
TypeError: output_message() missing 2 required positional arguments: 'param1' and 'param2'
[2025-01-07, 12:22:15 UTC] {local_task_job_runner.py:266} INFO - Task exited with return code 0
[2025-01-07, 12:22:15 UTC] {local_task_job_runner.py:245} ▲▲▲ Log group end

Copy link
Contributor

@pankajkoti pankajkoti Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also what is the planned interface(change intended if any?)?

Do we intend to continue accepting direct callback functions e.g. for on_success_callback arg or they would mandatorily need to be provided as values as nested arg callable? Or we plan to support both approaches? Is it that we need a nested callable arg when params needs to be passed too? and otherwise we can directly pass callable functions?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants