From 8b75586fb1082322e8be427771505fe66cb6d41e Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 20 Dec 2024 07:40:32 -0800 Subject: [PATCH] workflow run block db + schema transformation code (#1418) --- skyvern/forge/sdk/db/client.py | 64 ++++++++++++++++++++++ skyvern/forge/sdk/db/utils.py | 31 +++++++++++ skyvern/forge/sdk/schemas/workflow_runs.py | 1 - skyvern/forge/sdk/workflow/models/block.py | 1 + 4 files changed, 96 insertions(+), 1 deletion(-) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 151e89f61..8f98b9e0d 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -30,6 +30,7 @@ TOTPCodeModel, WorkflowModel, WorkflowParameterModel, + WorkflowRunBlockModel, WorkflowRunModel, WorkflowRunOutputParameterModel, WorkflowRunParameterModel, @@ -48,6 +49,7 @@ convert_to_workflow, convert_to_workflow_parameter, convert_to_workflow_run, + convert_to_workflow_run_block, convert_to_workflow_run_output_parameter, convert_to_workflow_run_parameter, ) @@ -58,6 +60,8 @@ from skyvern.forge.sdk.schemas.task_generations import TaskGeneration from skyvern.forge.sdk.schemas.tasks import OrderBy, ProxyLocation, SortDirection, Task, TaskStatus from skyvern.forge.sdk.schemas.totp_codes import TOTPCode +from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock +from skyvern.forge.sdk.workflow.models.block import BlockStatus, BlockType from skyvern.forge.sdk.workflow.models.parameter import ( AWSSecretParameter, BitwardenCreditCardDataParameter, @@ -1984,3 +1988,63 @@ async def update_observer_cruise( await session.refresh(observer_cruise) return ObserverCruise.model_validate(observer_cruise) raise NotFoundError(f"ObserverCruise {observer_cruise_id} not found") + + async def create_workflow_run_block( + self, + workflow_run_id: str, + parent_workflow_run_block_id: str | None = None, + organization_id: str | None = None, + task_id: str | None = None, + label: str | None = None, + block_type: BlockType | None = None, + status: BlockStatus = BlockStatus.running, + output: dict | list | str | None = None, + continue_on_failure: bool = False, + ) -> WorkflowRunBlock: + async with self.Session() as session: + new_workflow_run_block = WorkflowRunBlockModel( + workflow_run_id=workflow_run_id, + parent_workflow_run_block_id=parent_workflow_run_block_id, + organization_id=organization_id, + task_id=task_id, + label=label, + block_type=block_type, + status=status, + output=output, + continue_on_failure=continue_on_failure, + ) + session.add(new_workflow_run_block) + await session.commit() + await session.refresh(new_workflow_run_block) + + task = None + if task_id: + task = await self.get_task(task_id, organization_id=organization_id) + return convert_to_workflow_run_block(new_workflow_run_block, task=task) + + async def update_workflow_run_block( + self, + workflow_run_block_id: str, + status: BlockStatus | None = None, + output: dict | list | str | None = None, + ) -> WorkflowRunBlock: + async with self.Session() as session: + workflow_run_block = ( + await session.scalars( + select(WorkflowRunBlockModel).filter_by(workflow_run_block_id=workflow_run_block_id) + ) + ).first() + if workflow_run_block: + if status: + workflow_run_block.status = status + if output: + workflow_run_block.output = output + await session.commit() + await session.refresh(workflow_run_block) + else: + raise NotFoundError(f"WorkflowRunBlock {workflow_run_block_id} not found") + task = None + task_id = workflow_run_block.task_id + if task_id: + task = await self.get_task(task_id, organization_id=workflow_run_block.organization_id) + return convert_to_workflow_run_block(workflow_run_block, task=task) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index e34115c7b..d406b43be 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -18,6 +18,7 @@ TaskModel, WorkflowModel, WorkflowParameterModel, + WorkflowRunBlockModel, WorkflowRunModel, WorkflowRunOutputParameterModel, WorkflowRunParameterModel, @@ -25,6 +26,8 @@ from skyvern.forge.sdk.models import Step, StepStatus from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAuthToken from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task, TaskStatus +from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock +from skyvern.forge.sdk.workflow.models.block import BlockStatus, BlockType from skyvern.forge.sdk.workflow.models.parameter import ( AWSSecretParameter, BitwardenLoginCredentialParameter, @@ -368,3 +371,31 @@ def convert_to_workflow_run_parameter( value=workflow_parameter.workflow_parameter_type.convert_value(workflow_run_parameter_model.value), created_at=workflow_run_parameter_model.created_at, ) + + +def convert_to_workflow_run_block( + workflow_run_block_model: WorkflowRunBlockModel, + task: Task | None = None, +) -> WorkflowRunBlock: + block = WorkflowRunBlock( + workflow_run_block_id=workflow_run_block_model.workflow_run_block_id, + workflow_run_id=workflow_run_block_model.workflow_run_id, + parent_workflow_run_block_id=workflow_run_block_model.parent_workflow_run_block_id, + block_type=BlockType(workflow_run_block_model.block_type), + label=workflow_run_block_model.label, + status=BlockStatus(workflow_run_block_model.status), + output=workflow_run_block_model.output, + continue_on_failure=workflow_run_block_model.continue_on_failure, + task_id=workflow_run_block_model.task_id, + created_at=workflow_run_block_model.created_at, + modified_at=workflow_run_block_model.modified_at, + ) + if task: + block.url = task.url + block.navigation_goal = task.navigation_goal + block.data_extraction_goal = task.data_extraction_goal + block.data_schema = task.extracted_information_schema + block.terminate_criterion = task.terminate_criterion + block.complete_criterion = task.complete_criterion + + return block diff --git a/skyvern/forge/sdk/schemas/workflow_runs.py b/skyvern/forge/sdk/schemas/workflow_runs.py index 2eb67af30..d291065a1 100644 --- a/skyvern/forge/sdk/schemas/workflow_runs.py +++ b/skyvern/forge/sdk/schemas/workflow_runs.py @@ -15,7 +15,6 @@ class WorkflowRunBlock(BaseModel): parent_workflow_run_block_id: str | None = None block_type: BlockType label: str | None = None - title: str | None = None status: str | None = None output: dict | list | str | None = None continue_on_failure: bool = False diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 33cfbc8db..8b4633e16 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -84,6 +84,7 @@ class BlockType(StrEnum): class BlockStatus(StrEnum): + running = "running" completed = "completed" failed = "failed" terminated = "terminated"