Skip to content

Commit

Permalink
fix: add job_memory_warn_mb
Browse files Browse the repository at this point in the history
  • Loading branch information
longcw committed Nov 27, 2024
1 parent 0f8a38a commit 37e3ed1
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
31 changes: 25 additions & 6 deletions livekit-agents/livekit/agents/ipc/proc_job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class _ProcOpts:
mp_ctx: BaseContext
initialize_timeout: float
close_timeout: float
max_job_memory_usage: float
job_memory_warn_mb: float
job_memory_limit_mb: float


class ProcJobExecutor:
Expand All @@ -89,7 +90,8 @@ def __init__(
close_timeout: float,
mp_ctx: BaseContext,
loop: asyncio.AbstractEventLoop,
max_job_memory_usage: float = 0,
job_memory_warn_mb: float = 0,
job_memory_limit_mb: float = 0,
) -> None:
self._loop = loop
self._opts = _ProcOpts(
Expand All @@ -98,7 +100,8 @@ def __init__(
initialize_timeout=initialize_timeout,
close_timeout=close_timeout,
mp_ctx=mp_ctx,
max_job_memory_usage=max_job_memory_usage,
job_memory_warn_mb=job_memory_warn_mb,
job_memory_limit_mb=job_memory_limit_mb,
)

self._user_args: Any | None = None
Expand Down Expand Up @@ -341,7 +344,7 @@ async def _main_task(self) -> None:
ping_task = asyncio.create_task(self._ping_pong_task(pong_timeout))
monitor_task = asyncio.create_task(self._monitor_task(pong_timeout))

if self._opts.max_job_memory_usage > 0:
if self._opts.job_memory_limit_mb > 0 or self._opts.job_memory_warn_mb > 0:
memory_monitor_task = asyncio.create_task(self._memory_monitor_task())
else:
memory_monitor_task = None
Expand Down Expand Up @@ -431,17 +434,33 @@ async def _memory_monitor_task(self) -> None:
memory_info = process.memory_info()
memory_mb = memory_info.rss / (1024 * 1024) # Convert to MB

if memory_mb > self._opts.max_job_memory_usage:
if (
self._opts.job_memory_limit_mb > 0
and memory_mb > self._opts.job_memory_limit_mb
):
logger.error(
"Job exceeded memory limit, killing job",
extra={
"memory_usage_mb": memory_mb,
"memory_limit_mb": self._opts.max_job_memory_usage,
"memory_limit_mb": self._opts.job_memory_limit_mb,
**self.logging_extra(),
},
)
self._exception = JobExecutorError_MemoryLimitExceeded()
self._send_kill_signal()
elif (
self._opts.job_memory_warn_mb > 0
and memory_mb > self._opts.job_memory_warn_mb
):
logger.warning(
"Job memory usage is high",
extra={
"memory_usage_mb": memory_mb,
"memory_warn_mb": self._opts.job_memory_warn_mb,
"memory_limit_mb": self._opts.job_memory_limit_mb,
**self.logging_extra(),
},
)

except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
logger.warning(
Expand Down
9 changes: 6 additions & 3 deletions livekit-agents/livekit/agents/ipc/proc_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def __init__(
job_executor_type: JobExecutorType,
mp_ctx: BaseContext,
loop: asyncio.AbstractEventLoop,
max_job_memory_usage: float = 0,
job_memory_warn_mb: float = 0,
job_memory_limit_mb: float = 0,
) -> None:
super().__init__()
self._job_executor_type = job_executor_type
Expand All @@ -44,7 +45,8 @@ def __init__(
self._close_timeout = close_timeout
self._initialize_timeout = initialize_timeout
self._loop = loop
self._max_job_memory_usage = max_job_memory_usage
self._job_memory_limit_mb = job_memory_limit_mb
self._job_memory_warn_mb = job_memory_warn_mb
self._num_idle_processes = num_idle_processes
self._init_sem = asyncio.Semaphore(MAX_CONCURRENT_INITIALIZATIONS)
self._proc_needed_sem = asyncio.Semaphore(num_idle_processes)
Expand Down Expand Up @@ -111,7 +113,8 @@ async def _proc_watch_task(self) -> None:
close_timeout=self._close_timeout,
mp_ctx=self._mp_ctx,
loop=self._loop,
max_job_memory_usage=self._max_job_memory_usage,
job_memory_warn_mb=self._job_memory_warn_mb,
job_memory_limit_mb=self._job_memory_limit_mb,
)
else:
raise ValueError(f"unsupported job executor: {self._job_executor_type}")
Expand Down
11 changes: 8 additions & 3 deletions livekit-agents/livekit/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,14 @@ class WorkerOptions:
Defaults to 0.75 on "production" mode, and is disabled in "development" mode.
"""
max_job_memory_usage: float = 0

job_memory_warn_mb: float = 300
"""Memory warning threshold in MB. If the job process exceeds this threshold, a warning will be logged. Set to 0 to disable the warning."""
job_memory_limit_mb: float = 0
"""Maximum memory usage for a job in MB. The job process will be killed if it exceeds this limit.
Defaults to 0 (disabled).
"""

"""Number of idle processes to keep warm."""
num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption(
dev_default=0, prod_default=3
Expand Down Expand Up @@ -240,7 +244,7 @@ def __init__(
)

if (
opts.max_job_memory_usage > 0
opts.job_memory_limit_mb > 0
and opts.job_executor_type != JobExecutorType.PROCESS
):
logger.warning(
Expand Down Expand Up @@ -273,7 +277,8 @@ def __init__(
mp_ctx=mp_ctx,
initialize_timeout=opts.initialize_process_timeout,
close_timeout=opts.shutdown_process_timeout,
max_job_memory_usage=opts.max_job_memory_usage,
job_memory_warn_mb=opts.job_memory_warn_mb,
job_memory_limit_mb=opts.job_memory_limit_mb,
)
self._proc_pool.on("process_started", self._on_process_started)
self._proc_pool.on("process_closed", self._on_process_closed)
Expand Down

0 comments on commit 37e3ed1

Please sign in to comment.