fix: add max_job_memory_usage for worker #1136

Merged · 5 commits · merged Nov 27, 2024
Changes from 1 commit
5 changes: 5 additions & 0 deletions .changeset/ninety-fans-jump.md
@@ -0,0 +1,5 @@
---
"livekit-agents": patch
---

add max_job_memory_usage; the job will be killed if it exceeds the limit
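
For illustration, a minimal sketch of how a worker could opt in to this option; the entrypoint below is a placeholder and the 500 MB value is arbitrary, not something prescribed by this PR.

# Illustrative sketch only; `entrypoint` is a placeholder job entrypoint.
from livekit import agents


async def entrypoint(ctx: agents.JobContext) -> None:
    await ctx.connect()  # placeholder job: just connect to the room


if __name__ == "__main__":
    agents.cli.run_app(
        agents.WorkerOptions(
            entrypoint_fnc=entrypoint,
            max_job_memory_usage=500,  # MB; 0 (the default) disables the check
        )
    )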
70 changes: 70 additions & 0 deletions livekit-agents/livekit/agents/worker.py
@@ -38,11 +38,13 @@

import aiohttp
import jwt
import psutil
from livekit import api, rtc
from livekit.protocol import agent, models

from . import http_server, ipc, utils
from ._exceptions import AssignmentTimeoutError
from .ipc.proc_job_executor import ProcJobExecutor
from .job import (
    JobAcceptArguments,
    JobContext,
@@ -158,6 +160,11 @@ class WorkerOptions:

    Defaults to 0.75 on "production" mode, and is disabled in "development" mode.
    """
    max_job_memory_usage: float = 0
Member:
Let's put something like 300MB by default?

Collaborator Author:
I would prefer to leave it empty and let the users decide.

This might be a breaking change if there is a default limit. I have tested the minimal assistant agent: its RAM usage is around 150 MB, but it could be higher if users load other data or publish/receive video. Those agents would be killed unexpectedly after upgrading.

Member:
it makes sense to not break existing apps. Is it possible to have a "warn" mode where we initially let folks know about the high memory usage without killing the requests? Since most users don't pay attention to memory, making memory issues clearly visible would be beneficial.

maybe we should have two separate options?

  • job_memory_warn
  • job_memory_limit
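
A rough sketch of how the two-threshold idea above could behave; the names `job_memory_warn` and `job_memory_limit` are taken from the suggestion, and everything else here is hypothetical.

# Hypothetical helper mirroring the proposed warn/limit split; not part of this PR.
def memory_action(memory_mb: float, job_memory_warn: float, job_memory_limit: float) -> str:
    """Return "kill", "warn", or "ok" for a job's current RSS in MB."""
    if job_memory_limit > 0 and memory_mb > job_memory_limit:
        return "kill"  # hard limit: the job process would be terminated
    if job_memory_warn > 0 and memory_mb > job_memory_warn:
        return "warn"  # soft threshold: only log the high memory usage
    return "ok"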

Collaborator Author:
job_memory_warn_mb (defaults to 300) and job_memory_limit_mb are added.
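
Assuming the same placeholder entrypoint as the earlier sketch, the renamed options would then be passed along these lines; the values are illustrative, and only the option names and the 300 MB warn default come from this thread.

# Illustrative values; only the option names come from this PR thread.
opts = agents.WorkerOptions(
    entrypoint_fnc=entrypoint,  # placeholder entrypoint from the earlier sketch
    job_memory_warn_mb=300,     # log a warning above ~300 MB RSS (stated default)
    job_memory_limit_mb=1000,   # kill the job above ~1000 MB; presumably 0 keeps it disabled
)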

"""Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit.
Defaults to 0 (disabled).
"""
"""Number of idle processes to keep warm."""
num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption(
dev_default=0, prod_default=3
)
@@ -298,6 +305,18 @@ async def run(self):
            self._main_task,
            asyncio.create_task(self._http_server.run(), name="http_server"),
        ]

        # start the memory monitoring task
        if self._opts.max_job_memory_usage > 0:
            if self._opts.job_executor_type != JobExecutorType.PROCESS:
                logger.warning(
                    "max_job_memory_usage is only supported for process-based job executors"
                )
            else:
                tasks.append(
                    asyncio.create_task(self._monitor_memory(), name="memory_monitor")
                )

        try:
            await asyncio.gather(*tasks)
        finally:
@@ -781,3 +800,54 @@ async def _update_job_status(self, proc: ipc.job_executor.JobExecutor) -> None:
        )
        msg = agent.WorkerMessage(update_job=update)
        await self._queue_msg(msg)

    async def _monitor_memory(self) -> None:
"""Monitor memory usage of active jobs and kill those exceeding the limit."""
        while not self._closed:
            for proc in self._proc_pool.processes:
                if not proc.running_job or not isinstance(proc, ProcJobExecutor):
                    continue

                try:
                    # Get process memory info
                    process = psutil.Process(proc.pid)
                    memory_info = process.memory_info()
                    memory_mb = memory_info.rss / (1024 * 1024)  # Convert to MB

                    if memory_mb > self._opts.max_job_memory_usage:
                        logger.error(
                            "Job exceeded memory limit, killing process",
                            extra={
                                "pid": proc.pid,
                                "job_id": proc.running_job.job.id,
                                "memory_usage_mb": memory_mb,
                                "memory_limit_mb": self._opts.max_job_memory_usage,
                            },
                        )
                        await asyncio.wait_for(proc.kill(), timeout=10.0)
                        logger.info(
                            "Process killed after exceeding memory limit",
                            extra={"pid": proc.pid, "job_id": proc.running_job.job.id},
                        )
                except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
                    logger.warning(
                        "Failed to get memory info for process",
                        extra={
                            "pid": proc.pid,
                            "job_id": proc.running_job.job.id,
                            "error": str(e),
                        },
                    )
                except asyncio.TimeoutError:
                    logger.error(
                        "Failed to kill process after 10 seconds",
                        extra={"pid": proc.pid, "job_id": proc.running_job.job.id},
                    )
                except Exception as e:
                    if self._closed:
                        return

                    logger.exception(
                        "Error in memory monitoring task", extra={"error": str(e)}
                    )

            await asyncio.sleep(5)  # Check every 5 seconds