diff --git a/skyvern-frontend/src/api/types.ts b/skyvern-frontend/src/api/types.ts index d9c00a614..3eda171d8 100644 --- a/skyvern-frontend/src/api/types.ts +++ b/skyvern-frontend/src/api/types.ts @@ -10,6 +10,8 @@ export const ArtifactType = { LLMPrompt: "llm_prompt", LLMRequest: "llm_request", HTMLScrape: "html_scrape", + SkyvernLog: "skyvern_log", + SkyvernLogRaw: "skyvern_log_raw", } as const; export type ArtifactType = (typeof ArtifactType)[keyof typeof ArtifactType]; diff --git a/skyvern-frontend/src/routes/tasks/detail/ScrollableActionList.tsx b/skyvern-frontend/src/routes/tasks/detail/ScrollableActionList.tsx index dc66828e2..e4bd3f83a 100644 --- a/skyvern-frontend/src/routes/tasks/detail/ScrollableActionList.tsx +++ b/skyvern-frontend/src/routes/tasks/detail/ScrollableActionList.tsx @@ -11,7 +11,6 @@ import { } from "@radix-ui/react-icons"; import { useQueryClient } from "@tanstack/react-query"; import { ReactNode, useRef } from "react"; -import { useParams } from "react-router-dom"; import { ActionTypePill } from "./ActionTypePill"; type Props = { @@ -33,7 +32,6 @@ function ScrollableActionList({ showStreamOption, taskDetails, }: Props) { - const { taskId } = useParams(); const queryClient = useQueryClient(); const credentialGetter = useCredentialGetter(); const refs = useRef>( @@ -65,11 +63,11 @@ function ScrollableActionList({ onClick={() => onActiveIndexChange(i)} onMouseEnter={() => { queryClient.prefetchQuery({ - queryKey: ["task", taskId, "steps", action.stepId, "artifacts"], + queryKey: ["step", action.stepId, "artifacts"], queryFn: async () => { const client = await getClient(credentialGetter); return client - .get(`/tasks/${taskId}/steps/${action.stepId}/artifacts`) + .get(`/step/${action.stepId}/artifacts`) .then((response) => response.data); }, }); diff --git a/skyvern-frontend/src/routes/tasks/detail/StepArtifacts.tsx b/skyvern-frontend/src/routes/tasks/detail/StepArtifacts.tsx index 815154013..b09be32ba 100644 --- a/skyvern-frontend/src/routes/tasks/detail/StepArtifacts.tsx +++ b/skyvern-frontend/src/routes/tasks/detail/StepArtifacts.tsx @@ -7,7 +7,7 @@ import { import { StatusBadge } from "@/components/StatusBadge"; import { Label } from "@/components/ui/label"; import { useQuery } from "@tanstack/react-query"; -import { useParams, useSearchParams } from "react-router-dom"; +import { useSearchParams } from "react-router-dom"; import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; import { ZoomableImage } from "@/components/ZoomableImage"; import { Skeleton } from "@/components/ui/skeleton"; @@ -25,7 +25,6 @@ type Props = { function StepArtifacts({ id, stepProps }: Props) { const [searchParams, setSearchParams] = useSearchParams(); const artifact = searchParams.get("artifact") ?? "info"; - const { taskId } = useParams(); const credentialGetter = useCredentialGetter(); const { data: artifacts, @@ -33,11 +32,11 @@ function StepArtifacts({ id, stepProps }: Props) { isError, error, } = useQuery>({ - queryKey: ["task", taskId, "steps", id, "artifacts"], + queryKey: ["step", id, "artifacts"], queryFn: async () => { const client = await getClient(credentialGetter); return client - .get(`/tasks/${taskId}/steps/${id}/artifacts`) + .get(`/step/${id}/artifacts`) .then((response) => response.data); }, }); @@ -79,6 +78,10 @@ function StepArtifacts({ id, stepProps }: Props) { (artifact) => artifact.artifact_type === ArtifactType.HTMLScrape, ); + const skyvernLog = artifacts?.filter( + (artifact) => artifact.artifact_type === ArtifactType.SkyvernLog, + ); + return ( Action List HTML (Raw) LLM Request (Raw) + Skyvern Log
@@ -209,6 +213,9 @@ function StepArtifacts({ id, stepProps }: Props) { {llmRequest ? : null} + + {skyvernLog ? : null} + ); } diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 75586cfb2..707c060cd 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -50,6 +50,7 @@ from skyvern.forge.sdk.core.security import generate_skyvern_signature from skyvern.forge.sdk.core.validators import prepend_scheme_and_validate_url from skyvern.forge.sdk.db.enums import TaskType +from skyvern.forge.sdk.log_artifacts import save_step_logs, save_task_logs from skyvern.forge.sdk.models import Step, StepStatus from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus @@ -1783,6 +1784,9 @@ async def update_step( step_id=step.step_id, diff=update_comparison, ) + + await save_step_logs(step.step_id) + return await app.DATABASE.update_step( task_id=step.task_id, step_id=step.step_id, @@ -1815,6 +1819,7 @@ async def update_task( for key, value in updates.items() if getattr(task, key) != value } + await save_task_logs(task.task_id) LOG.info("Updating task in db", task_id=task.task_id, diff=update_comparison) return await app.DATABASE.update_task( task.task_id, diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py index bbe2af54c..68ed10e56 100644 --- a/skyvern/forge/sdk/artifact/manager.py +++ b/skyvern/forge/sdk/artifact/manager.py @@ -1,12 +1,11 @@ import asyncio import time from collections import defaultdict -from typing import Literal import structlog from skyvern.forge import app -from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType +from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType from skyvern.forge.sdk.db.id import generate_artifact_id from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought @@ -82,6 +81,35 @@ async def create_artifact( path=path, ) + async def create_log_artifact( + self, + log_entity_type: LogEntityType, + log_entity_id: str, + artifact_type: ArtifactType, + step_id: str | None = None, + task_id: str | None = None, + workflow_run_id: str | None = None, + workflow_run_block_id: str | None = None, + organization_id: str | None = None, + data: bytes | None = None, + path: str | None = None, + ) -> str: + artifact_id = generate_artifact_id() + uri = app.STORAGE.build_log_uri(log_entity_type, log_entity_id, artifact_type) + return await self._create_artifact( + aio_task_primary_key=log_entity_id, + artifact_id=artifact_id, + artifact_type=artifact_type, + uri=uri, + step_id=step_id, + task_id=task_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + data=data, + path=path, + ) + async def create_observer_thought_artifact( self, observer_thought: ObserverThought, @@ -174,7 +202,7 @@ async def update_artifact_data( artifact_id: str | None, organization_id: str | None, data: bytes, - primary_key: Literal["task_id", "observer_thought_id", "observer_cruise_id"] = "task_id", + primary_key: str = "task_id", ) -> None: if not artifact_id or not organization_id: return None @@ -183,18 +211,10 @@ async def update_artifact_data( return # Fire and forget aio_task = asyncio.create_task(app.STORAGE.store_artifact(artifact, data)) - if primary_key == "task_id": - if not artifact.task_id: - raise ValueError("Task ID is required to update artifact data.") - self.upload_aiotasks_map[artifact.task_id].append(aio_task) - elif primary_key == "observer_thought_id": - if not artifact.observer_thought_id: - raise ValueError("Observer Thought ID is required to update artifact data.") - self.upload_aiotasks_map[artifact.observer_thought_id].append(aio_task) - elif primary_key == "observer_cruise_id": - if not artifact.observer_cruise_id: - raise ValueError("Observer Cruise ID is required to update artifact data.") - self.upload_aiotasks_map[artifact.observer_cruise_id].append(aio_task) + + if not artifact[primary_key]: + raise ValueError(f"{primary_key} is required to update artifact data.") + self.upload_aiotasks_map[artifact[primary_key]].append(aio_task) async def retrieve_artifact(self, artifact: Artifact) -> bytes | None: return await app.STORAGE.retrieve_artifact(artifact) diff --git a/skyvern/forge/sdk/artifact/models.py b/skyvern/forge/sdk/artifact/models.py index 4900c6432..307c902bf 100644 --- a/skyvern/forge/sdk/artifact/models.py +++ b/skyvern/forge/sdk/artifact/models.py @@ -2,6 +2,7 @@ from datetime import datetime from enum import StrEnum +from typing import Any from pydantic import BaseModel, Field, field_serializer @@ -10,6 +11,9 @@ class ArtifactType(StrEnum): RECORDING = "recording" BROWSER_CONSOLE_LOG = "browser_console_log" + SKYVERN_LOG = "skyvern_log" + SKYVERN_LOG_RAW = "skyvern_log_raw" + # DEPRECATED. pls use SCREENSHOT_LLM, SCREENSHOT_ACTION or SCREENSHOT_FINAL SCREENSHOT = "screenshot" @@ -70,3 +74,13 @@ def serialize_datetime_to_isoformat(self, value: datetime) -> str: observer_thought_id: str | None = None signed_url: str | None = None organization_id: str | None = None + + def __getitem__(self, key: str) -> Any: + return getattr(self, key) + + +class LogEntityType(StrEnum): + STEP = "step" + TASK = "task" + WORKFLOW_RUN = "workflow_run" + WORKFLOW_RUN_BLOCK = "workflow_run_block" diff --git a/skyvern/forge/sdk/artifact/storage/base.py b/skyvern/forge/sdk/artifact/storage/base.py index ac92b7bc9..1de906109 100644 --- a/skyvern/forge/sdk/artifact/storage/base.py +++ b/skyvern/forge/sdk/artifact/storage/base.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod -from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType +from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought @@ -11,6 +11,8 @@ ArtifactType.SCREENSHOT_LLM: "png", ArtifactType.SCREENSHOT_ACTION: "png", ArtifactType.SCREENSHOT_FINAL: "png", + ArtifactType.SKYVERN_LOG: "log", + ArtifactType.SKYVERN_LOG_RAW: "json", ArtifactType.LLM_PROMPT: "txt", ArtifactType.LLM_REQUEST: "json", ArtifactType.LLM_RESPONSE: "json", @@ -34,6 +36,10 @@ class BaseStorage(ABC): def build_uri(self, artifact_id: str, step: Step, artifact_type: ArtifactType) -> str: pass + @abstractmethod + def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str: + pass + @abstractmethod def build_observer_thought_uri( self, artifact_id: str, observer_thought: ObserverThought, artifact_type: ArtifactType diff --git a/skyvern/forge/sdk/artifact/storage/local.py b/skyvern/forge/sdk/artifact/storage/local.py index 1c73465be..506ed57dd 100644 --- a/skyvern/forge/sdk/artifact/storage/local.py +++ b/skyvern/forge/sdk/artifact/storage/local.py @@ -8,7 +8,7 @@ from skyvern.config import settings from skyvern.forge.sdk.api.files import get_download_dir, get_skyvern_temp_dir -from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType +from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType from skyvern.forge.sdk.artifact.storage.base import FILE_EXTENTSION_MAP, BaseStorage from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought @@ -24,6 +24,10 @@ def build_uri(self, artifact_id: str, step: Step, artifact_type: ArtifactType) - file_ext = FILE_EXTENTSION_MAP[artifact_type] return f"file://{self.artifact_path}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" + def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str: + file_ext = FILE_EXTENTSION_MAP[artifact_type] + return f"file://{self.artifact_path}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}" + def build_observer_thought_uri( self, artifact_id: str, observer_thought: ObserverThought, artifact_type: ArtifactType ) -> str: diff --git a/skyvern/forge/sdk/artifact/storage/s3.py b/skyvern/forge/sdk/artifact/storage/s3.py index baf08de79..59dded160 100644 --- a/skyvern/forge/sdk/artifact/storage/s3.py +++ b/skyvern/forge/sdk/artifact/storage/s3.py @@ -12,7 +12,7 @@ make_temp_directory, unzip_files, ) -from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType +from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType from skyvern.forge.sdk.artifact.storage.base import FILE_EXTENTSION_MAP, BaseStorage from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought @@ -27,6 +27,10 @@ def build_uri(self, artifact_id: str, step: Step, artifact_type: ArtifactType) - file_ext = FILE_EXTENTSION_MAP[artifact_type] return f"s3://{self.bucket}/{settings.ENV}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" + def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str: + file_ext = FILE_EXTENTSION_MAP[artifact_type] + return f"s3://{self.bucket}/{settings.ENV}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}" + def build_observer_thought_uri( self, artifact_id: str, observer_thought: ObserverThought, artifact_type: ArtifactType ) -> str: diff --git a/skyvern/forge/sdk/core/skyvern_context.py b/skyvern/forge/sdk/core/skyvern_context.py index 2a6a29be2..8497e0797 100644 --- a/skyvern/forge/sdk/core/skyvern_context.py +++ b/skyvern/forge/sdk/core/skyvern_context.py @@ -13,6 +13,7 @@ class SkyvernContext: max_steps_override: int | None = None tz_info: ZoneInfo | None = None totp_codes: dict[str, str | None] = field(default_factory=dict) + log: list[dict] = field(default_factory=list) def __repr__(self) -> str: return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, max_steps_override={self.max_steps_override})" diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index db10d70ac..b8c2c5a6e 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -51,6 +51,7 @@ convert_to_workflow_run_output_parameter, convert_to_workflow_run_parameter, ) +from skyvern.forge.sdk.log_artifacts import save_workflow_run_logs from skyvern.forge.sdk.models import Step, StepStatus from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverCruiseStatus, ObserverThought from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAuthToken @@ -808,6 +809,73 @@ async def get_artifact_by_id( LOG.exception("UnexpectedError") raise + async def get_artifacts_by_entity_id( + self, + artifact_type: ArtifactType | None = None, + task_id: str | None = None, + step_id: str | None = None, + workflow_run_id: str | None = None, + workflow_run_block_id: str | None = None, + observer_thought_id: str | None = None, + observer_cruise_id: str | None = None, + organization_id: str | None = None, + ) -> list[Artifact]: + try: + async with self.Session() as session: + query = select(ArtifactModel) + + if artifact_type is not None: + query = query.filter_by(artifact_type=artifact_type) + if task_id is not None: + query = query.filter_by(task_id=task_id) + if step_id is not None: + query = query.filter_by(step_id=step_id) + if workflow_run_id is not None: + query = query.filter_by(workflow_run_id=workflow_run_id) + if workflow_run_block_id is not None: + query = query.filter_by(workflow_run_block_id=workflow_run_block_id) + if observer_thought_id is not None: + query = query.filter_by(observer_thought_id=observer_thought_id) + if observer_cruise_id is not None: + query = query.filter_by(observer_cruise_id=observer_cruise_id) + if organization_id is not None: + query = query.filter_by(organization_id=organization_id) + + query = query.order_by(ArtifactModel.created_at.desc()) + if artifacts := (await session.scalars(query)).all(): + return [convert_to_artifact(artifact, self.debug_enabled) for artifact in artifacts] + else: + return [] + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def get_artifact_by_entity_id( + self, + artifact_type: ArtifactType, + task_id: str | None = None, + step_id: str | None = None, + workflow_run_id: str | None = None, + workflow_run_block_id: str | None = None, + observer_thought_id: str | None = None, + observer_cruise_id: str | None = None, + organization_id: str | None = None, + ) -> Artifact | None: + artifacts = await self.get_artifacts_by_entity_id( + artifact_type=artifact_type, + task_id=task_id, + step_id=step_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + observer_thought_id=observer_thought_id, + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + ) + return artifacts[0] if artifacts else None + async def get_artifact( self, task_id: str, @@ -1183,6 +1251,7 @@ async def update_workflow_run( workflow_run.failure_reason = failure_reason await session.commit() await session.refresh(workflow_run) + await save_workflow_run_logs(workflow_run_id) return convert_to_workflow_run(workflow_run) LOG.error( "WorkflowRun not found, nothing to update", diff --git a/skyvern/forge/sdk/forge_log.py b/skyvern/forge/sdk/forge_log.py index 612780900..a6862cc6c 100644 --- a/skyvern/forge/sdk/forge_log.py +++ b/skyvern/forge/sdk/forge_log.py @@ -54,6 +54,21 @@ def add_kv_pairs_to_msg(logger: logging.Logger, method_name: str, event_dict: Ev return event_dict +def skyvern_logs_processor(logger: logging.Logger, method_name: str, event_dict: EventDict) -> EventDict: + """ + A custom processor to add skyvern logs to the context + """ + if method_name not in ["info", "warning", "error", "critical", "exception"]: + return event_dict + + context = skyvern_context.current() + if context: + log_entry = dict(event_dict) + context.log.append(log_entry) + + return event_dict + + def setup_logger() -> None: """ Setup the logger with the specified format @@ -88,7 +103,7 @@ def setup_logger() -> None: structlog.processors.format_exc_info, ] + additional_processors - + [renderer], + + [skyvern_logs_processor, renderer], ) uvicorn_error = logging.getLogger("uvicorn.error") uvicorn_error.disabled = True diff --git a/skyvern/forge/sdk/log_artifacts.py b/skyvern/forge/sdk/log_artifacts.py new file mode 100644 index 000000000..1d555abdc --- /dev/null +++ b/skyvern/forge/sdk/log_artifacts.py @@ -0,0 +1,175 @@ +import json + +import structlog + +from skyvern.forge import app +from skyvern.forge.sdk.artifact.models import ArtifactType, LogEntityType +from skyvern.forge.sdk.core import skyvern_context +from skyvern.forge.skyvern_json_encoder import SkyvernJSONLogEncoder +from skyvern.forge.skyvern_log_encoder import SkyvernLogEncoder + +LOG = structlog.get_logger() + + +def primary_key_from_log_entity_type(log_entity_type: LogEntityType) -> str: + if log_entity_type == LogEntityType.STEP: + return "step_id" + elif log_entity_type == LogEntityType.TASK: + return "task_id" + elif log_entity_type == LogEntityType.WORKFLOW_RUN: + return "workflow_run_id" + elif log_entity_type == LogEntityType.WORKFLOW_RUN_BLOCK: + return "workflow_run_block_id" + else: + raise ValueError(f"Invalid log entity type: {log_entity_type}") + + +async def save_step_logs(step_id: str) -> None: + context = skyvern_context.ensure_context() + log = context.log + organization_id = context.organization_id + + current_step_log = [entry for entry in log if entry.get("step_id", "") == step_id] + + await _save_log_artifacts( + log=current_step_log, + log_entity_type=LogEntityType.STEP, + log_entity_id=step_id, + organization_id=organization_id, + step_id=step_id, + ) + + +async def save_task_logs(task_id: str) -> None: + context = skyvern_context.ensure_context() + log = context.log + organization_id = context.organization_id + + current_task_log = [entry for entry in log if entry.get("task_id", "") == task_id] + + await _save_log_artifacts( + log=current_task_log, + log_entity_type=LogEntityType.TASK, + log_entity_id=task_id, + organization_id=organization_id, + task_id=task_id, + ) + + +async def save_workflow_run_logs(workflow_run_id: str) -> None: + context = skyvern_context.ensure_context() + log = context.log + organization_id = context.organization_id + + current_workflow_run_log = [entry for entry in log if entry.get("workflow_run_id", "") == workflow_run_id] + + await _save_log_artifacts( + log=current_workflow_run_log, + log_entity_type=LogEntityType.WORKFLOW_RUN, + log_entity_id=workflow_run_id, + organization_id=organization_id, + workflow_run_id=workflow_run_id, + ) + + +async def save_workflow_run_block_logs(workflow_run_block_id: str) -> None: + context = skyvern_context.ensure_context() + log = context.log + organization_id = context.organization_id + current_workflow_run_block_log = [ + entry for entry in log if entry.get("workflow_run_block_id", "") == workflow_run_block_id + ] + + await _save_log_artifacts( + log=current_workflow_run_block_log, + log_entity_type=LogEntityType.WORKFLOW_RUN_BLOCK, + log_entity_id=workflow_run_block_id, + organization_id=organization_id, + workflow_run_block_id=workflow_run_block_id, + ) + + +async def _save_log_artifacts( + log: list[dict], + log_entity_type: LogEntityType, + log_entity_id: str, + organization_id: str | None, + step_id: str | None = None, + task_id: str | None = None, + workflow_run_id: str | None = None, + workflow_run_block_id: str | None = None, +) -> None: + try: + log_json = json.dumps(log, cls=SkyvernJSONLogEncoder, indent=2) + + log_artifact = await app.DATABASE.get_artifact_by_entity_id( + artifact_type=ArtifactType.SKYVERN_LOG_RAW, + step_id=step_id, + task_id=task_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + + if log_artifact: + await app.ARTIFACT_MANAGER.update_artifact_data( + artifact_id=log_artifact.artifact_id, + organization_id=organization_id, + data=log_json.encode(), + primary_key=primary_key_from_log_entity_type(log_entity_type), + ) + else: + await app.ARTIFACT_MANAGER.create_log_artifact( + organization_id=organization_id, + step_id=step_id, + task_id=task_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + log_entity_type=log_entity_type, + log_entity_id=log_entity_id, + artifact_type=ArtifactType.SKYVERN_LOG_RAW, + data=log_json.encode(), + ) + + formatted_log = SkyvernLogEncoder.encode(log) + + formatted_log_artifact = await app.DATABASE.get_artifact_by_entity_id( + artifact_type=ArtifactType.SKYVERN_LOG, + step_id=step_id, + task_id=task_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + + if formatted_log_artifact: + await app.ARTIFACT_MANAGER.update_artifact_data( + artifact_id=formatted_log_artifact.artifact_id, + organization_id=organization_id, + data=formatted_log.encode(), + primary_key=primary_key_from_log_entity_type(log_entity_type), + ) + else: + await app.ARTIFACT_MANAGER.create_log_artifact( + organization_id=organization_id, + step_id=step_id, + task_id=task_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + log_entity_type=log_entity_type, + log_entity_id=log_entity_id, + artifact_type=ArtifactType.SKYVERN_LOG, + data=formatted_log.encode(), + ) + except Exception: + LOG.error( + "Failed to save log artifacts", + log_entity_type=log_entity_type, + log_entity_id=log_entity_id, + organization_id=organization_id, + step_id=step_id, + task_id=task_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + exc_info=True, + ) diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 6359f40bc..e5ea80577 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1,6 +1,7 @@ import datetime import hashlib import uuid +from enum import Enum from typing import Annotated, Any import structlog @@ -467,6 +468,82 @@ async def get_agent_task_steps( return ORJSONResponse([step.model_dump(exclude_none=True) for step in steps]) +class EntityType(str, Enum): + STEP = "step" + TASK = "task" + WORKFLOW_RUN = "workflow_run" + WORKFLOW_RUN_BLOCK = "workflow_run_block" + + +entity_type_to_param = { + EntityType.STEP: "step_id", + EntityType.TASK: "task_id", + EntityType.WORKFLOW_RUN: "workflow_run_id", + EntityType.WORKFLOW_RUN_BLOCK: "workflow_run_block_id", +} + + +@base_router.get( + "/{entity_type}/{entity_id}/artifacts", + tags=["agent"], + response_model=list[Artifact], +) +@base_router.get( + "/{entity_type}/{entity_id}/artifacts/", + tags=["agent"], + response_model=list[Artifact], + include_in_schema=False, +) +async def get_agent_entity_artifacts( + entity_type: EntityType, + entity_id: str, + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> Response: + """ + Get all artifacts for an entity (step, task, workflow_run). + + Args: + entity_type: Type of entity to fetch artifacts for + entity_id: ID of the entity + current_org: Current organization from auth + + Returns: + List of artifacts for the entity + + Raises: + HTTPException: If entity is not supported + """ + + if entity_type not in entity_type_to_param: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid entity_type: {entity_type}", + ) + + analytics.capture("skyvern-oss-agent-entity-artifacts-get") + + params = { + "organization_id": current_org.organization_id, + entity_type_to_param[entity_type]: entity_id, + } + + artifacts = await app.DATABASE.get_artifacts_by_entity_id(**params) # type: ignore + + if settings.ENV != "local" or settings.GENERATE_PRESIGNED_URLS: + signed_urls = await app.ARTIFACT_MANAGER.get_share_links(artifacts) + if signed_urls: + for i, artifact in enumerate(artifacts): + artifact.signed_url = signed_urls[i] + else: + LOG.warning( + "Failed to get signed urls for artifacts", + entity_type=entity_type, + entity_id=entity_id, + ) + + return ORJSONResponse([artifact.model_dump() for artifact in artifacts]) + + @base_router.get( "/tasks/{task_id}/steps/{step_id}/artifacts", tags=["agent"], diff --git a/skyvern/forge/skyvern_json_encoder.py b/skyvern/forge/skyvern_json_encoder.py new file mode 100644 index 000000000..206334acb --- /dev/null +++ b/skyvern/forge/skyvern_json_encoder.py @@ -0,0 +1,53 @@ +import json +from typing import Any + + +class SkyvernJSONLogEncoder(json.JSONEncoder): + """Custom JSON encoder for Skyvern logs that handles non-serializable objects""" + + def default(self, obj: Any) -> Any: + if hasattr(obj, "model_dump"): + return self._encode_value(obj.model_dump()) + + if hasattr(obj, "__dataclass_fields__"): + return self._encode_value({k: getattr(obj, k) for k in obj.__dataclass_fields__}) + + if hasattr(obj, "to_dict"): + return self._encode_value(obj.to_dict()) + + if hasattr(obj, "asdict"): + return self._encode_value(obj.asdict()) + + if hasattr(obj, "__dict__"): + return { + "type": obj.__class__.__name__, + "attributes": { + k: self._encode_value(v) + for k, v in obj.__dict__.items() + if not k.startswith("_") and not callable(v) + }, + } + + try: + return str(obj) + except Exception: + return f"" + + def _encode_value(self, value: Any) -> Any: + """Helper method to encode nested values recursively""" + if isinstance(value, (str, int, float, bool, type(None))): + return value + + if isinstance(value, (list, tuple)): + return [self._encode_value(item) for item in value] + + if isinstance(value, dict): + return {self._encode_value(k): self._encode_value(v) for k, v in value.items()} + + # For any other type, try to encode it using our custom logic + return self.default(value) + + @classmethod + def dumps(cls, obj: Any, **kwargs: Any) -> str: + """Helper method to properly encode objects to JSON string""" + return json.dumps(obj, cls=cls, **kwargs) diff --git a/skyvern/forge/skyvern_log_encoder.py b/skyvern/forge/skyvern_log_encoder.py new file mode 100644 index 000000000..9b8dd23e9 --- /dev/null +++ b/skyvern/forge/skyvern_log_encoder.py @@ -0,0 +1,83 @@ +import json +from datetime import datetime +from typing import Any + +import structlog +from structlog.dev import ConsoleRenderer + +from skyvern.forge.skyvern_json_encoder import SkyvernJSONLogEncoder + +LOG = structlog.get_logger() + + +class SkyvernLogEncoder: + """Encodes Skyvern logs from JSON format to human-readable string format""" + + def __init__(self) -> None: + self.renderer = ConsoleRenderer( + pad_event=30, + colors=False, + ) + + @classmethod + def _format_value(cls, value: Any) -> str: + return SkyvernJSONLogEncoder.dumps(value, sort_keys=True) + + @staticmethod + def _parse_json_entry(entry: dict[str, Any]) -> dict[str, Any]: + """Convert a JSON log entry into our standard format.""" + event = entry.get("message", entry.get("event", "")) + + clean_entry = { + "timestamp": entry.get("timestamp", datetime.utcnow().isoformat() + "Z"), + "level": entry.get("level", "info").lower(), + "event": event, + } + + for key, value in entry.items(): + if key not in ("timestamp", "level", "event", "message"): + clean_entry[key] = SkyvernLogEncoder._format_value(value) + + return clean_entry + + @classmethod + def encode(cls, log_entries: list[dict[str, Any]]) -> str: + """ + Encode log entries into formatted string output using structlog's ConsoleRenderer. + + Args: + log_entries: List of log entry dictionaries + + Returns: + Formatted string with one log entry per line + """ + encoder = cls() + formatted_lines = [] + + for entry in log_entries: + try: + if isinstance(entry, str): + try: + entry = json.loads(entry) + except json.JSONDecodeError: + entry = {"event": entry, "level": "info"} + + parsed_entry = cls._parse_json_entry(entry) + + formatted_line = encoder.renderer(None, None, parsed_entry) + formatted_lines.append(formatted_line) + + except Exception as e: + LOG.error("Failed to format log entry", entry=entry, error=str(e), exc_info=True) + # Add error line to output + error_timestamp = datetime.utcnow().isoformat() + "Z" + error_entry = { + "timestamp": error_timestamp, + "level": "error", + "event": "Failed to format log entry", + "entry": str(entry), + "error": str(e), + } + formatted_lines.append(encoder.renderer(None, None, error_entry)) + + return "\n".join(formatted_lines)