ENH: optimize availability of vLLM #2046

Merged

merged 4 commits on Aug 9, 2024
3 changes: 3 additions & 0 deletions xinference/core/model.py
@@ -145,6 +145,9 @@ async def __pre_destroy__(self):
f"Destroy scheduler actor failed, address: {self.address}, error: {e}"
)

if hasattr(self._model, "stop") and callable(self._model.stop):
self._model.stop()

if (
isinstance(self._model, (LLMPytorchModel, LLMVLLMModel))
and self._model.model_spec.model_format == "pytorch"
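For context, the new `stop()` call is a duck-typed "stop before destroy" hook: if the wrapped model exposes a callable `stop`, it gets a chance to release engine resources before the actor is torn down. A minimal, self-contained sketch of that pattern (the class names below are illustrative, not xinference's real ones):

import asyncio

class FakeVLLMModel:
    def stop(self):
        # Pretend to release engine resources here.
        print("engine stopped")

class ModelHolder:
    def __init__(self, model):
        self._model = model

    async def __pre_destroy__(self):
        # Only call stop() when the wrapped model actually provides it.
        if hasattr(self._model, "stop") and callable(self._model.stop):
            self._model.stop()

asyncio.run(ModelHolder(FakeVLLMModel()).__pre_destroy__())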
2 changes: 2 additions & 0 deletions xinference/core/worker.py
@@ -158,6 +158,8 @@ async def recover_sub_pool(self, address):
except Exception as e:
# Report callback errors can be logged and ignored; they should not interrupt the process
logger.error("report_event error: %s" % (e))
finally:
del event_model_uid

self._model_uid_to_recover_count[model_uid] = (
recover_count - 1
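The intent of the added `finally` block, viewed in isolation: reporting the recovery event is best-effort (errors are only logged), and the local reference to the model uid is always dropped so it cannot linger after the recovery attempt. A hedged sketch with made-up names (`report_event` and `recover_once` are illustrative, not xinference APIs):

import logging

logger = logging.getLogger(__name__)

def recover_once(report_event, model_uid):
    event_model_uid = model_uid
    try:
        report_event(event_model_uid, "model is being recovered")
    except Exception as e:
        # Reporting failures are logged and ignored; they must not
        # interrupt the recovery process.
        logger.error("report_event error: %s" % (e))
    finally:
        # Always drop the local reference, mirroring the change above.
        del event_model_uid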
38 changes: 38 additions & 0 deletions xinference/model/llm/vllm/core.py
@@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import json
import logging
import multiprocessing
import os
import time
import uuid
from typing import (
@@ -240,6 +242,42 @@ def load(self):
)
self._engine = AsyncLLMEngine.from_engine_args(engine_args)

self._check_health_task = None
if hasattr(self._engine, "check_health"):
# vLLM has provided `check_health` since v0.4.1
self._check_health_task = asyncio.create_task(self._check_healthy())

def stop(self):
# Although the vLLM engine is supposed to shut down when it is deleted,
# issues such as GH#1682 report that the engine can still be alive
# after deletion, so stop it explicitly here.
logger.info("Stopping vLLM engine")
if self._check_health_task:
self._check_health_task.cancel()
if model_executor := getattr(self._engine, "model_executor", None):
model_executor.shutdown()
self._engine = None

async def _check_healthy(self, interval: int = 30):
from vllm.engine.async_llm_engine import AsyncEngineDeadError

logger.debug("Begin to check health of vLLM")

while self._engine is not None:
try:
await self._engine.check_health()
except (AsyncEngineDeadError, RuntimeError):
logger.info("Detecting vLLM is not health, prepare to quit the process")
try:
self.stop()
except Exception:
# ignore errors raised while stopping
pass
# Just kill the process and let xinference auto-recover the model
os._exit(1)
else:
await asyncio.sleep(interval)

def _sanitize_model_config(
self, model_config: Optional[VLLMModelConfig]
) -> VLLMModelConfig:
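Taken together, `stop()` and `_check_healthy()` form a small watchdog: a background asyncio task periodically calls the engine's health check and, once the engine is found dead, kills the process so that the worker's recovery path can bring the model back up. A condensed, standalone sketch of that pattern (`DummyEngine` and `watch` are illustrative names, not part of xinference or vLLM):

import asyncio
import os


class DummyEngine:
    def __init__(self):
        self.healthy = True

    async def check_health(self):
        if not self.healthy:
            raise RuntimeError("engine dead")


async def watch(engine, interval: float = 30.0):
    while engine is not None:
        try:
            await engine.check_health()
        except RuntimeError:
            # Kill the process; an external supervisor is expected to
            # auto-recover the model.
            os._exit(1)
        else:
            await asyncio.sleep(interval)


async def main():
    engine = DummyEngine()
    task = asyncio.create_task(watch(engine, interval=0.1))
    await asyncio.sleep(0.05)
    # Cancelling the task mirrors what stop() does with _check_health_task.
    task.cancel()


asyncio.run(main())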