diff --git a/animepipeline/encode/finalrip.py b/animepipeline/encode/finalrip.py index 2835454..5a556ad 100644 --- a/animepipeline/encode/finalrip.py +++ b/animepipeline/encode/finalrip.py @@ -1,9 +1,11 @@ +import asyncio +import gc import mimetypes import time from pathlib import Path from typing import Union -import aiofiles +import httpx from httpx import AsyncClient from loguru import logger from tenacity import retry, stop_after_attempt, wait_random @@ -17,6 +19,8 @@ OSSPresignedURLRequest, OSSPresignedURLResponse, PingResponse, + RetryMergeRequest, + RetryMergeResponse, StartTaskRequest, StartTaskResponse, TaskNotCompletedError, @@ -67,6 +71,14 @@ async def _get_oss_presigned_url(self, data: OSSPresignedURLRequest) -> OSSPresi logger.error(f"Error getting presigned URL: {e}, {data}") raise e + async def _retry_merge(self, data: RetryMergeRequest) -> RetryMergeResponse: + try: + response = await self.client.post("/api/v1/task/retry/merge", params=data.model_dump()) + return RetryMergeResponse(**response.json()) + except Exception as e: + logger.error(f"Error retrying merge: {e}, {data}") + raise e + async def check_task_exist(self, video_key: str) -> bool: try: get_task_progress_response = await self._get_task_progress(GetTaskProgressRequest(video_key=video_key)) @@ -85,6 +97,27 @@ async def check_task_completed(self, video_key: str) -> bool: logger.error(f"Error checking task completed: {e}, video_key: {video_key}") return False + async def check_task_all_clips_done(self, video_key: str) -> bool: + try: + get_task_progress_response = await self._get_task_progress(GetTaskProgressRequest(video_key=video_key)) + if not get_task_progress_response.success: + logger.error(f"Error getting task progress: {get_task_progress_response.error.message}") # type: ignore + return False + + for clip in get_task_progress_response.data.progress: # type: ignore + if not clip.completed: + return False + + return True + except Exception as e: + logger.error(f"Error checking task all clips done: {e}, video_key: {video_key}") + return False + + async def retry_merge(self, video_key: str) -> None: + retry_merge_response = await self._retry_merge(RetryMergeRequest(video_key=video_key)) + if not retry_merge_response.success: + logger.error(f"Error retrying merge: {retry_merge_response.error.message}") # type: ignore + @retry(wait=wait_random(min=3, max=5), stop=stop_after_attempt(5)) async def upload_and_new_task(self, video_path: Union[str, Path]) -> None: """ @@ -102,36 +135,50 @@ async def upload_and_new_task(self, video_path: Union[str, Path]) -> None: # gen oss presigned url video_key = Path(video_path).name - oss_presigned_url_response = await self._get_oss_presigned_url(OSSPresignedURLRequest(video_key=video_key)) - if not oss_presigned_url_response.success: - logger.error(f"Error getting presigned URL: {oss_presigned_url_response.error.message}") # type: ignore - raise ValueError(f"Error getting presigned URL: {oss_presigned_url_response.error.message}") # type: ignore - try: - content_type = mimetypes.guess_type(video_path)[0] - except Exception: - content_type = "application/octet-stream" - - # upload file - try: - logger.info(f"Uploading file: {video_path}") - t0 = time.time() - async with aiofiles.open(video_path, mode="rb") as v: - video_content = await v.read() - - response = await self.client.put( - url=oss_presigned_url_response.data.url, # type: ignore - content=video_content, - headers={"Content-Type": content_type}, - timeout=60 * 60, - ) - if response.status_code != 200: - raise IOError(f"Error uploading file: {response.text}") - logger.info(f"Upload file Successfully! path: {video_path} time: {time.time() - t0:.2f}s") + oss_presigned_url_response = await self._get_oss_presigned_url(OSSPresignedURLRequest(video_key=video_key)) + if not oss_presigned_url_response.success: + logger.error(f"Error getting presigned URL: {oss_presigned_url_response.error.message}") # type: ignore + raise ValueError(f"Error getting presigned URL: {oss_presigned_url_response.error.message}") # type: ignore except Exception as e: - logger.error(f"Error in uploading file: {video_path}: {e}") + logger.error(f"Error getting presigned URL: {e}") raise e + if not oss_presigned_url_response.data.exist: # type: ignore + try: + content_type = mimetypes.guess_type(video_path)[0] + except Exception: + content_type = "application/octet-stream" + + # upload file + try: + logger.info(f"Uploading file: {video_path}") + t0 = time.time() + + # 这里不要用异步,会内存泄漏 + def _upload_file() -> None: + with open(video_path, mode="rb") as v: + video_content = v.read() + logger.info(f"Read file Successfully! path: {video_path} time: {time.time() - t0:.2f}s") + response = httpx.put( + url=oss_presigned_url_response.data.url, # type: ignore + content=video_content, + headers={"Content-Type": content_type}, + timeout=60 * 60, + ) + if response.status_code != 200: + raise IOError(f"Error uploading file: {response.text}") + + _upload_file() + del _upload_file + gc.collect() + logger.info(f"Upload file Successfully! path: {video_path} time: {time.time() - t0:.2f}s") + except Exception as e: + logger.error(f"Error in uploading file: {video_path}: {e}") + raise e + + await asyncio.sleep(2) + # new task new_task_response = await self._new_task(NewTaskRequest(video_key=video_key)) if not new_task_response.success: @@ -168,5 +215,5 @@ async def download_completed_task(self, video_key: str, save_path: Union[str, Pa if response.status_code != 200: raise IOError(f"Error downloading file: {response.text}") - async with aiofiles.open(save_path, mode="wb") as v: - await v.write(response.content) + with open(save_path, mode="wb") as v: + v.write(response.content) diff --git a/animepipeline/encode/type.py b/animepipeline/encode/type.py index 679d46a..d284cee 100644 --- a/animepipeline/encode/type.py +++ b/animepipeline/encode/type.py @@ -85,3 +85,12 @@ class Data(BaseModel): data: Optional[Data] = None error: Optional[Error] = None success: bool + + +class RetryMergeRequest(BaseModel): + video_key: str + + +class RetryMergeResponse(BaseModel): + error: Optional[Error] = None + success: bool diff --git a/animepipeline/loop.py b/animepipeline/loop.py index f30d141..a92a8f9 100644 --- a/animepipeline/loop.py +++ b/animepipeline/loop.py @@ -173,9 +173,9 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None: while not await self.finalrip_client.check_task_exist(bt_downloaded_path.name): try: await self.finalrip_client.upload_and_new_task(bt_downloaded_path) + logger.info(f'FinalRip Task Created for "{task_info.name}" EP {task_info.episode}') except Exception as e: logger.error(f"Failed to upload and new finalrip task: {e}") - raise e await asyncio.sleep(10) try: @@ -184,6 +184,7 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None: encode_param=task_info.param, script=task_info.script, ) + logger.info(f'FinalRip Task Started for "{task_info.name}" EP {task_info.episode}') except Exception as e: logger.error(f"Failed to start finalrip task: {e}") @@ -192,6 +193,20 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None: # check task progress while not await self.finalrip_client.check_task_completed(bt_downloaded_path.name): + # retry merge if all clips are done but merge failed? + if await self.finalrip_client.check_task_all_clips_done(bt_downloaded_path.name): + # wait 30s before retry merge + await asyncio.sleep(30) + # check again + if await self.finalrip_client.check_task_completed(bt_downloaded_path.name): + break + + try: + await self.finalrip_client.retry_merge(bt_downloaded_path.name) + logger.info(f'Retry Merge Clips for "{task_info.name}" EP {task_info.episode}') + except Exception as e: + logger.error(f'Failed to retry merge clips for "{task_info.name}" EP {task_info.episode}: {e}') + await asyncio.sleep(10) # download temp file to bt_downloaded_path's parent directory diff --git a/animepipeline/post/tg.py b/animepipeline/post/tg.py index 4c95ab3..763e598 100644 --- a/animepipeline/post/tg.py +++ b/animepipeline/post/tg.py @@ -54,6 +54,7 @@ async def send_video(self, video_path: Union[Path, str], caption: Optional[str] caption=caption, read_timeout=6000, write_timeout=6000, + pool_timeout=6000, ) except telegram.error.NetworkError as e: logger.error(f"Network error: {e}, video path: {video_path}, video_caption: {caption}") diff --git a/poetry.lock b/poetry.lock index 1de1298..2798d69 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,21 +1,10 @@ # This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [metadata] -content-hash = "d4f729afc76838e1fb16a3d1a40c0a6336247fd4d2e34a24c2b7a6efb3565b18" +content-hash = "f1522981fac4f9397a5eaf6a81328ddb77dd9be8c69fd1a2dc905195693e21f8" lock-version = "2.0" python-versions = "^3.9" -[[package]] -description = "File support for asyncio." -files = [ - {file = "aiofiles-24.1.0-py3-none-any.whl", hash = "sha256:b4ec55f4195e3eb5d7abd1bf7e061763e864dd4954231fb8539a0ef8bb8260e5"}, - {file = "aiofiles-24.1.0.tar.gz", hash = "sha256:22a075c9e5a3810f0c2e48f3008c94d68c65d763b9b03857924c99e57355166c"} -] -name = "aiofiles" -optional = false -python-versions = ">=3.8" -version = "24.1.0" - [[package]] description = "Reusable constraint types to use with typing.Annotated" files = [ diff --git a/pyproject.toml b/pyproject.toml index 3547142..6b5b39d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,14 +39,13 @@ license = "MIT" name = "animepipeline" readme = "README.md" repository = "https://github.com/TensoRaws/AnimePipeline" -version = "0.0.2" +version = "0.0.3" # Requirements [tool.poetry.dependencies] python = "^3.9" [tool.poetry.group.dev.dependencies] -aiofiles = "^24.1.0" feedparser = "^6.0.11" httpx = "^0.27.2" loguru = "^0.7.2" diff --git a/tests/test_encode.py b/tests/test_encode.py index 554f143..36bc304 100644 --- a/tests/test_encode.py +++ b/tests/test_encode.py @@ -52,6 +52,9 @@ async def test_task_progress(self) -> None: task_progress = await self.finalrip._get_task_progress(GetTaskProgressRequest(video_key=video_key)) print(task_progress) + async def test_retry_merge(self) -> None: + await self.finalrip.retry_merge(video_key) + async def test_download_completed_task(self) -> None: while True: try: