Commit

fix: async and many bugs... (#3)
Tohrusky authored Oct 26, 2024
1 parent e83eb6c commit fd3c08b
Showing 7 changed files with 107 additions and 44 deletions.
105 changes: 76 additions & 29 deletions animepipeline/encode/finalrip.py
@@ -1,9 +1,11 @@
import asyncio
import gc
import mimetypes
import time
from pathlib import Path
from typing import Union

import aiofiles
import httpx
from httpx import AsyncClient
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_random
@@ -17,6 +19,8 @@
OSSPresignedURLRequest,
OSSPresignedURLResponse,
PingResponse,
RetryMergeRequest,
RetryMergeResponse,
StartTaskRequest,
StartTaskResponse,
TaskNotCompletedError,
@@ -67,6 +71,14 @@ async def _get_oss_presigned_url(self, data: OSSPresignedURLRequest) -> OSSPresi
logger.error(f"Error getting presigned URL: {e}, {data}")
raise e

async def _retry_merge(self, data: RetryMergeRequest) -> RetryMergeResponse:
try:
response = await self.client.post("/api/v1/task/retry/merge", params=data.model_dump())
return RetryMergeResponse(**response.json())
except Exception as e:
logger.error(f"Error retrying merge: {e}, {data}")
raise e

async def check_task_exist(self, video_key: str) -> bool:
try:
get_task_progress_response = await self._get_task_progress(GetTaskProgressRequest(video_key=video_key))
@@ -85,6 +97,27 @@ async def check_task_completed(self, video_key: str) -> bool:
logger.error(f"Error checking task completed: {e}, video_key: {video_key}")
return False

async def check_task_all_clips_done(self, video_key: str) -> bool:
try:
get_task_progress_response = await self._get_task_progress(GetTaskProgressRequest(video_key=video_key))
if not get_task_progress_response.success:
logger.error(f"Error getting task progress: {get_task_progress_response.error.message}") # type: ignore
return False

for clip in get_task_progress_response.data.progress: # type: ignore
if not clip.completed:
return False

return True
except Exception as e:
logger.error(f"Error checking task all clips done: {e}, video_key: {video_key}")
return False

async def retry_merge(self, video_key: str) -> None:
retry_merge_response = await self._retry_merge(RetryMergeRequest(video_key=video_key))
if not retry_merge_response.success:
logger.error(f"Error retrying merge: {retry_merge_response.error.message}") # type: ignore

@retry(wait=wait_random(min=3, max=5), stop=stop_after_attempt(5))
async def upload_and_new_task(self, video_path: Union[str, Path]) -> None:
"""
@@ -102,36 +135,50 @@ async def upload_and_new_task(self, video_path: Union[str, Path]) -> None:

# gen oss presigned url
video_key = Path(video_path).name
oss_presigned_url_response = await self._get_oss_presigned_url(OSSPresignedURLRequest(video_key=video_key))
if not oss_presigned_url_response.success:
logger.error(f"Error getting presigned URL: {oss_presigned_url_response.error.message}") # type: ignore
raise ValueError(f"Error getting presigned URL: {oss_presigned_url_response.error.message}") # type: ignore

try:
content_type = mimetypes.guess_type(video_path)[0]
except Exception:
content_type = "application/octet-stream"

# upload file
try:
logger.info(f"Uploading file: {video_path}")
t0 = time.time()
async with aiofiles.open(video_path, mode="rb") as v:
video_content = await v.read()

response = await self.client.put(
url=oss_presigned_url_response.data.url, # type: ignore
content=video_content,
headers={"Content-Type": content_type},
timeout=60 * 60,
)
if response.status_code != 200:
raise IOError(f"Error uploading file: {response.text}")
logger.info(f"Upload file Successfully! path: {video_path} time: {time.time() - t0:.2f}s")
oss_presigned_url_response = await self._get_oss_presigned_url(OSSPresignedURLRequest(video_key=video_key))
if not oss_presigned_url_response.success:
logger.error(f"Error getting presigned URL: {oss_presigned_url_response.error.message}") # type: ignore
raise ValueError(f"Error getting presigned URL: {oss_presigned_url_response.error.message}") # type: ignore
except Exception as e:
logger.error(f"Error in uploading file: {video_path}: {e}")
logger.error(f"Error getting presigned URL: {e}")
raise e

if not oss_presigned_url_response.data.exist: # type: ignore
try:
content_type = mimetypes.guess_type(video_path)[0]
except Exception:
content_type = "application/octet-stream"

# upload file
try:
logger.info(f"Uploading file: {video_path}")
t0 = time.time()

# Do not use async here; it causes a memory leak
def _upload_file() -> None:
with open(video_path, mode="rb") as v:
video_content = v.read()
logger.info(f"Read file Successfully! path: {video_path} time: {time.time() - t0:.2f}s")
response = httpx.put(
url=oss_presigned_url_response.data.url, # type: ignore
content=video_content,
headers={"Content-Type": content_type},
timeout=60 * 60,
)
if response.status_code != 200:
raise IOError(f"Error uploading file: {response.text}")

_upload_file()
del _upload_file
gc.collect()
logger.info(f"Upload file Successfully! path: {video_path} time: {time.time() - t0:.2f}s")
except Exception as e:
logger.error(f"Error in uploading file: {video_path}: {e}")
raise e

await asyncio.sleep(2)

# new task
new_task_response = await self._new_task(NewTaskRequest(video_key=video_key))
if not new_task_response.success:
@@ -168,5 +215,5 @@ async def download_completed_task(self, video_key: str, save_path: Union[str, Pa
if response.status_code != 200:
raise IOError(f"Error downloading file: {response.text}")

async with aiofiles.open(save_path, mode="wb") as v:
await v.write(response.content)
with open(save_path, mode="wb") as v:
v.write(response.content)
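
The upload path above now reads the whole file and PUTs it with a blocking httpx.put inside a local helper, then drops the helper and calls gc.collect(), because the earlier aiofiles/AsyncClient version leaked memory (as the in-code comment notes). A rough alternative sketch, not part of this commit: the same blocking helper could be pushed onto a worker thread with asyncio.to_thread (available since Python 3.9, which pyproject.toml targets) so the event loop stays responsive while uploading; the names _upload_file_sync, upload, and presigned_url below are illustrative only.

import asyncio
import mimetypes
from pathlib import Path

import httpx


def _upload_file_sync(video_path: Path, presigned_url: str) -> None:
    # Blocking read + PUT, the same shape as the commit's _upload_file helper.
    content_type = mimetypes.guess_type(str(video_path))[0] or "application/octet-stream"
    with open(video_path, mode="rb") as f:
        video_content = f.read()
    response = httpx.put(
        url=presigned_url,
        content=video_content,
        headers={"Content-Type": content_type},
        timeout=60 * 60,
    )
    if response.status_code != 200:
        raise IOError(f"Error uploading file: {response.text}")


async def upload(video_path: Path, presigned_url: str) -> None:
    # Run the blocking upload in a worker thread so other coroutines keep running.
    await asyncio.to_thread(_upload_file_sync, video_path, presigned_url)
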
9 changes: 9 additions & 0 deletions animepipeline/encode/type.py
@@ -85,3 +85,12 @@ class Data(BaseModel):
data: Optional[Data] = None
error: Optional[Error] = None
success: bool


class RetryMergeRequest(BaseModel):
video_key: str


class RetryMergeResponse(BaseModel):
error: Optional[Error] = None
success: bool
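
For reference, _retry_merge above serializes the new request model with model_dump() and sends it as query parameters. A minimal standalone sketch of how the two models behave, assuming pydantic v2 (which provides model_dump()) and with the Error shape simplified to a single message field:

from typing import Optional

from pydantic import BaseModel


class Error(BaseModel):
    message: str


class RetryMergeRequest(BaseModel):
    video_key: str


class RetryMergeResponse(BaseModel):
    error: Optional[Error] = None
    success: bool


print(RetryMergeRequest(video_key="example.mkv").model_dump())  # {'video_key': 'example.mkv'}
print(RetryMergeResponse(success=True).error)                   # None
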
17 changes: 16 additions & 1 deletion animepipeline/loop.py
@@ -173,9 +173,9 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
while not await self.finalrip_client.check_task_exist(bt_downloaded_path.name):
try:
await self.finalrip_client.upload_and_new_task(bt_downloaded_path)
logger.info(f'FinalRip Task Created for "{task_info.name}" EP {task_info.episode}')
except Exception as e:
logger.error(f"Failed to upload and new finalrip task: {e}")
raise e
await asyncio.sleep(10)

try:
@@ -184,6 +184,7 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:
encode_param=task_info.param,
script=task_info.script,
)
logger.info(f'FinalRip Task Started for "{task_info.name}" EP {task_info.episode}')
except Exception as e:
logger.error(f"Failed to start finalrip task: {e}")

@@ -192,6 +193,20 @@ async def pipeline_finalrip(self, task_info: TaskInfo) -> None:

# check task progress
while not await self.finalrip_client.check_task_completed(bt_downloaded_path.name):
# retry merge if all clips are done but merge failed?
if await self.finalrip_client.check_task_all_clips_done(bt_downloaded_path.name):
# wait 30s before retry merge
await asyncio.sleep(30)
# check again
if await self.finalrip_client.check_task_completed(bt_downloaded_path.name):
break

try:
await self.finalrip_client.retry_merge(bt_downloaded_path.name)
logger.info(f'Retry Merge Clips for "{task_info.name}" EP {task_info.episode}')
except Exception as e:
logger.error(f'Failed to retry merge clips for "{task_info.name}" EP {task_info.episode}: {e}')

await asyncio.sleep(10)

# download temp file to bt_downloaded_path's parent directory
1 change: 1 addition & 0 deletions animepipeline/post/tg.py
@@ -54,6 +54,7 @@ async def send_video(self, video_path: Union[Path, str], caption: Optional[str]
caption=caption,
read_timeout=6000,
write_timeout=6000,
pool_timeout=6000,
)
except telegram.error.NetworkError as e:
logger.error(f"Network error: {e}, video path: {video_path}, video_caption: {caption}")
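
The tg.py change adds pool_timeout alongside the existing read_timeout and write_timeout on the send_video call, so the request does not time out while waiting for a free connection from python-telegram-bot's connection pool during a long upload. A minimal standalone sketch, assuming python-telegram-bot v20+ (token, chat id, caption, and file name are placeholders):

import asyncio
from pathlib import Path

import telegram


async def send_video(video_path: Path) -> None:
    bot = telegram.Bot(token="123456:PLACEHOLDER")  # placeholder token
    async with bot:
        with open(video_path, "rb") as f:
            await bot.send_video(
                chat_id=123456789,  # placeholder chat id
                video=f,
                caption="example caption",
                read_timeout=6000,
                write_timeout=6000,
                pool_timeout=6000,  # the timeout this commit adds
            )


if __name__ == "__main__":
    asyncio.run(send_video(Path("episode.mkv")))
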
13 changes: 1 addition & 12 deletions poetry.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions pyproject.toml
@@ -39,14 +39,13 @@ license = "MIT"
name = "animepipeline"
readme = "README.md"
repository = "https://github.com/TensoRaws/AnimePipeline"
version = "0.0.2"
version = "0.0.3"

# Requirements
[tool.poetry.dependencies]
python = "^3.9"

[tool.poetry.group.dev.dependencies]
aiofiles = "^24.1.0"
feedparser = "^6.0.11"
httpx = "^0.27.2"
loguru = "^0.7.2"
3 changes: 3 additions & 0 deletions tests/test_encode.py
@@ -52,6 +52,9 @@ async def test_task_progress(self) -> None:
task_progress = await self.finalrip._get_task_progress(GetTaskProgressRequest(video_key=video_key))
print(task_progress)

async def test_retry_merge(self) -> None:
await self.finalrip.retry_merge(video_key)

async def test_download_completed_task(self) -> None:
while True:
try:
