From 37e3ed198437393fd13dfa948a1af0051863e359 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 27 Nov 2024 17:04:24 +0800 Subject: [PATCH] fix: add job_memory_warn_mb --- .../livekit/agents/ipc/proc_job_executor.py | 31 +++++++++++++++---- .../livekit/agents/ipc/proc_pool.py | 9 ++++-- livekit-agents/livekit/agents/worker.py | 11 +++++-- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/livekit-agents/livekit/agents/ipc/proc_job_executor.py b/livekit-agents/livekit/agents/ipc/proc_job_executor.py index 43cf19bd0..a42f11ce4 100644 --- a/livekit-agents/livekit/agents/ipc/proc_job_executor.py +++ b/livekit-agents/livekit/agents/ipc/proc_job_executor.py @@ -76,7 +76,8 @@ class _ProcOpts: mp_ctx: BaseContext initialize_timeout: float close_timeout: float - max_job_memory_usage: float + job_memory_warn_mb: float + job_memory_limit_mb: float class ProcJobExecutor: @@ -89,7 +90,8 @@ def __init__( close_timeout: float, mp_ctx: BaseContext, loop: asyncio.AbstractEventLoop, - max_job_memory_usage: float = 0, + job_memory_warn_mb: float = 0, + job_memory_limit_mb: float = 0, ) -> None: self._loop = loop self._opts = _ProcOpts( @@ -98,7 +100,8 @@ def __init__( initialize_timeout=initialize_timeout, close_timeout=close_timeout, mp_ctx=mp_ctx, - max_job_memory_usage=max_job_memory_usage, + job_memory_warn_mb=job_memory_warn_mb, + job_memory_limit_mb=job_memory_limit_mb, ) self._user_args: Any | None = None @@ -341,7 +344,7 @@ async def _main_task(self) -> None: ping_task = asyncio.create_task(self._ping_pong_task(pong_timeout)) monitor_task = asyncio.create_task(self._monitor_task(pong_timeout)) - if self._opts.max_job_memory_usage > 0: + if self._opts.job_memory_limit_mb > 0 or self._opts.job_memory_warn_mb > 0: memory_monitor_task = asyncio.create_task(self._memory_monitor_task()) else: memory_monitor_task = None @@ -431,17 +434,33 @@ async def _memory_monitor_task(self) -> None: memory_info = process.memory_info() memory_mb = memory_info.rss / (1024 * 1024) # Convert to MB - if memory_mb > self._opts.max_job_memory_usage: + if ( + self._opts.job_memory_limit_mb > 0 + and memory_mb > self._opts.job_memory_limit_mb + ): logger.error( "Job exceeded memory limit, killing job", extra={ "memory_usage_mb": memory_mb, - "memory_limit_mb": self._opts.max_job_memory_usage, + "memory_limit_mb": self._opts.job_memory_limit_mb, **self.logging_extra(), }, ) self._exception = JobExecutorError_MemoryLimitExceeded() self._send_kill_signal() + elif ( + self._opts.job_memory_warn_mb > 0 + and memory_mb > self._opts.job_memory_warn_mb + ): + logger.warning( + "Job memory usage is high", + extra={ + "memory_usage_mb": memory_mb, + "memory_warn_mb": self._opts.job_memory_warn_mb, + "memory_limit_mb": self._opts.job_memory_limit_mb, + **self.logging_extra(), + }, + ) except (psutil.NoSuchProcess, psutil.AccessDenied) as e: logger.warning( diff --git a/livekit-agents/livekit/agents/ipc/proc_pool.py b/livekit-agents/livekit/agents/ipc/proc_pool.py index d6c3f0914..a67a42d1d 100644 --- a/livekit-agents/livekit/agents/ipc/proc_pool.py +++ b/livekit-agents/livekit/agents/ipc/proc_pool.py @@ -34,7 +34,8 @@ def __init__( job_executor_type: JobExecutorType, mp_ctx: BaseContext, loop: asyncio.AbstractEventLoop, - max_job_memory_usage: float = 0, + job_memory_warn_mb: float = 0, + job_memory_limit_mb: float = 0, ) -> None: super().__init__() self._job_executor_type = job_executor_type @@ -44,7 +45,8 @@ def __init__( self._close_timeout = close_timeout self._initialize_timeout = initialize_timeout self._loop = loop - self._max_job_memory_usage = max_job_memory_usage + self._job_memory_limit_mb = job_memory_limit_mb + self._job_memory_warn_mb = job_memory_warn_mb self._num_idle_processes = num_idle_processes self._init_sem = asyncio.Semaphore(MAX_CONCURRENT_INITIALIZATIONS) self._proc_needed_sem = asyncio.Semaphore(num_idle_processes) @@ -111,7 +113,8 @@ async def _proc_watch_task(self) -> None: close_timeout=self._close_timeout, mp_ctx=self._mp_ctx, loop=self._loop, - max_job_memory_usage=self._max_job_memory_usage, + job_memory_warn_mb=self._job_memory_warn_mb, + job_memory_limit_mb=self._job_memory_limit_mb, ) else: raise ValueError(f"unsupported job executor: {self._job_executor_type}") diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py index 7b1ce0a46..e9e0899f7 100644 --- a/livekit-agents/livekit/agents/worker.py +++ b/livekit-agents/livekit/agents/worker.py @@ -158,10 +158,14 @@ class WorkerOptions: Defaults to 0.75 on "production" mode, and is disabled in "development" mode. """ - max_job_memory_usage: float = 0 + + job_memory_warn_mb: float = 300 + """Memory warning threshold in MB. If the job process exceeds this limit, a warning will be logged.""" + job_memory_limit_mb: float = 0 """Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit. Defaults to 0 (disabled). """ + """Number of idle processes to keep warm.""" num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption( dev_default=0, prod_default=3 @@ -240,7 +244,7 @@ def __init__( ) if ( - opts.max_job_memory_usage > 0 + opts.job_memory_limit_mb > 0 and opts.job_executor_type != JobExecutorType.PROCESS ): logger.warning( @@ -273,7 +277,8 @@ def __init__( mp_ctx=mp_ctx, initialize_timeout=opts.initialize_process_timeout, close_timeout=opts.shutdown_process_timeout, - max_job_memory_usage=opts.max_job_memory_usage, + job_memory_warn_mb=opts.job_memory_warn_mb, + job_memory_limit_mb=opts.job_memory_limit_mb, ) self._proc_pool.on("process_started", self._on_process_started) self._proc_pool.on("process_closed", self._on_process_closed)