diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 75586cfb2..b4597bcbb 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -64,11 +64,11 @@ DecisiveAction, UserDefinedError, WebAction, - parse_actions, ) from skyvern.webeye.actions.caching import retrieve_action_plan from skyvern.webeye.actions.handler import ActionHandler, poll_verification_code from skyvern.webeye.actions.models import AgentStepOutput, DetailedAgentStepOutput +from skyvern.webeye.actions.parse_actions import parse_actions from skyvern.webeye.actions.responses import ActionResult from skyvern.webeye.browser_factory import BrowserState from skyvern.webeye.scraper.scraper import ElementTreeFormat, ScrapedPage, scrape_website diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index db10d70ac..75d210903 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -325,6 +325,25 @@ async def get_task_actions(self, task_id: str, organization_id: str | None = Non LOG.error("UnexpectedError", exc_info=True) raise + async def get_tasks_actions(self, task_ids: list[str], organization_id: str | None = None) -> list[Action]: + try: + async with self.Session() as session: + query = ( + select(ActionModel) + .filter(ActionModel.organization_id == organization_id) + .filter(ActionModel.task_id.in_(task_ids)) + .order_by(ActionModel.created_at) + ) + actions = (await session.scalars(query)).all() + return [Action.model_validate(action) for action in actions] + + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + async def get_first_step(self, task_id: str, organization_id: str | None = None) -> Step | None: try: async with self.Session() as session: @@ -1789,6 +1808,21 @@ async def get_observer_thought( return ObserverThought.model_validate(observer_thought) return None + async def get_observer_cruise_thoughts( + self, + observer_cruise_id: str, + organization_id: str | None = None, + ) -> list[ObserverThought]: + async with self.Session() as session: + observer_thoughts = ( + await session.scalars( + select(ObserverThoughtModel) + .filter_by(observer_cruise_id=observer_cruise_id) + .filter_by(organization_id=organization_id) + ) + ).all() + return [ObserverThought.model_validate(thought) for thought in observer_thoughts] + async def create_observer_cruise( self, workflow_run_id: str | None = None, diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 444fba97a..6ebf44f8c 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -525,6 +525,7 @@ class ObserverCruiseModel(Base): class ObserverThoughtModel(Base): __tablename__ = "observer_thoughts" + __table_args__ = (Index("observer_cruise_index", "organization_id", "observer_cruise_id"),) observer_thought_id = Column(String, primary_key=True, default=generate_observer_thought_id) organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=True) diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 6359f40bc..2055cc9c3 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -51,12 +51,14 @@ TaskResponse, TaskStatus, ) +from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunEvent, WorkflowRunEventType from skyvern.forge.sdk.services import org_auth_service from skyvern.forge.sdk.workflow.exceptions import ( FailedToCreateWorkflow, FailedToUpdateWorkflow, WorkflowParameterMissingRequiredValue, ) +from skyvern.forge.sdk.workflow.models.block import BlockType from skyvern.forge.sdk.workflow.models.workflow import ( RunWorkflowResponse, Workflow, @@ -643,6 +645,81 @@ async def get_workflow_run( ) +@base_router.get( + "/workflows/{workflow_id}/runs/{workflow_run_id}/events", +) +@base_router.get( + "/workflows/{workflow_id}/runs/{workflow_run_id}/events/", +) +async def get_workflow_run_events( + workflow_id: str, + workflow_run_id: str, + observer_cruise_id: str | None = None, + page: int = Query(1, ge=1), + page_size: int = Query(20, ge=1), + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> list[WorkflowRunEvent]: + # get all the tasks for the workflow run + tasks = await app.DATABASE.get_tasks( + page, + page_size, + workflow_run_id=workflow_run_id, + organization_id=current_org.organization_id, + ) + workflow_run_events = [ + WorkflowRunEvent( + type=WorkflowRunEventType.block, + block=WorkflowRunBlock( + workflow_run_id=workflow_run_id, + block_type=BlockType.TASK, + label=task.title, + title=task.title, + url=task.url, + status=task.status, + navigation_goal=task.navigation_goal, + data_extraction_goal=task.data_extraction_goal, + data_schema=task.extracted_information_schema, + terminate_criterion=task.terminate_criterion, + complete_criterion=task.complete_criterion, + created_at=task.created_at, + modified_at=task.modified_at, + ), + created_at=task.created_at, + modified_at=task.modified_at, + ) + for task in tasks + ] + # get all the actions for all the tasks + actions = await app.DATABASE.get_tasks_actions( + [task.task_id for task in tasks], organization_id=current_org.organization_id + ) + for action in actions: + workflow_run_events.append( + WorkflowRunEvent( + type=WorkflowRunEventType.action, + action=action, + created_at=action.created_at or datetime.datetime.utcnow(), + modified_at=action.modified_at or datetime.datetime.utcnow(), + ) + ) + # get all the thoughts for the cruise + if observer_cruise_id: + thoughts = await app.DATABASE.get_observer_cruise_thoughts( + observer_cruise_id, organization_id=current_org.organization_id + ) + for thought in thoughts: + workflow_run_events.append( + WorkflowRunEvent( + type=WorkflowRunEventType.thought, + thought=thought, + created_at=thought.created_at, + modified_at=thought.modified_at, + ) + ) + workflow_run_events.sort(key=lambda x: x.created_at, reverse=True) + return workflow_run_events + + @base_router.get( "/workflows/runs/{workflow_run_id}", response_model=WorkflowRunStatusResponse, diff --git a/skyvern/forge/sdk/schemas/workflow_runs.py b/skyvern/forge/sdk/schemas/workflow_runs.py new file mode 100644 index 000000000..2eb67af30 --- /dev/null +++ b/skyvern/forge/sdk/schemas/workflow_runs.py @@ -0,0 +1,45 @@ +from datetime import datetime +from enum import StrEnum +from typing import Any + +from pydantic import BaseModel + +from skyvern.forge.sdk.schemas.observers import ObserverThought +from skyvern.forge.sdk.workflow.models.block import BlockType +from skyvern.webeye.actions.actions import Action + + +class WorkflowRunBlock(BaseModel): + workflow_run_block_id: str = "placeholder" + workflow_run_id: str + parent_workflow_run_block_id: str | None = None + block_type: BlockType + label: str | None = None + title: str | None = None + status: str | None = None + output: dict | list | str | None = None + continue_on_failure: bool = False + task_id: str | None = None + url: str | None = None + navigation_goal: str | None = None + data_extraction_goal: str | None = None + data_schema: dict[str, Any] | list | str | None = None + terminate_criterion: str | None = None + complete_criterion: str | None = None + created_at: datetime + modified_at: datetime + + +class WorkflowRunEventType(StrEnum): + action = "action" + thought = "thought" + block = "block" + + +class WorkflowRunEvent(BaseModel): + type: WorkflowRunEventType + action: Action | None = None + thought: ObserverThought | None = None + block: WorkflowRunBlock | None = None + created_at: datetime + modified_at: datetime diff --git a/skyvern/webeye/actions/actions.py b/skyvern/webeye/actions/actions.py index c63431c1e..a09fde9fa 100644 --- a/skyvern/webeye/actions/actions.py +++ b/skyvern/webeye/actions/actions.py @@ -1,13 +1,10 @@ +from datetime import datetime from enum import StrEnum -from typing import Annotated, Any, Dict, Type, TypeVar +from typing import Annotated, Any, Type, TypeVar import structlog from litellm import ConfigDict -from pydantic import BaseModel, Field, ValidationError - -from skyvern.exceptions import UnsupportedActionType -from skyvern.forge.sdk.schemas.tasks import Task -from skyvern.webeye.scraper.scraper import ScrapedPage +from pydantic import BaseModel, Field LOG = structlog.get_logger() T = TypeVar("T", bound="Action") @@ -119,6 +116,9 @@ class Action(BaseModel): option: SelectOption | None = None is_checked: bool | None = None + created_at: datetime | None = None + modified_at: datetime | None = None + @classmethod def validate(cls: Type[T], value: Any) -> T: if isinstance(value, dict): @@ -239,176 +239,6 @@ class CompleteAction(DecisiveAction): data_extraction_goal: str | None = None -def parse_action(action: Dict[str, Any], scraped_page: ScrapedPage, data_extraction_goal: str | None = None) -> Action: - if "id" in action: - element_id = action["id"] - elif "element_id" in action: - element_id = action["element_id"] - else: - element_id = None - - skyvern_element_hash = scraped_page.id_to_element_hash.get(element_id) if element_id else None - skyvern_element_data = scraped_page.id_to_element_dict.get(element_id) if element_id else None - - reasoning = action["reasoning"] if "reasoning" in action else None - confidence_float = action["confidence_float"] if "confidence_float" in action else None - # TODO: currently action intention and response are only used for Q&A actions, like input_text - # When we start supporting click action, intention will be the reasoning for the click action (why take the action) - intention = action["user_detail_query"] if "user_detail_query" in action else None - response = action["user_detail_answer"] if "user_detail_answer" in action else None - - base_action_dict = { - "element_id": element_id, - "skyvern_element_hash": skyvern_element_hash, - "skyvern_element_data": skyvern_element_data, - "reasoning": reasoning, - "confidence_float": confidence_float, - "intention": intention, - "response": response, - } - - if "action_type" not in action or action["action_type"] is None: - return NullAction(**base_action_dict) - - # `.upper()` handles the case where the LLM returns a lowercase action type (e.g. "click" instead of "CLICK") - action_type = ActionType[action["action_type"].upper()] - - if not action_type.is_web_action(): - # LLM sometimes hallucinates and returns element id for non-web actions such as WAIT, TERMINATE, COMPLETE etc. - # That can sometimes cause cached action plan to be invalidated. This way we're making sure the element id is not - # set for non-web actions. - base_action_dict["element_id"] = None - - if action_type == ActionType.TERMINATE: - return TerminateAction(**base_action_dict, errors=action["errors"] if "errors" in action else []) - - if action_type == ActionType.CLICK: - file_url = action["file_url"] if "file_url" in action else None - return ClickAction(**base_action_dict, file_url=file_url, download=action.get("download", False)) - - if action_type == ActionType.INPUT_TEXT: - return InputTextAction(**base_action_dict, text=action["text"]) - - if action_type == ActionType.UPLOAD_FILE: - # TODO: see if the element is a file input element. if it's not, convert this action into a click action - return UploadFileAction( - **base_action_dict, - file_url=action["file_url"], - ) - - # This action is not used in the current implementation. Click actions are used instead. - if action_type == ActionType.DOWNLOAD_FILE: - return DownloadFileAction(**base_action_dict, file_name=action["file_name"]) - - if action_type == ActionType.SELECT_OPTION: - option = action["option"] - if option is None: - raise ValueError("SelectOptionAction requires an 'option' field") - label = option.get("label") - value = option.get("value") - index = option.get("index") - if label is None and value is None and index is None: - raise ValueError("At least one of 'label', 'value', or 'index' must be provided for a SelectOption") - return SelectOptionAction( - **base_action_dict, - option=SelectOption( - label=label, - value=value, - index=index, - ), - ) - - if action_type == ActionType.CHECKBOX: - return CheckboxAction( - **base_action_dict, - is_checked=action["is_checked"], - ) - - if action_type == ActionType.WAIT: - return WaitAction(**base_action_dict) - - if action_type == ActionType.COMPLETE: - return CompleteAction( - **base_action_dict, - data_extraction_goal=data_extraction_goal, - errors=action["errors"] if "errors" in action else [], - ) - - if action_type == "null": - return NullAction(**base_action_dict) - - if action_type == ActionType.SOLVE_CAPTCHA: - return SolveCaptchaAction(**base_action_dict) - - raise UnsupportedActionType(action_type=action_type) - - -def parse_actions( - task: Task, step_id: str, step_order: int, scraped_page: ScrapedPage, json_response: list[Dict[str, Any]] -) -> list[Action]: - actions: list[Action] = [] - for idx, action in enumerate(json_response): - try: - action_instance = parse_action( - action=action, scraped_page=scraped_page, data_extraction_goal=task.data_extraction_goal - ) - action_instance.organization_id = task.organization_id - action_instance.workflow_run_id = task.workflow_run_id - action_instance.task_id = task.task_id - action_instance.step_id = step_id - action_instance.step_order = step_order - action_instance.action_order = idx - if isinstance(action_instance, TerminateAction): - LOG.warning( - "Agent decided to terminate", - task_id=task.task_id, - llm_response=json_response, - reasoning=action_instance.reasoning, - actions=actions, - ) - actions.append(action_instance) - - except UnsupportedActionType: - LOG.error( - "Unsupported action type when parsing actions", - task_id=task.task_id, - raw_action=action, - exc_info=True, - ) - except (ValidationError, ValueError): - LOG.warning( - "Invalid action", - task_id=task.task_id, - raw_action=action, - exc_info=True, - ) - except Exception: - LOG.error( - "Failed to marshal action", - task_id=task.task_id, - raw_action=action, - exc_info=True, - ) - - ############################ This part of code might not be needed ############################ - # Reason #1. validation can be done in action handler but not in parser - # Reason #2. no need to validate whether the element_id has a hash. - # If there's no hash, we can fall back to normal operation - all_element_ids = [action.element_id for action in actions if action.element_id] - missing_element_ids = [ - element_id for element_id in all_element_ids if element_id not in scraped_page.id_to_element_hash - ] - if missing_element_ids: - LOG.warning( - "Missing elements in scraped page", - task_id=task.task_id, - missing_element_ids=missing_element_ids, - all_element_ids=all_element_ids, - ) - ############################ This part of code might not be needed ############################ - return actions - - class ScrapeResult(BaseModel): """ Scraped response from a webpage, including: diff --git a/skyvern/webeye/actions/parse_actions.py b/skyvern/webeye/actions/parse_actions.py new file mode 100644 index 000000000..37c953e55 --- /dev/null +++ b/skyvern/webeye/actions/parse_actions.py @@ -0,0 +1,196 @@ +from typing import Any, Dict + +import structlog +from pydantic import ValidationError + +from skyvern.exceptions import UnsupportedActionType +from skyvern.forge.sdk.schemas.tasks import Task +from skyvern.webeye.actions.actions import ( + Action, + ActionType, + CheckboxAction, + ClickAction, + CompleteAction, + DownloadFileAction, + InputTextAction, + NullAction, + SelectOption, + SelectOptionAction, + SolveCaptchaAction, + TerminateAction, + UploadFileAction, + WaitAction, +) +from skyvern.webeye.scraper.scraper import ScrapedPage + +LOG = structlog.get_logger() + + +def parse_action(action: Dict[str, Any], scraped_page: ScrapedPage, data_extraction_goal: str | None = None) -> Action: + if "id" in action: + element_id = action["id"] + elif "element_id" in action: + element_id = action["element_id"] + else: + element_id = None + + skyvern_element_hash = scraped_page.id_to_element_hash.get(element_id) if element_id else None + skyvern_element_data = scraped_page.id_to_element_dict.get(element_id) if element_id else None + + reasoning = action["reasoning"] if "reasoning" in action else None + confidence_float = action["confidence_float"] if "confidence_float" in action else None + # TODO: currently action intention and response are only used for Q&A actions, like input_text + # When we start supporting click action, intention will be the reasoning for the click action (why take the action) + intention = action["user_detail_query"] if "user_detail_query" in action else None + response = action["user_detail_answer"] if "user_detail_answer" in action else None + + base_action_dict = { + "element_id": element_id, + "skyvern_element_hash": skyvern_element_hash, + "skyvern_element_data": skyvern_element_data, + "reasoning": reasoning, + "confidence_float": confidence_float, + "intention": intention, + "response": response, + } + + if "action_type" not in action or action["action_type"] is None: + return NullAction(**base_action_dict) + + # `.upper()` handles the case where the LLM returns a lowercase action type (e.g. "click" instead of "CLICK") + action_type = ActionType[action["action_type"].upper()] + + if not action_type.is_web_action(): + # LLM sometimes hallucinates and returns element id for non-web actions such as WAIT, TERMINATE, COMPLETE etc. + # That can sometimes cause cached action plan to be invalidated. This way we're making sure the element id is not + # set for non-web actions. + base_action_dict["element_id"] = None + + if action_type == ActionType.TERMINATE: + return TerminateAction(**base_action_dict, errors=action["errors"] if "errors" in action else []) + + if action_type == ActionType.CLICK: + file_url = action["file_url"] if "file_url" in action else None + return ClickAction(**base_action_dict, file_url=file_url, download=action.get("download", False)) + + if action_type == ActionType.INPUT_TEXT: + return InputTextAction(**base_action_dict, text=action["text"]) + + if action_type == ActionType.UPLOAD_FILE: + # TODO: see if the element is a file input element. if it's not, convert this action into a click action + return UploadFileAction( + **base_action_dict, + file_url=action["file_url"], + ) + + # This action is not used in the current implementation. Click actions are used instead. + if action_type == ActionType.DOWNLOAD_FILE: + return DownloadFileAction(**base_action_dict, file_name=action["file_name"]) + + if action_type == ActionType.SELECT_OPTION: + option = action["option"] + if option is None: + raise ValueError("SelectOptionAction requires an 'option' field") + label = option.get("label") + value = option.get("value") + index = option.get("index") + if label is None and value is None and index is None: + raise ValueError("At least one of 'label', 'value', or 'index' must be provided for a SelectOption") + return SelectOptionAction( + **base_action_dict, + option=SelectOption( + label=label, + value=value, + index=index, + ), + ) + + if action_type == ActionType.CHECKBOX: + return CheckboxAction( + **base_action_dict, + is_checked=action["is_checked"], + ) + + if action_type == ActionType.WAIT: + return WaitAction(**base_action_dict) + + if action_type == ActionType.COMPLETE: + return CompleteAction( + **base_action_dict, + data_extraction_goal=data_extraction_goal, + errors=action["errors"] if "errors" in action else [], + ) + + if action_type == "null": + return NullAction(**base_action_dict) + + if action_type == ActionType.SOLVE_CAPTCHA: + return SolveCaptchaAction(**base_action_dict) + + raise UnsupportedActionType(action_type=action_type) + + +def parse_actions( + task: Task, step_id: str, step_order: int, scraped_page: ScrapedPage, json_response: list[Dict[str, Any]] +) -> list[Action]: + actions: list[Action] = [] + for idx, action in enumerate(json_response): + try: + action_instance = parse_action( + action=action, scraped_page=scraped_page, data_extraction_goal=task.data_extraction_goal + ) + action_instance.organization_id = task.organization_id + action_instance.workflow_run_id = task.workflow_run_id + action_instance.task_id = task.task_id + action_instance.step_id = step_id + action_instance.step_order = step_order + action_instance.action_order = idx + if isinstance(action_instance, TerminateAction): + LOG.warning( + "Agent decided to terminate", + task_id=task.task_id, + llm_response=json_response, + reasoning=action_instance.reasoning, + actions=actions, + ) + actions.append(action_instance) + + except UnsupportedActionType: + LOG.error( + "Unsupported action type when parsing actions", + task_id=task.task_id, + raw_action=action, + exc_info=True, + ) + except (ValidationError, ValueError): + LOG.warning( + "Invalid action", + task_id=task.task_id, + raw_action=action, + exc_info=True, + ) + except Exception: + LOG.error( + "Failed to marshal action", + task_id=task.task_id, + raw_action=action, + exc_info=True, + ) + + ############################ This part of code might not be needed ############################ + # Reason #1. validation can be done in action handler but not in parser + # Reason #2. no need to validate whether the element_id has a hash. + # If there's no hash, we can fall back to normal operation + all_element_ids = [action.element_id for action in actions if action.element_id] + missing_element_ids = [ + element_id for element_id in all_element_ids if element_id not in scraped_page.id_to_element_hash + ] + if missing_element_ids: + LOG.warning( + "Missing elements in scraped page", + task_id=task.task_id, + missing_element_ids=missing_element_ids, + all_element_ids=all_element_ids, + ) + ############################ This part of code might not be needed ############################ + return actions