diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py
index aa9d5c5b0..dfc83863a 100644
--- a/skyvern/forge/agent.py
+++ b/skyvern/forge/agent.py
@@ -1536,7 +1536,7 @@ async def clean_up_task(
 
         await self.cleanup_browser_and_create_artifacts(close_browser_on_completion, last_step, task)
         # Wait for all tasks to complete before generating the links for the artifacts
-        await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_task(task.task_id)
+        await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks([task.task_id])
 
         if need_call_webhook:
             await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key)
diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py
index ea2188258..ec182468c 100644
--- a/skyvern/forge/sdk/artifact/manager.py
+++ b/skyvern/forge/sdk/artifact/manager.py
@@ -135,48 +135,28 @@ async def get_share_link(self, artifact: Artifact) -> str | None:
     async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None:
         return await app.STORAGE.get_share_links(artifacts)
 
-    async def wait_for_upload_aiotasks_for_task(self, task_id: str) -> None:
-        try:
-            st = time.time()
-            async with asyncio.timeout(30):
-                await asyncio.gather(
-                    *[aio_task for aio_task in self.upload_aiotasks_map[task_id] if not aio_task.done()]
-                )
-            LOG.info(
-                f"S3 upload tasks for task_id={task_id} completed in {time.time() - st:.2f}s",
-                task_id=task_id,
-                duration=time.time() - st,
-            )
-        except asyncio.TimeoutError:
-            LOG.error(
-                f"Timeout (30s) while waiting for upload tasks for task_id={task_id}",
-                task_id=task_id,
-            )
-
-        del self.upload_aiotasks_map[task_id]
-
-    async def wait_for_upload_aiotasks_for_tasks(self, task_ids: list[str]) -> None:
+    async def wait_for_upload_aiotasks(self, primary_keys: list[str]) -> None:
         try:
             st = time.time()
             async with asyncio.timeout(30):
                 await asyncio.gather(
                     *[
                         aio_task
-                        for task_id in task_ids
-                        for aio_task in self.upload_aiotasks_map[task_id]
+                        for primary_key in primary_keys
+                        for aio_task in self.upload_aiotasks_map[primary_key]
                         if not aio_task.done()
                     ]
                 )
             LOG.info(
-                f"S3 upload tasks for task_ids={task_ids} completed in {time.time() - st:.2f}s",
-                task_ids=task_ids,
+                f"S3 upload aio tasks for primary_keys={primary_keys} completed in {time.time() - st:.2f}s",
+                primary_keys=primary_keys,
                 duration=time.time() - st,
             )
         except asyncio.TimeoutError:
             LOG.error(
-                f"Timeout (30s) while waiting for upload tasks for task_ids={task_ids}",
-                task_ids=task_ids,
+                f"Timeout (30s) while waiting for upload aio tasks for primary_keys={primary_keys}",
+                primary_keys=primary_keys,
             )
 
-        for task_id in task_ids:
-            del self.upload_aiotasks_map[task_id]
+        for primary_key in primary_keys:
+            del self.upload_aiotasks_map[primary_key]
diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py
index 7575a28a9..838f7b2f7 100644
--- a/skyvern/forge/sdk/workflow/service.py
+++ b/skyvern/forge/sdk/workflow/service.py
@@ -859,7 +859,7 @@ async def clean_up_workflow(
             )
             LOG.info("Persisted browser session for workflow run", workflow_run_id=workflow_run.workflow_run_id)
 
-        await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_tasks(all_workflow_task_ids)
+        await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks(all_workflow_task_ids)
 
         try:
             async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):