Skip to content

Commit

Permalink
🔄 synced local 'skyvern/' with remote 'skyvern/'
Browse files Browse the repository at this point in the history
<!-- ELLIPSIS_HIDDEN -->

> [!IMPORTANT]
> Consolidates artifact upload task waiting methods into `wait_for_upload_aiotasks` and updates related code in `agent.py`, `manager.py`, and `service.py`.
>
>   - **Behavior**:
>     - Replaces `wait_for_upload_aiotasks_for_task` and `wait_for_upload_aiotasks_for_tasks` with `wait_for_upload_aiotasks` in `agent.py` and `service.py`.
>     - Updates `wait_for_upload_aiotasks` in `manager.py` to handle a list of primary keys instead of task IDs.
>   - **Functions**:
>     - Removes `wait_for_upload_aiotasks_for_task` and `wait_for_upload_aiotasks_for_tasks` from `manager.py`.
>     - Modifies `wait_for_upload_aiotasks` in `manager.py` to use `primary_keys` instead of `task_ids`.
>   - **Misc**:
>     - Updates logging messages in `manager.py` to reflect changes in parameter names.
>
> <sup>This description was created by </sup>[<img alt="Ellipsis" src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=Skyvern-AI%2Fskyvern-cloud&utm_source=github&utm_medium=referral)<sup> for dace65d1fd348ddae2c45d38edd987c71e7aeba2. It will automatically update as commits are pushed.</sup>

<!-- ELLIPSIS_HIDDEN -->
  • Loading branch information
wintonzheng committed Dec 8, 2024
1 parent 620b5bf commit 536db48
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 31 deletions.
2 changes: 1 addition & 1 deletion skyvern/forge/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ async def clean_up_task(
await self.cleanup_browser_and_create_artifacts(close_browser_on_completion, last_step, task)

# Wait for all tasks to complete before generating the links for the artifacts
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_task(task.task_id)
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks([task.task_id])

if need_call_webhook:
await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key)
Expand Down
38 changes: 9 additions & 29 deletions skyvern/forge/sdk/artifact/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,48 +135,28 @@ async def get_share_link(self, artifact: Artifact) -> str | None:
async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None:
return await app.STORAGE.get_share_links(artifacts)

async def wait_for_upload_aiotasks_for_task(self, task_id: str) -> None:
try:
st = time.time()
async with asyncio.timeout(30):
await asyncio.gather(
*[aio_task for aio_task in self.upload_aiotasks_map[task_id] if not aio_task.done()]
)
LOG.info(
f"S3 upload tasks for task_id={task_id} completed in {time.time() - st:.2f}s",
task_id=task_id,
duration=time.time() - st,
)
except asyncio.TimeoutError:
LOG.error(
f"Timeout (30s) while waiting for upload tasks for task_id={task_id}",
task_id=task_id,
)

del self.upload_aiotasks_map[task_id]

async def wait_for_upload_aiotasks_for_tasks(self, task_ids: list[str]) -> None:
async def wait_for_upload_aiotasks(self, primary_keys: list[str]) -> None:
try:
st = time.time()
async with asyncio.timeout(30):
await asyncio.gather(
*[
aio_task
for task_id in task_ids
for aio_task in self.upload_aiotasks_map[task_id]
for primary_key in primary_keys
for aio_task in self.upload_aiotasks_map[primary_key]
if not aio_task.done()
]
)
LOG.info(
f"S3 upload tasks for task_ids={task_ids} completed in {time.time() - st:.2f}s",
task_ids=task_ids,
f"S3 upload aio tasks for primary_keys={primary_keys} completed in {time.time() - st:.2f}s",
primary_keys=primary_keys,
duration=time.time() - st,
)
except asyncio.TimeoutError:
LOG.error(
f"Timeout (30s) while waiting for upload tasks for task_ids={task_ids}",
task_ids=task_ids,
f"Timeout (30s) while waiting for upload aio tasks for primary_keys={primary_keys}",
primary_keys=primary_keys,
)

for task_id in task_ids:
del self.upload_aiotasks_map[task_id]
for primary_key in primary_keys:
del self.upload_aiotasks_map[primary_key]
2 changes: 1 addition & 1 deletion skyvern/forge/sdk/workflow/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ async def clean_up_workflow(
)
LOG.info("Persisted browser session for workflow run", workflow_run_id=workflow_run.workflow_run_id)

await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_tasks(all_workflow_task_ids)
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks(all_workflow_task_ids)

try:
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
Expand Down

0 comments on commit 536db48

Please sign in to comment.