From b6dff2335f1d5dddfb17387634a8204b81a2eb63 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Tue, 26 Nov 2024 11:35:50 +0800 Subject: [PATCH 1/5] fix: add max_job_memory_usage for worker --- .changeset/ninety-fans-jump.md | 5 ++ livekit-agents/livekit/agents/worker.py | 70 +++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 .changeset/ninety-fans-jump.md diff --git a/.changeset/ninety-fans-jump.md b/.changeset/ninety-fans-jump.md new file mode 100644 index 000000000..2dd38c8a4 --- /dev/null +++ b/.changeset/ninety-fans-jump.md @@ -0,0 +1,5 @@ +--- +"livekit-agents": patch +--- + +add max_job_memory_usage and will kill the job if it exceeds the limit diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py index a9a6c39b3..821337b8e 100644 --- a/livekit-agents/livekit/agents/worker.py +++ b/livekit-agents/livekit/agents/worker.py @@ -38,11 +38,13 @@ import aiohttp import jwt +import psutil from livekit import api, rtc from livekit.protocol import agent, models from . import http_server, ipc, utils from ._exceptions import AssignmentTimeoutError +from .ipc.proc_job_executor import ProcJobExecutor from .job import ( JobAcceptArguments, JobContext, @@ -158,6 +160,11 @@ class WorkerOptions: Defaults to 0.75 on "production" mode, and is disabled in "development" mode. """ + max_job_memory_usage: float = 0 + """Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit. + Defaults to 0 (disabled). + """ + """Number of idle processes to keep warm.""" num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption( dev_default=0, prod_default=3 ) @@ -298,6 +305,18 @@ async def run(self): self._main_task, asyncio.create_task(self._http_server.run(), name="http_server"), ] + + # start the memory monitoring task + if self._opts.max_job_memory_usage > 0: + if self._opts.job_executor_type != JobExecutorType.PROCESS: + logger.warning( + "max_job_memory_usage is only supported for process-based job executors" + ) + else: + tasks.append( + asyncio.create_task(self._monitor_memory(), name="memory_monitor") + ) + try: await asyncio.gather(*tasks) finally: @@ -781,3 +800,54 @@ async def _update_job_status(self, proc: ipc.job_executor.JobExecutor) -> None: ) msg = agent.WorkerMessage(update_job=update) await self._queue_msg(msg) + + async def _monitor_memory(self) -> None: + """Monitor memory usage of active jobs and kill those exceeding the limit.""" + while not self._closed: + for proc in self._proc_pool.processes: + if not proc.running_job or not isinstance(proc, ProcJobExecutor): + continue + + try: + # Get process memory info + process = psutil.Process(proc.pid) + memory_info = process.memory_info() + memory_mb = memory_info.rss / (1024 * 1024) # Convert to MB + if memory_mb > self._opts.max_job_memory_usage: + logger.error( + "Job exceeded memory limit, killing process", + extra={ + "pid": proc.pid, + "job_id": proc.running_job.job.id, + "memory_usage_mb": memory_mb, + "memory_limit_mb": self._opts.max_job_memory_usage, + }, + ) + await asyncio.wait_for(proc.kill(), timeout=10.0) + logger.info( + "Process killed after exceeding memory limit", + extra={"pid": proc.pid, "job_id": proc.running_job.job.id}, + ) + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + logger.warning( + "Failed to get memory info for process", + extra={ + "pid": proc.pid, + "job_id": proc.running_job.job.id, + "error": str(e), + }, + ) + except asyncio.TimeoutError: + logger.error( + "Failed to kill process after 10 seconds", + extra={"pid": proc.pid, "job_id": proc.running_job.job.id}, + ) + except Exception as e: + if self._closed: + return + + logger.exception( + "Error in memory monitoring task", extra={"error": str(e)} + ) + + await asyncio.sleep(5) # Check every 5 seconds From 87811f5754163818c3a49fa2caaf289dc95d0dfb Mon Sep 17 00:00:00 2001 From: Long Chen Date: Tue, 26 Nov 2024 20:56:16 +0800 Subject: [PATCH 2/5] fix: move memory monitor to the proc_job_executor --- .../livekit/agents/ipc/job_executor.py | 4 + .../livekit/agents/ipc/proc_job_executor.py | 56 ++++++++++++++ .../livekit/agents/ipc/proc_pool.py | 4 +- livekit-agents/livekit/agents/worker.py | 75 +++---------------- 4 files changed, 73 insertions(+), 66 deletions(-) diff --git a/livekit-agents/livekit/agents/ipc/job_executor.py b/livekit-agents/livekit/agents/ipc/job_executor.py index 19704791a..7d546bd2e 100644 --- a/livekit-agents/livekit/agents/ipc/job_executor.py +++ b/livekit-agents/livekit/agents/ipc/job_executor.py @@ -58,3 +58,7 @@ class JobExecutorError_Unresponsive(JobExecutorError): class JobExecutorError_Runtime(JobExecutorError): pass + + +class JobExecutorError_MemoryLimitExceeded(JobExecutorError): + pass diff --git a/livekit-agents/livekit/agents/ipc/proc_job_executor.py b/livekit-agents/livekit/agents/ipc/proc_job_executor.py index 2a956d947..669073c4a 100644 --- a/livekit-agents/livekit/agents/ipc/proc_job_executor.py +++ b/livekit-agents/livekit/agents/ipc/proc_job_executor.py @@ -11,12 +11,15 @@ from multiprocessing.context import BaseContext from typing import Any, Awaitable, Callable +import psutil + from .. import utils from ..job import JobContext, JobProcess, RunningJobInfo from ..log import logger from ..utils.aio import duplex_unix from . import channel, job_main, proc_lazy_main, proto from .job_executor import ( + JobExecutorError_MemoryLimitExceeded, JobExecutorError_Runtime, JobExecutorError_ShutdownTimeout, JobExecutorError_Unresponsive, @@ -73,6 +76,7 @@ class _ProcOpts: mp_ctx: BaseContext initialize_timeout: float close_timeout: float + max_job_memory_usage: float class ProcJobExecutor: @@ -85,6 +89,7 @@ def __init__( close_timeout: float, mp_ctx: BaseContext, loop: asyncio.AbstractEventLoop, + max_job_memory_usage: float = 0, ) -> None: self._loop = loop self._opts = _ProcOpts( @@ -93,6 +98,7 @@ def __init__( initialize_timeout=initialize_timeout, close_timeout=close_timeout, mp_ctx=mp_ctx, + max_job_memory_usage=max_job_memory_usage, ) self._user_args: Any | None = None @@ -335,11 +341,19 @@ async def _main_task(self) -> None: ping_task = asyncio.create_task(self._ping_pong_task(pong_timeout)) monitor_task = asyncio.create_task(self._monitor_task(pong_timeout)) + if self._opts.max_job_memory_usage > 0: + memory_monitor_task = asyncio.create_task(self._memory_monitor_task()) + else: + memory_monitor_task = None + await self._join_fut self._exitcode = self._proc.exitcode self._proc.close() await utils.aio.gracefully_cancel(ping_task, monitor_task) + if memory_monitor_task: + await utils.aio.gracefully_cancel(memory_monitor_task) + with contextlib.suppress(duplex_unix.DuplexClosed): await self._pch.aclose() @@ -403,6 +417,48 @@ async def _pong_timeout_co(): finally: await utils.aio.gracefully_cancel(*tasks) + @utils.log_exceptions(logger=logger) + async def _memory_monitor_task(self) -> None: + """Monitor memory usage and kill the process if it exceeds the limit.""" + while not self._closing and not self._kill_sent: + try: + if not self._pid or not self._running_job: + await asyncio.sleep(5) + continue + + # Get process memory info + process = psutil.Process(self._pid) + memory_info = process.memory_info() + memory_mb = memory_info.rss / (1024 * 1024) # Convert to MB + + if memory_mb > self._opts.max_job_memory_usage: + logger.error( + "Job exceeded memory limit, killing job", + extra={ + "memory_usage_mb": memory_mb, + "memory_limit_mb": self._opts.max_job_memory_usage, + **self.logging_extra(), + }, + ) + self._exception = JobExecutorError_MemoryLimitExceeded() + self._send_kill_signal() + + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + logger.warning( + "Failed to get memory info for process", + extra={"error": str(e), **self.logging_extra()}, + ) + except Exception as e: + if self._closing or self._kill_sent: + return + + logger.exception( + "Error in memory monitoring task", + extra={"error": str(e), **self.logging_extra()}, + ) + + await asyncio.sleep(5) # Check every 5 seconds + def logging_extra(self): extra: dict[str, Any] = { "pid": self.pid, diff --git a/livekit-agents/livekit/agents/ipc/proc_pool.py b/livekit-agents/livekit/agents/ipc/proc_pool.py index d707987ab..d6c3f0914 100644 --- a/livekit-agents/livekit/agents/ipc/proc_pool.py +++ b/livekit-agents/livekit/agents/ipc/proc_pool.py @@ -34,6 +34,7 @@ def __init__( job_executor_type: JobExecutorType, mp_ctx: BaseContext, loop: asyncio.AbstractEventLoop, + max_job_memory_usage: float = 0, ) -> None: super().__init__() self._job_executor_type = job_executor_type @@ -43,7 +44,7 @@ def __init__( self._close_timeout = close_timeout self._initialize_timeout = initialize_timeout self._loop = loop - + self._max_job_memory_usage = max_job_memory_usage self._num_idle_processes = num_idle_processes self._init_sem = asyncio.Semaphore(MAX_CONCURRENT_INITIALIZATIONS) self._proc_needed_sem = asyncio.Semaphore(num_idle_processes) @@ -110,6 +111,7 @@ async def _proc_watch_task(self) -> None: close_timeout=self._close_timeout, mp_ctx=self._mp_ctx, loop=self._loop, + max_job_memory_usage=self._max_job_memory_usage, ) else: raise ValueError(f"unsupported job executor: {self._job_executor_type}") diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py index 821337b8e..7b1ce0a46 100644 --- a/livekit-agents/livekit/agents/worker.py +++ b/livekit-agents/livekit/agents/worker.py @@ -38,13 +38,11 @@ import aiohttp import jwt -import psutil from livekit import api, rtc from livekit.protocol import agent, models from . import http_server, ipc, utils from ._exceptions import AssignmentTimeoutError -from .ipc.proc_job_executor import ProcJobExecutor from .job import ( JobAcceptArguments, JobContext, @@ -241,6 +239,15 @@ def __init__( "api_secret is required, or add LIVEKIT_API_SECRET in your environment" ) + if ( + opts.max_job_memory_usage > 0 + and opts.job_executor_type != JobExecutorType.PROCESS + ): + logger.warning( + "max_job_memory_usage is only supported for process-based job executors, " + "ignoring max_job_memory_usage" + ) + self._opts = opts self._loop = loop or asyncio.get_event_loop() @@ -266,6 +273,7 @@ def __init__( mp_ctx=mp_ctx, initialize_timeout=opts.initialize_process_timeout, close_timeout=opts.shutdown_process_timeout, + max_job_memory_usage=opts.max_job_memory_usage, ) self._proc_pool.on("process_started", self._on_process_started) self._proc_pool.on("process_closed", self._on_process_closed) @@ -305,18 +313,6 @@ async def run(self): self._main_task, asyncio.create_task(self._http_server.run(), name="http_server"), ] - - # start the memory monitoring task - if self._opts.max_job_memory_usage > 0: - if self._opts.job_executor_type != JobExecutorType.PROCESS: - logger.warning( - "max_job_memory_usage is only supported for process-based job executors" - ) - else: - tasks.append( - asyncio.create_task(self._monitor_memory(), name="memory_monitor") - ) - try: await asyncio.gather(*tasks) finally: @@ -800,54 +796,3 @@ async def _update_job_status(self, proc: ipc.job_executor.JobExecutor) -> None: ) msg = agent.WorkerMessage(update_job=update) await self._queue_msg(msg) - - async def _monitor_memory(self) -> None: - """Monitor memory usage of active jobs and kill those exceeding the limit.""" - while not self._closed: - for proc in self._proc_pool.processes: - if not proc.running_job or not isinstance(proc, ProcJobExecutor): - continue - - try: - # Get process memory info - process = psutil.Process(proc.pid) - memory_info = process.memory_info() - memory_mb = memory_info.rss / (1024 * 1024) # Convert to MB - if memory_mb > self._opts.max_job_memory_usage: - logger.error( - "Job exceeded memory limit, killing process", - extra={ - "pid": proc.pid, - "job_id": proc.running_job.job.id, - "memory_usage_mb": memory_mb, - "memory_limit_mb": self._opts.max_job_memory_usage, - }, - ) - await asyncio.wait_for(proc.kill(), timeout=10.0) - logger.info( - "Process killed after exceeding memory limit", - extra={"pid": proc.pid, "job_id": proc.running_job.job.id}, - ) - except (psutil.NoSuchProcess, psutil.AccessDenied) as e: - logger.warning( - "Failed to get memory info for process", - extra={ - "pid": proc.pid, - "job_id": proc.running_job.job.id, - "error": str(e), - }, - ) - except asyncio.TimeoutError: - logger.error( - "Failed to kill process after 10 seconds", - extra={"pid": proc.pid, "job_id": proc.running_job.job.id}, - ) - except Exception as e: - if self._closed: - return - - logger.exception( - "Error in memory monitoring task", extra={"error": str(e)} - ) - - await asyncio.sleep(5) # Check every 5 seconds From 8044d5cc4aa3a4bda0592a5c3ec59ecda92a8e87 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 27 Nov 2024 10:31:04 +0800 Subject: [PATCH 3/5] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Théo Monnom --- livekit-agents/livekit/agents/ipc/proc_job_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/livekit-agents/livekit/agents/ipc/proc_job_executor.py b/livekit-agents/livekit/agents/ipc/proc_job_executor.py index 669073c4a..edc4be67f 100644 --- a/livekit-agents/livekit/agents/ipc/proc_job_executor.py +++ b/livekit-agents/livekit/agents/ipc/proc_job_executor.py @@ -446,7 +446,7 @@ async def _memory_monitor_task(self) -> None: except (psutil.NoSuchProcess, psutil.AccessDenied) as e: logger.warning( "Failed to get memory info for process", - extra={"error": str(e), **self.logging_extra()}, + extra=self.logging_extra(), exc_info=e, ) except Exception as e: if self._closing or self._kill_sent: @@ -454,7 +454,7 @@ async def _memory_monitor_task(self) -> None: logger.exception( "Error in memory monitoring task", - extra={"error": str(e), **self.logging_extra()}, + extra=self.logging_extra(), ) await asyncio.sleep(5) # Check every 5 seconds From 0f8a38ae66936073bde1325d4cc0024c247c131b Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 27 Nov 2024 10:36:04 +0800 Subject: [PATCH 4/5] chore: fix lint --- livekit-agents/livekit/agents/ipc/proc_job_executor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/livekit-agents/livekit/agents/ipc/proc_job_executor.py b/livekit-agents/livekit/agents/ipc/proc_job_executor.py index edc4be67f..43cf19bd0 100644 --- a/livekit-agents/livekit/agents/ipc/proc_job_executor.py +++ b/livekit-agents/livekit/agents/ipc/proc_job_executor.py @@ -446,9 +446,10 @@ async def _memory_monitor_task(self) -> None: except (psutil.NoSuchProcess, psutil.AccessDenied) as e: logger.warning( "Failed to get memory info for process", - extra=self.logging_extra(), exc_info=e, + extra=self.logging_extra(), + exc_info=e, ) - except Exception as e: + except Exception: if self._closing or self._kill_sent: return From 37e3ed198437393fd13dfa948a1af0051863e359 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 27 Nov 2024 17:04:24 +0800 Subject: [PATCH 5/5] fix: add job_memory_warn_mb --- .../livekit/agents/ipc/proc_job_executor.py | 31 +++++++++++++++---- .../livekit/agents/ipc/proc_pool.py | 9 ++++-- livekit-agents/livekit/agents/worker.py | 11 +++++-- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/livekit-agents/livekit/agents/ipc/proc_job_executor.py b/livekit-agents/livekit/agents/ipc/proc_job_executor.py index 43cf19bd0..a42f11ce4 100644 --- a/livekit-agents/livekit/agents/ipc/proc_job_executor.py +++ b/livekit-agents/livekit/agents/ipc/proc_job_executor.py @@ -76,7 +76,8 @@ class _ProcOpts: mp_ctx: BaseContext initialize_timeout: float close_timeout: float - max_job_memory_usage: float + job_memory_warn_mb: float + job_memory_limit_mb: float class ProcJobExecutor: @@ -89,7 +90,8 @@ def __init__( close_timeout: float, mp_ctx: BaseContext, loop: asyncio.AbstractEventLoop, - max_job_memory_usage: float = 0, + job_memory_warn_mb: float = 0, + job_memory_limit_mb: float = 0, ) -> None: self._loop = loop self._opts = _ProcOpts( @@ -98,7 +100,8 @@ def __init__( initialize_timeout=initialize_timeout, close_timeout=close_timeout, mp_ctx=mp_ctx, - max_job_memory_usage=max_job_memory_usage, + job_memory_warn_mb=job_memory_warn_mb, + job_memory_limit_mb=job_memory_limit_mb, ) self._user_args: Any | None = None @@ -341,7 +344,7 @@ async def _main_task(self) -> None: ping_task = asyncio.create_task(self._ping_pong_task(pong_timeout)) monitor_task = asyncio.create_task(self._monitor_task(pong_timeout)) - if self._opts.max_job_memory_usage > 0: + if self._opts.job_memory_limit_mb > 0 or self._opts.job_memory_warn_mb > 0: memory_monitor_task = asyncio.create_task(self._memory_monitor_task()) else: memory_monitor_task = None @@ -431,17 +434,33 @@ async def _memory_monitor_task(self) -> None: memory_info = process.memory_info() memory_mb = memory_info.rss / (1024 * 1024) # Convert to MB - if memory_mb > self._opts.max_job_memory_usage: + if ( + self._opts.job_memory_limit_mb > 0 + and memory_mb > self._opts.job_memory_limit_mb + ): logger.error( "Job exceeded memory limit, killing job", extra={ "memory_usage_mb": memory_mb, - "memory_limit_mb": self._opts.max_job_memory_usage, + "memory_limit_mb": self._opts.job_memory_limit_mb, **self.logging_extra(), }, ) self._exception = JobExecutorError_MemoryLimitExceeded() self._send_kill_signal() + elif ( + self._opts.job_memory_warn_mb > 0 + and memory_mb > self._opts.job_memory_warn_mb + ): + logger.warning( + "Job memory usage is high", + extra={ + "memory_usage_mb": memory_mb, + "memory_warn_mb": self._opts.job_memory_warn_mb, + "memory_limit_mb": self._opts.job_memory_limit_mb, + **self.logging_extra(), + }, + ) except (psutil.NoSuchProcess, psutil.AccessDenied) as e: logger.warning( diff --git a/livekit-agents/livekit/agents/ipc/proc_pool.py b/livekit-agents/livekit/agents/ipc/proc_pool.py index d6c3f0914..a67a42d1d 100644 --- a/livekit-agents/livekit/agents/ipc/proc_pool.py +++ b/livekit-agents/livekit/agents/ipc/proc_pool.py @@ -34,7 +34,8 @@ def __init__( job_executor_type: JobExecutorType, mp_ctx: BaseContext, loop: asyncio.AbstractEventLoop, - max_job_memory_usage: float = 0, + job_memory_warn_mb: float = 0, + job_memory_limit_mb: float = 0, ) -> None: super().__init__() self._job_executor_type = job_executor_type @@ -44,7 +45,8 @@ def __init__( self._close_timeout = close_timeout self._initialize_timeout = initialize_timeout self._loop = loop - self._max_job_memory_usage = max_job_memory_usage + self._job_memory_limit_mb = job_memory_limit_mb + self._job_memory_warn_mb = job_memory_warn_mb self._num_idle_processes = num_idle_processes self._init_sem = asyncio.Semaphore(MAX_CONCURRENT_INITIALIZATIONS) self._proc_needed_sem = asyncio.Semaphore(num_idle_processes) @@ -111,7 +113,8 @@ async def _proc_watch_task(self) -> None: close_timeout=self._close_timeout, mp_ctx=self._mp_ctx, loop=self._loop, - max_job_memory_usage=self._max_job_memory_usage, + job_memory_warn_mb=self._job_memory_warn_mb, + job_memory_limit_mb=self._job_memory_limit_mb, ) else: raise ValueError(f"unsupported job executor: {self._job_executor_type}") diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py index 7b1ce0a46..e9e0899f7 100644 --- a/livekit-agents/livekit/agents/worker.py +++ b/livekit-agents/livekit/agents/worker.py @@ -158,10 +158,14 @@ class WorkerOptions: Defaults to 0.75 on "production" mode, and is disabled in "development" mode. """ - max_job_memory_usage: float = 0 + + job_memory_warn_mb: float = 300 + """Memory warning threshold in MB. If the job process exceeds this limit, a warning will be logged.""" + job_memory_limit_mb: float = 0 """Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit. Defaults to 0 (disabled). """ + """Number of idle processes to keep warm.""" num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption( dev_default=0, prod_default=3 @@ -240,7 +244,7 @@ def __init__( ) if ( - opts.max_job_memory_usage > 0 + opts.job_memory_limit_mb > 0 and opts.job_executor_type != JobExecutorType.PROCESS ): logger.warning( @@ -273,7 +277,8 @@ def __init__( mp_ctx=mp_ctx, initialize_timeout=opts.initialize_process_timeout, close_timeout=opts.shutdown_process_timeout, - max_job_memory_usage=opts.max_job_memory_usage, + job_memory_warn_mb=opts.job_memory_warn_mb, + job_memory_limit_mb=opts.job_memory_limit_mb, ) self._proc_pool.on("process_started", self._on_process_started) self._proc_pool.on("process_closed", self._on_process_closed)