Skip to content

Commit

Permalink
workflow run block db + schema transformation code (#1418)
Browse files Browse the repository at this point in the history
  • Loading branch information
wintonzheng authored Dec 20, 2024
1 parent a12776e commit 8b75586
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 1 deletion.
64 changes: 64 additions & 0 deletions skyvern/forge/sdk/db/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
TOTPCodeModel,
WorkflowModel,
WorkflowParameterModel,
WorkflowRunBlockModel,
WorkflowRunModel,
WorkflowRunOutputParameterModel,
WorkflowRunParameterModel,
Expand All @@ -48,6 +49,7 @@
convert_to_workflow,
convert_to_workflow_parameter,
convert_to_workflow_run,
convert_to_workflow_run_block,
convert_to_workflow_run_output_parameter,
convert_to_workflow_run_parameter,
)
Expand All @@ -58,6 +60,8 @@
from skyvern.forge.sdk.schemas.task_generations import TaskGeneration
from skyvern.forge.sdk.schemas.tasks import OrderBy, ProxyLocation, SortDirection, Task, TaskStatus
from skyvern.forge.sdk.schemas.totp_codes import TOTPCode
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
from skyvern.forge.sdk.workflow.models.block import BlockStatus, BlockType
from skyvern.forge.sdk.workflow.models.parameter import (
AWSSecretParameter,
BitwardenCreditCardDataParameter,
Expand Down Expand Up @@ -1984,3 +1988,63 @@ async def update_observer_cruise(
await session.refresh(observer_cruise)
return ObserverCruise.model_validate(observer_cruise)
raise NotFoundError(f"ObserverCruise {observer_cruise_id} not found")

async def create_workflow_run_block(
self,
workflow_run_id: str,
parent_workflow_run_block_id: str | None = None,
organization_id: str | None = None,
task_id: str | None = None,
label: str | None = None,
block_type: BlockType | None = None,
status: BlockStatus = BlockStatus.running,
output: dict | list | str | None = None,
continue_on_failure: bool = False,
) -> WorkflowRunBlock:
async with self.Session() as session:
new_workflow_run_block = WorkflowRunBlockModel(
workflow_run_id=workflow_run_id,
parent_workflow_run_block_id=parent_workflow_run_block_id,
organization_id=organization_id,
task_id=task_id,
label=label,
block_type=block_type,
status=status,
output=output,
continue_on_failure=continue_on_failure,
)
session.add(new_workflow_run_block)
await session.commit()
await session.refresh(new_workflow_run_block)

task = None
if task_id:
task = await self.get_task(task_id, organization_id=organization_id)
return convert_to_workflow_run_block(new_workflow_run_block, task=task)

async def update_workflow_run_block(
self,
workflow_run_block_id: str,
status: BlockStatus | None = None,
output: dict | list | str | None = None,
) -> WorkflowRunBlock:
async with self.Session() as session:
workflow_run_block = (
await session.scalars(
select(WorkflowRunBlockModel).filter_by(workflow_run_block_id=workflow_run_block_id)
)
).first()
if workflow_run_block:
if status:
workflow_run_block.status = status
if output:
workflow_run_block.output = output
await session.commit()
await session.refresh(workflow_run_block)
else:
raise NotFoundError(f"WorkflowRunBlock {workflow_run_block_id} not found")
task = None
task_id = workflow_run_block.task_id
if task_id:
task = await self.get_task(task_id, organization_id=workflow_run_block.organization_id)
return convert_to_workflow_run_block(workflow_run_block, task=task)
31 changes: 31 additions & 0 deletions skyvern/forge/sdk/db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
TaskModel,
WorkflowModel,
WorkflowParameterModel,
WorkflowRunBlockModel,
WorkflowRunModel,
WorkflowRunOutputParameterModel,
WorkflowRunParameterModel,
)
from skyvern.forge.sdk.models import Step, StepStatus
from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAuthToken
from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task, TaskStatus
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
from skyvern.forge.sdk.workflow.models.block import BlockStatus, BlockType
from skyvern.forge.sdk.workflow.models.parameter import (
AWSSecretParameter,
BitwardenLoginCredentialParameter,
Expand Down Expand Up @@ -368,3 +371,31 @@ def convert_to_workflow_run_parameter(
value=workflow_parameter.workflow_parameter_type.convert_value(workflow_run_parameter_model.value),
created_at=workflow_run_parameter_model.created_at,
)


def convert_to_workflow_run_block(
workflow_run_block_model: WorkflowRunBlockModel,
task: Task | None = None,
) -> WorkflowRunBlock:
block = WorkflowRunBlock(
workflow_run_block_id=workflow_run_block_model.workflow_run_block_id,
workflow_run_id=workflow_run_block_model.workflow_run_id,
parent_workflow_run_block_id=workflow_run_block_model.parent_workflow_run_block_id,
block_type=BlockType(workflow_run_block_model.block_type),
label=workflow_run_block_model.label,
status=BlockStatus(workflow_run_block_model.status),
output=workflow_run_block_model.output,
continue_on_failure=workflow_run_block_model.continue_on_failure,
task_id=workflow_run_block_model.task_id,
created_at=workflow_run_block_model.created_at,
modified_at=workflow_run_block_model.modified_at,
)
if task:
block.url = task.url
block.navigation_goal = task.navigation_goal
block.data_extraction_goal = task.data_extraction_goal
block.data_schema = task.extracted_information_schema
block.terminate_criterion = task.terminate_criterion
block.complete_criterion = task.complete_criterion

return block
1 change: 0 additions & 1 deletion skyvern/forge/sdk/schemas/workflow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class WorkflowRunBlock(BaseModel):
parent_workflow_run_block_id: str | None = None
block_type: BlockType
label: str | None = None
title: str | None = None
status: str | None = None
output: dict | list | str | None = None
continue_on_failure: bool = False
Expand Down
1 change: 1 addition & 0 deletions skyvern/forge/sdk/workflow/models/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class BlockType(StrEnum):


class BlockStatus(StrEnum):
running = "running"
completed = "completed"
failed = "failed"
terminated = "terminated"
Expand Down

0 comments on commit 8b75586

Please sign in to comment.