From 007cf860311db2e13fd48e3bd66c3d34954b6329 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sat, 14 Dec 2024 09:59:37 -0800 Subject: [PATCH] send webhook when task or workflow run is canceled (#1374) --- skyvern/forge/agent.py | 4 ++-- skyvern/forge/sdk/routes/agent_protocol.py | 15 ++++++++++++- skyvern/forge/sdk/workflow/service.py | 26 ++++++++++++++-------- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index dd0aee5f6..5c9eb7f68 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -285,7 +285,7 @@ async def execute_step( task=task, last_step=step, api_key=api_key, - need_call_webhook=False, + need_call_webhook=True, ) return step, None, None @@ -1543,7 +1543,7 @@ async def clean_up_task( async def execute_task_webhook( self, task: Task, - last_step: Step, + last_step: Step | None, api_key: str | None, ) -> None: if not api_key: diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index a5981274c..6359f40bc 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -284,6 +284,7 @@ async def get_task( async def cancel_task( task_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), + x_api_key: Annotated[str | None, Header()] = None, ) -> None: analytics.capture("skyvern-oss-agent-task-get") task_obj = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id) @@ -292,7 +293,11 @@ async def cancel_task( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task not found {task_id}", ) - await app.agent.update_task(task_obj, status=TaskStatus.canceled) + task = await app.agent.update_task(task_obj, status=TaskStatus.canceled) + # get latest step + latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=current_org.organization_id) + # retry the webhook + await app.agent.execute_task_webhook(task=task, last_step=latest_step, api_key=x_api_key) @base_router.post("/workflows/runs/{workflow_run_id}/cancel") @@ -300,8 +305,16 @@ async def cancel_task( async def cancel_workflow_run( workflow_run_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), + x_api_key: Annotated[str | None, Header()] = None, ) -> None: + workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id=workflow_run_id) + if not workflow_run: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow run not found {workflow_run_id}", + ) await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id) + await app.WORKFLOW_SERVICE.execute_workflow_webhook(workflow_run, api_key=x_api_key) @base_router.post( diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index d8012d97b..8399d5406 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -233,7 +233,7 @@ async def execute_workflow( workflow=workflow, workflow_run=workflow_run, api_key=api_key, - need_call_webhook=False, + need_call_webhook=True, ) return workflow_run parameters = block.get_all_parameters(workflow_run_id) @@ -881,10 +881,18 @@ async def clean_up_workflow( if not need_call_webhook: return + await self.execute_workflow_webhook(workflow_run, api_key) + + async def execute_workflow_webhook( + self, + workflow_run: WorkflowRun, + api_key: str | None = None, + ) -> None: + workflow_id = workflow_run.workflow_id workflow_run_status_response = await self.build_workflow_run_status_response( - workflow_permanent_id=workflow.workflow_permanent_id, + workflow_permanent_id=workflow_run.workflow_permanent_id, workflow_run_id=workflow_run.workflow_run_id, - organization_id=workflow.organization_id, + organization_id=workflow_run.organization_id, ) LOG.info( "Built workflow run status response", @@ -894,7 +902,7 @@ async def clean_up_workflow( if not workflow_run.webhook_callback_url: LOG.warning( "Workflow has no webhook callback url. Not sending workflow response", - workflow_id=workflow.workflow_id, + workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, ) return @@ -902,7 +910,7 @@ async def clean_up_workflow( if not api_key: LOG.warning( "Request has no api key. Not sending workflow response", - workflow_id=workflow.workflow_id, + workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, ) return @@ -921,7 +929,7 @@ async def clean_up_workflow( } LOG.info( "Sending webhook run status to webhook callback url", - workflow_id=workflow.workflow_id, + workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, webhook_callback_url=workflow_run.webhook_callback_url, payload=payload, @@ -934,7 +942,7 @@ async def clean_up_workflow( if resp.status_code == 200: LOG.info( "Webhook sent successfully", - workflow_id=workflow.workflow_id, + workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, resp_code=resp.status_code, resp_text=resp.text, @@ -942,7 +950,7 @@ async def clean_up_workflow( else: LOG.info( "Webhook failed", - workflow_id=workflow.workflow_id, + workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, webhook_data=payload, resp=resp, @@ -951,7 +959,7 @@ async def clean_up_workflow( ) except Exception as e: raise FailedToSendWebhook( - workflow_id=workflow.workflow_id, + workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, ) from e