From 536db4886d1c2883d9bd0f5d3c6b05d0943fab6e Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sun, 8 Dec 2024 01:49:29 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=84=20synced=20local=20'skyvern/'=20wi?= =?UTF-8?q?th=20remote=20'skyvern/'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit > [!IMPORTANT] > Consolidates artifact upload task waiting methods into `wait_for_upload_aiotasks` and updates related code in `agent.py`, `manager.py`, and `service.py`. > > - **Behavior**: > - Replaces `wait_for_upload_aiotasks_for_task` and `wait_for_upload_aiotasks_for_tasks` with `wait_for_upload_aiotasks` in `agent.py` and `service.py`. > - Updates `wait_for_upload_aiotasks` in `manager.py` to handle a list of primary keys instead of task IDs. > - **Functions**: > - Removes `wait_for_upload_aiotasks_for_task` and `wait_for_upload_aiotasks_for_tasks` from `manager.py`. > - Modifies `wait_for_upload_aiotasks` in `manager.py` to use `primary_keys` instead of `task_ids`. > - **Misc**: > - Updates logging messages in `manager.py` to reflect changes in parameter names. > > This description was created by [Ellipsis](https://www.ellipsis.dev?ref=Skyvern-AI%2Fskyvern-cloud&utm_source=github&utm_medium=referral) for dace65d1fd348ddae2c45d38edd987c71e7aeba2. It will automatically update as commits are pushed. --- skyvern/forge/agent.py | 2 +- skyvern/forge/sdk/artifact/manager.py | 38 +++++++-------------------- skyvern/forge/sdk/workflow/service.py | 2 +- 3 files changed, 11 insertions(+), 31 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index aa9d5c5b0..dfc83863a 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -1536,7 +1536,7 @@ async def clean_up_task( await self.cleanup_browser_and_create_artifacts(close_browser_on_completion, last_step, task) # Wait for all tasks to complete before generating the links for the artifacts - await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_task(task.task_id) + await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks([task.task_id]) if need_call_webhook: await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key) diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py index ea2188258..ec182468c 100644 --- a/skyvern/forge/sdk/artifact/manager.py +++ b/skyvern/forge/sdk/artifact/manager.py @@ -135,48 +135,28 @@ async def get_share_link(self, artifact: Artifact) -> str | None: async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None: return await app.STORAGE.get_share_links(artifacts) - async def wait_for_upload_aiotasks_for_task(self, task_id: str) -> None: - try: - st = time.time() - async with asyncio.timeout(30): - await asyncio.gather( - *[aio_task for aio_task in self.upload_aiotasks_map[task_id] if not aio_task.done()] - ) - LOG.info( - f"S3 upload tasks for task_id={task_id} completed in {time.time() - st:.2f}s", - task_id=task_id, - duration=time.time() - st, - ) - except asyncio.TimeoutError: - LOG.error( - f"Timeout (30s) while waiting for upload tasks for task_id={task_id}", - task_id=task_id, - ) - - del self.upload_aiotasks_map[task_id] - - async def wait_for_upload_aiotasks_for_tasks(self, task_ids: list[str]) -> None: + async def wait_for_upload_aiotasks(self, primary_keys: list[str]) -> None: try: st = time.time() async with asyncio.timeout(30): await asyncio.gather( *[ aio_task - for task_id in task_ids - for aio_task in self.upload_aiotasks_map[task_id] + for primary_key in primary_keys + for aio_task in self.upload_aiotasks_map[primary_key] if not aio_task.done() ] ) LOG.info( - f"S3 upload tasks for task_ids={task_ids} completed in {time.time() - st:.2f}s", - task_ids=task_ids, + f"S3 upload aio tasks for primary_keys={primary_keys} completed in {time.time() - st:.2f}s", + primary_keys=primary_keys, duration=time.time() - st, ) except asyncio.TimeoutError: LOG.error( - f"Timeout (30s) while waiting for upload tasks for task_ids={task_ids}", - task_ids=task_ids, + f"Timeout (30s) while waiting for upload aio tasks for primary_keys={primary_keys}", + primary_keys=primary_keys, ) - for task_id in task_ids: - del self.upload_aiotasks_map[task_id] + for primary_key in primary_keys: + del self.upload_aiotasks_map[primary_key] diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 7575a28a9..838f7b2f7 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -859,7 +859,7 @@ async def clean_up_workflow( ) LOG.info("Persisted browser session for workflow run", workflow_run_id=workflow_run.workflow_run_id) - await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_tasks(all_workflow_task_ids) + await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks(all_workflow_task_ids) try: async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):