-
Notifications
You must be signed in to change notification settings - Fork 789
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Record logs into step artifacts #1339
Changes from 39 commits
79e5763
c3cbf4b
f3debf5
c1e7ff6
b1f14c6
b2cb9c4
eb6fc3d
57cd772
419e12b
ddbc340
f313326
9b35651
2d90fed
be0e817
5ee8c3b
f8a6b58
8bf863c
5d32d53
3c37a4a
1aa1146
f3bb2fe
6606fba
aabc0fc
b83c648
40e9dd3
5c8488d
127b8a6
4742fc4
974b37e
d48f792
f57c7b6
c8274c3
3ff49cb
c3ce7d3
72b3f15
aac781f
ff86b52
a86d81d
0f0633f
727dd7e
2312281
d66e67b
2cf5f69
63829fa
9eda7ca
631f898
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,13 +6,21 @@ | |
import structlog | ||
|
||
from skyvern.forge import app | ||
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType | ||
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType | ||
from skyvern.forge.sdk.db.id import generate_artifact_id | ||
from skyvern.forge.sdk.models import Step | ||
from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought | ||
|
||
LOG = structlog.get_logger(__name__) | ||
|
||
PRIMARY_KEY = Literal[ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @LawyZheng we're introducing new primary keys for artifacts Now i think we need to ensure those aio tasks for new primary keys (workflow_run_id, step_id) will be awaited at cleanup time. |
||
"task_id", | ||
"observer_thought_id", | ||
"observer_cruise_id", | ||
"step_id", | ||
"workflow_run_id", | ||
"workflow_run_block_id", | ||
] | ||
|
||
class ArtifactManager: | ||
# task_id -> list of aio_tasks for uploading artifacts | ||
|
@@ -82,6 +90,35 @@ async def create_artifact( | |
path=path, | ||
) | ||
|
||
async def create_log_artifact( | ||
self, | ||
log_entity_type: LogEntityType, | ||
log_entity_id: str, | ||
artifact_type: ArtifactType, | ||
step_id: str | None = None, | ||
task_id: str | None = None, | ||
workflow_run_id: str | None = None, | ||
workflow_run_block_id: str | None = None, | ||
organization_id: str | None = None, | ||
data: bytes | None = None, | ||
path: str | None = None, | ||
) -> str: | ||
artifact_id = generate_artifact_id() | ||
uri = app.STORAGE.build_log_uri(log_entity_type, log_entity_id, artifact_type) | ||
return await self._create_artifact( | ||
aio_task_primary_key=log_entity_id, | ||
artifact_id=artifact_id, | ||
artifact_type=artifact_type, | ||
uri=uri, | ||
step_id=step_id, | ||
task_id=task_id, | ||
workflow_run_id=workflow_run_id, | ||
workflow_run_block_id=workflow_run_block_id, | ||
organization_id=organization_id, | ||
data=data, | ||
path=path, | ||
) | ||
|
||
async def create_observer_thought_artifact( | ||
self, | ||
observer_thought: ObserverThought, | ||
|
@@ -174,7 +211,7 @@ async def update_artifact_data( | |
artifact_id: str | None, | ||
organization_id: str | None, | ||
data: bytes, | ||
primary_key: Literal["task_id", "observer_thought_id", "observer_cruise_id"] = "task_id", | ||
primary_key: PRIMARY_KEY = "task_id", | ||
) -> None: | ||
if not artifact_id or not organization_id: | ||
return None | ||
|
@@ -183,18 +220,11 @@ async def update_artifact_data( | |
return | ||
# Fire and forget | ||
aio_task = asyncio.create_task(app.STORAGE.store_artifact(artifact, data)) | ||
if primary_key == "task_id": | ||
if not artifact.task_id: | ||
raise ValueError("Task ID is required to update artifact data.") | ||
self.upload_aiotasks_map[artifact.task_id].append(aio_task) | ||
elif primary_key == "observer_thought_id": | ||
if not artifact.observer_thought_id: | ||
raise ValueError("Observer Thought ID is required to update artifact data.") | ||
self.upload_aiotasks_map[artifact.observer_thought_id].append(aio_task) | ||
elif primary_key == "observer_cruise_id": | ||
if not artifact.observer_cruise_id: | ||
raise ValueError("Observer Cruise ID is required to update artifact data.") | ||
self.upload_aiotasks_map[artifact.observer_cruise_id].append(aio_task) | ||
|
||
if not artifact[primary_key]: | ||
raise ValueError(f"{primary_key} is required to update artifact data.") | ||
self.upload_aiotasks_map[artifact[primary_key]].append(aio_task) | ||
|
||
|
||
async def retrieve_artifact(self, artifact: Artifact) -> bytes | None: | ||
return await app.STORAGE.retrieve_artifact(artifact) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
from abc import ABC, abstractmethod | ||
|
||
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType | ||
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType | ||
from skyvern.forge.sdk.models import Step | ||
from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought | ||
|
||
|
@@ -11,6 +11,8 @@ | |
ArtifactType.SCREENSHOT_LLM: "png", | ||
ArtifactType.SCREENSHOT_ACTION: "png", | ||
ArtifactType.SCREENSHOT_FINAL: "png", | ||
ArtifactType.SKYVERN_LOG: "log", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Step log.. Task log.. Workflow log... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i don't think adding STEP_LOG, TASK_LOG, WORKFLOW_LOG helps. It's better to use step_id, task_id and workflow_run_id columns to filter artifacts. Also, step log and task log are not mutually exclusive. TaskLogs should be the super set of StepLogs. WorkflowRunLogs should be the super set of TaskLogs. We don't have to create multiple sets of log artifacts that have duplicated data. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if I agree with this -- why not have multiple artifact types that may or may not include duplicate data? It makes the implementation + writing of data + reading of data dead simple. Why is it better to use step_id / task_id / workflow_run_id columns to filter artifacts w/ the same name + type? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand correctly in this case we'll store workflow and task level logs in the step artifact folders Current implementation of def build_uri(self, artifact_id: str, step: Step, artifact_type: ArtifactType) -> str:
file_ext = FILE_EXTENTSION_MAP[artifact_type]
return f"file://{self.artifact_path}/{step.task_id}/{step.order:02d}_{step.retry_index}_{step.step_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" We could define a build uri method for logs that would take entity type and id So that the uri would look like this: return f"file://{self.artifact_path}/{entity_type}_{entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}" This way we would be able to fetch logs on the frontend using a similar query as for the other artifacts We would be able to use artifact type for all the levels There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @suchintan no concern of data duplication. you're wright, having more artifact_types makes the implementation easier. |
||
ArtifactType.SKYVERN_LOG_RAW: "json", | ||
ArtifactType.LLM_PROMPT: "txt", | ||
ArtifactType.LLM_REQUEST: "json", | ||
ArtifactType.LLM_RESPONSE: "json", | ||
|
@@ -34,6 +36,10 @@ class BaseStorage(ABC): | |
def build_uri(self, artifact_id: str, step: Step, artifact_type: ArtifactType) -> str: | ||
pass | ||
|
||
@abstractmethod | ||
def build_log_uri(self, log_entity_type: LogEntityType, log_entity_id: str, artifact_type: ArtifactType) -> str: | ||
pass | ||
|
||
@abstractmethod | ||
def build_observer_thought_uri( | ||
self, artifact_id: str, observer_thought: ObserverThought, artifact_type: ArtifactType | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@msalihaltun can you review?