diff --git a/xinference/constants.py b/xinference/constants.py
index 61652357d2..7978cb218b 100644
--- a/xinference/constants.py
+++ b/xinference/constants.py
@@ -63,6 +63,7 @@ def get_xinference_home() -> str:
 XINFERENCE_DEFAULT_LOG_FILE_NAME = "xinference.log"
 XINFERENCE_LOG_MAX_BYTES = 100 * 1024 * 1024
 XINFERENCE_LOG_BACKUP_COUNT = 30
+XINFERENCE_LOG_ARG_MAX_LENGTH = 100
 XINFERENCE_HEALTH_CHECK_FAILURE_THRESHOLD = int(
     os.environ.get(XINFERENCE_ENV_HEALTH_CHECK_FAILURE_THRESHOLD, 5)
 )
diff --git a/xinference/core/model.py b/xinference/core/model.py
index dd7785bf58..6da34d3fde 100644
--- a/xinference/core/model.py
+++ b/xinference/core/model.py
@@ -19,6 +19,7 @@
 import os
 import time
 import types
+import uuid
 import weakref
 from asyncio.queues import Queue
 from asyncio.tasks import wait_for
@@ -444,18 +445,30 @@ async def _call_wrapper(self, output_type: str, fn: Callable, *args, **kwargs):
     @log_async(logger=logger)
     async def generate(self, prompt: str, *args, **kwargs):
         if self.allow_batching():
+            # request_id is not supported here
+            kwargs.pop("request_id", None)
             return await self.handle_batching_request(
                 prompt, "generate", *args, **kwargs
             )
         else:
             kwargs.pop("raw_params", None)
             if hasattr(self._model, "generate"):
+                # request_id is not supported here
+                kwargs.pop("request_id", None)
                 return await self._call_wrapper_json(
                     self._model.generate, prompt, *args, **kwargs
                 )
             if hasattr(self._model, "async_generate"):
+                if "request_id" not in kwargs:
+                    kwargs["request_id"] = str(uuid.uuid1())
+                else:
+                    # the model only accepts a string request_id
+                    kwargs["request_id"] = str(kwargs["request_id"])
                 return await self._call_wrapper_json(
-                    self._model.async_generate, prompt, *args, **kwargs
+                    self._model.async_generate,
+                    prompt,
+                    *args,
+                    **kwargs,
                 )
             raise AttributeError(f"Model {self._model.model_spec} is not for generate.")
 
@@ -534,17 +547,26 @@ async def chat(self, messages: List[Dict], *args, **kwargs):
         response = None
         try:
             if self.allow_batching():
+                # request_id is not supported here
+                kwargs.pop("request_id", None)
                 return await self.handle_batching_request(
                     messages, "chat", *args, **kwargs
                 )
             else:
                 kwargs.pop("raw_params", None)
                 if hasattr(self._model, "chat"):
+                    # request_id is not supported here
+                    kwargs.pop("request_id", None)
                     response = await self._call_wrapper_json(
                         self._model.chat, messages, *args, **kwargs
                     )
                     return response
                 if hasattr(self._model, "async_chat"):
+                    if "request_id" not in kwargs:
+                        kwargs["request_id"] = str(uuid.uuid1())
+                    else:
+                        # the model only accepts a string request_id
+                        kwargs["request_id"] = str(kwargs["request_id"])
                     response = await self._call_wrapper_json(
                         self._model.async_chat, messages, *args, **kwargs
                     )
@@ -577,9 +599,10 @@ async def abort_request(self, request_id: str) -> str:
             return await self._scheduler_ref.abort_request(request_id)
         return AbortRequestMessage.NO_OP.name
 
-    @log_async(logger=logger)
     @request_limit
+    @log_async(logger=logger)
     async def create_embedding(self, input: Union[str, List[str]], *args, **kwargs):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "create_embedding"):
             return await self._call_wrapper_json(
                 self._model.create_embedding, input, *args, **kwargs
@@ -589,8 +612,8 @@ async def create_embedding(self, input: Union[str, List[str]], *args, **kwargs):
             f"Model {self._model.model_spec} is not for creating embedding."
         )
 
-    @log_async(logger=logger)
     @request_limit
+    @log_async(logger=logger)
     async def rerank(
         self,
         documents: List[str],
@@ -602,6 +625,7 @@ async def rerank(
         *args,
         **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "rerank"):
             return await self._call_wrapper_json(
                 self._model.rerank,
@@ -616,8 +640,8 @@ async def rerank(
             )
         raise AttributeError(f"Model {self._model.model_spec} is not for reranking.")
 
-    @log_async(logger=logger, args_formatter=lambda _, kwargs: kwargs.pop("audio"))
     @request_limit
+    @log_async(logger=logger, ignore_kwargs=["audio"])
     async def transcriptions(
         self,
         audio: bytes,
@@ -626,7 +650,9 @@ async def transcriptions(
         response_format: str = "json",
         temperature: float = 0,
         timestamp_granularities: Optional[List[str]] = None,
+        **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "transcriptions"):
             return await self._call_wrapper_json(
                 self._model.transcriptions,
@@ -641,8 +667,8 @@ async def transcriptions(
             f"Model {self._model.model_spec} is not for creating transcriptions."
         )
 
-    @log_async(logger=logger, args_formatter=lambda _, kwargs: kwargs.pop("audio"))
     @request_limit
+    @log_async(logger=logger, ignore_kwargs=["audio"])
     async def translations(
         self,
         audio: bytes,
@@ -651,7 +677,9 @@ async def translations(
         response_format: str = "json",
         temperature: float = 0,
         timestamp_granularities: Optional[List[str]] = None,
+        **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "translations"):
             return await self._call_wrapper_json(
                 self._model.translations,
@@ -668,10 +696,7 @@ async def translations(
 
     @request_limit
     @xo.generator
-    @log_async(
-        logger=logger,
-        args_formatter=lambda _, kwargs: kwargs.pop("prompt_speech", None),
-    )
+    @log_async(logger=logger, ignore_kwargs=["prompt_speech"])
     async def speech(
         self,
         input: str,
@@ -681,6 +706,7 @@ async def speech(
         stream: bool = False,
         **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "speech"):
             return await self._call_wrapper_binary(
                 self._model.speech,
@@ -695,8 +721,8 @@ async def speech(
             f"Model {self._model.model_spec} is not for creating speech."
         )
 
-    @log_async(logger=logger)
     @request_limit
+    @log_async(logger=logger)
     async def text_to_image(
         self,
         prompt: str,
@@ -706,6 +732,7 @@ async def text_to_image(
         *args,
         **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "text_to_image"):
             return await self._call_wrapper_json(
                 self._model.text_to_image,
@@ -720,6 +747,10 @@ async def text_to_image(
             f"Model {self._model.model_spec} is not for creating image."
         )
 
+    @log_async(
+        logger=logger,
+        ignore_kwargs=["image"],
+    )
     async def image_to_image(
         self,
         image: "PIL.Image",
@@ -731,6 +762,7 @@ async def image_to_image(
         *args,
         **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "image_to_image"):
             return await self._call_wrapper_json(
                 self._model.image_to_image,
@@ -747,6 +779,10 @@ async def image_to_image(
             f"Model {self._model.model_spec} is not for creating image."
         )
 
+    @log_async(
+        logger=logger,
+        ignore_kwargs=["image"],
+    )
     async def inpainting(
         self,
         image: "PIL.Image",
@@ -759,6 +795,7 @@ async def inpainting(
         *args,
         **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "inpainting"):
             return await self._call_wrapper_json(
                 self._model.inpainting,
@@ -776,12 +813,13 @@ async def inpainting(
             f"Model {self._model.model_spec} is not for creating image."
         )
 
-    @log_async(logger=logger)
     @request_limit
+    @log_async(logger=logger, ignore_kwargs=["image"])
     async def infer(
         self,
         **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "infer"):
             return await self._call_wrapper_json(
                 self._model.infer,
@@ -791,8 +829,8 @@ async def infer(
             f"Model {self._model.model_spec} is not for flexible infer."
         )
 
-    @log_async(logger=logger)
     @request_limit
+    @log_async(logger=logger)
     async def text_to_video(
         self,
         prompt: str,
@@ -800,6 +838,7 @@ async def text_to_video(
         *args,
         **kwargs,
     ):
+        kwargs.pop("request_id", None)
         if hasattr(self._model, "text_to_video"):
             return await self._call_wrapper_json(
                 self._model.text_to_video,
diff --git a/xinference/core/utils.py b/xinference/core/utils.py
index a110278aea..9f359fa315 100644
--- a/xinference/core/utils.py
+++ b/xinference/core/utils.py
@@ -11,62 +11,120 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import copy
 import logging
 import os
 import random
 import string
-from typing import Dict, Generator, List, Tuple, Union
+import uuid
+from typing import Dict, Generator, List, Optional, Tuple, Union
 
 import orjson
 from pynvml import nvmlDeviceGetCount, nvmlInit, nvmlShutdown
 
 from .._compat import BaseModel
+from ..constants import XINFERENCE_LOG_ARG_MAX_LENGTH
 
 logger = logging.getLogger(__name__)
 
 
-def log_async(logger, args_formatter=None):
+def truncate_log_arg(arg) -> str:
+    s = str(arg)
+    if len(s) > XINFERENCE_LOG_ARG_MAX_LENGTH:
+        s = s[0:XINFERENCE_LOG_ARG_MAX_LENGTH] + "..."
+    return s
+
+
+def log_async(
+    logger,
+    level=logging.DEBUG,
+    ignore_kwargs: Optional[List[str]] = None,
+    log_exception=True,
+):
     import time
     from functools import wraps
 
     def decorator(func):
+        func_name = func.__name__
+
         @wraps(func)
         async def wrapped(*args, **kwargs):
-            if args_formatter is not None:
-                formatted_args, formatted_kwargs = copy.copy(args), copy.copy(kwargs)
-                args_formatter(formatted_args, formatted_kwargs)
-            else:
-                formatted_args, formatted_kwargs = args, kwargs
-            logger.debug(
-                f"Enter {func.__name__}, args: {formatted_args}, kwargs: {formatted_kwargs}"
+            request_id_str = kwargs.get("request_id", "")
+            if not request_id_str:
+                request_id_str = uuid.uuid1()
+            request_id_str = f"[request {request_id_str}]"
+            formatted_args = ",".join(map(truncate_log_arg, args))
+            formatted_kwargs = ",".join(
+                [
+                    "%s=%s" % (k, truncate_log_arg(v))
+                    for k, v in kwargs.items()
+                    if ignore_kwargs is None or k not in ignore_kwargs
+                ]
             )
-            start = time.time()
-            ret = await func(*args, **kwargs)
-            logger.debug(
-                f"Leave {func.__name__}, elapsed time: {int(time.time() - start)} s"
+            logger.log(
+                level,
+                f"{request_id_str} Enter {func_name}, args: {formatted_args}, kwargs: {formatted_kwargs}",
             )
-            return ret
+            start = time.time()
+            try:
+                ret = await func(*args, **kwargs)
+                logger.log(
+                    level,
+                    f"{request_id_str} Leave {func_name}, elapsed time: {int(time.time() - start)} s",
+                )
+                return ret
+            except Exception as e:
+                if log_exception:
+                    logger.error(
+                        f"{request_id_str} Leave {func_name}, error: {e}, elapsed time: {int(time.time() - start)} s",
+                        exc_info=True,
+                    )
+                else:
+                    logger.log(
+                        level,
+                        f"{request_id_str} Leave {func_name}, error: {e}, elapsed time: {int(time.time() - start)} s",
+                    )
+                raise
 
         return wrapped
 
     return decorator
 
 
-def log_sync(logger):
+def log_sync(logger, level=logging.DEBUG, log_exception=True):
     import time
     from functools import wraps
 
     def decorator(func):
         @wraps(func)
         def wrapped(*args, **kwargs):
-            logger.debug(f"Enter {func.__name__}, args: {args}, kwargs: {kwargs}")
-            start = time.time()
-            ret = func(*args, **kwargs)
-            logger.debug(
-                f"Leave {func.__name__}, elapsed time: {int(time.time() - start)} s"
+            formatted_args = ",".join(map(truncate_log_arg, args))
+            formatted_kwargs = ",".join(
+                map(lambda x: "%s=%s" % (x[0], truncate_log_arg(x[1])), kwargs.items())
             )
-            return ret
+            logger.log(
+                level,
+                f"Enter {func.__name__}, args: {formatted_args}, kwargs: {formatted_kwargs}",
+            )
+            start = time.time()
+            try:
+                ret = func(*args, **kwargs)
+                logger.log(
+                    level,
+                    f"Leave {func.__name__}, elapsed time: {int(time.time() - start)} s",
+                )
+                return ret
+            except Exception as e:
+                if log_exception:
+                    logger.error(
+                        f"Leave {func.__name__}, error: {e}, elapsed time: {int(time.time() - start)} s",
+                        exc_info=True,
+                    )
+                else:
+                    logger.log(
+                        level,
+                        f"Leave {func.__name__}, error: {e}, elapsed time: {int(time.time() - start)} s",
+                    )
+                raise
 
         return wrapped
diff --git a/xinference/core/worker.py b/xinference/core/worker.py
index fdd8343ae4..7b2c503023 100644
--- a/xinference/core/worker.py
+++ b/xinference/core/worker.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import asyncio
+import logging
 import os
 import platform
 import queue
@@ -770,7 +771,7 @@ async def update_cache_status(
             version_info["model_file_location"],
         )
 
-    @log_async(logger=logger)
+    @log_async(logger=logger, level=logging.INFO)
     async def launch_builtin_model(
         self,
         model_uid: str,
@@ -917,7 +918,7 @@ async def launch_builtin_model(
             {"model_ability": abilities, "status": LaunchStatus.READY.name},
         )
 
-    @log_async(logger=logger)
+    @log_async(logger=logger, level=logging.INFO)
     async def terminate_model(self, model_uid: str, is_model_die=False):
         # Terminate model while its launching is not allow
         if model_uid in self._model_uid_launching_guard:
diff --git a/xinference/model/llm/sglang/core.py b/xinference/model/llm/sglang/core.py
index b2b830d23c..578252324d 100644
--- a/xinference/model/llm/sglang/core.py
+++ b/xinference/model/llm/sglang/core.py
@@ -318,6 +318,7 @@ async def async_generate(
         self,
         prompt: str,
         generate_config: Optional[SGLANGGenerateConfig] = None,
+        request_id: Optional[str] = None,
     ) -> Union[Completion, AsyncGenerator[CompletionChunk, None]]:
         sanitized_generate_config = self._sanitize_generate_config(generate_config)
         logger.debug(
@@ -331,8 +332,8 @@ async def async_generate(
             if isinstance(stream_options, dict)
             else False
         )
-
-        request_id = str(uuid.uuid1())
+        if not request_id:
+            request_id = str(uuid.uuid1())
         if not stream:
             state = await self._non_stream_generate(prompt, **sanitized_generate_config)
             return self._convert_state_to_completion(
@@ -439,6 +440,7 @@ async def async_chat(
         self,
         messages: List[Dict],
         generate_config: Optional[Dict] = None,
+        request_id: Optional[str] = None,
     ) -> Union[ChatCompletion, AsyncGenerator[ChatCompletionChunk, None]]:
         assert self.model_family.chat_template is not None
         full_prompt = self.get_full_context(messages, self.model_family.chat_template)
diff --git a/xinference/model/llm/vllm/core.py b/xinference/model/llm/vllm/core.py
index 3a142c7de7..8869f7fb4a 100644
--- a/xinference/model/llm/vllm/core.py
+++ b/xinference/model/llm/vllm/core.py
@@ -426,6 +426,7 @@ async def async_generate(
         prompt: Union[str, Dict[str, Any]],
         generate_config: Optional[Dict] = None,
         tools: object = False,
+        request_id: Optional[str] = None,
     ) -> Union[Completion, AsyncGenerator[CompletionChunk, None]]:
         try:
             from vllm.sampling_params import SamplingParams
@@ -460,7 +461,8 @@ async def async_generate(
             else False
         )
         sampling_params = SamplingParams(**sanitized_generate_config)
-        request_id = str(uuid.uuid1())
+        if not request_id:
+            request_id = str(uuid.uuid1())
 
         assert self._engine is not None
         results_generator = self._engine.generate(
@@ -654,6 +656,7 @@ async def async_chat(
         self,
         messages: List[Dict],
         generate_config: Optional[Dict] = None,
+        request_id: Optional[str] = None,
     ) -> Union[ChatCompletion, AsyncGenerator[ChatCompletionChunk, None]]:
         tools = generate_config.pop("tools", []) if generate_config else None
         model_family = self.model_family.model_family or self.model_family.model_name
@@ -669,13 +672,17 @@ async def async_chat(
         stream = generate_config.get("stream", None)
 
         if stream:
-            agen = await self.async_generate(full_prompt, generate_config, tools)
+            agen = await self.async_generate(
+                full_prompt, generate_config, tools, request_id=request_id
+            )
             assert isinstance(agen, AsyncGenerator)
             if tools:
                 return self._async_to_tool_completion_chunks(agen)
             return self._async_to_chat_completion_chunks(agen)
         else:
-            c = await self.async_generate(full_prompt, generate_config)
+            c = await self.async_generate(
+                full_prompt, generate_config, request_id=request_id
+            )
             assert not isinstance(c, AsyncGenerator)
             if tools:
                 return self._tool_calls_completion(self.model_family, self.model_uid, c)
@@ -725,6 +732,7 @@ async def async_chat(
         self,
         messages: List[Dict],
         generate_config: Optional[Dict] = None,
+        request_id: Optional[str] = None,
     ) -> Union[ChatCompletion, AsyncGenerator[ChatCompletionChunk, None]]:
         # only support single image, waiting vllm support multi images
         model_family = self.model_family.model_family or self.model_family.model_name
@@ -744,10 +752,14 @@ async def async_chat(
         stream = generate_config.get("stream", None)
 
         if stream:
-            agen = await self.async_generate(inputs, generate_config)
+            agen = await self.async_generate(
+                inputs, generate_config, request_id=request_id
+            )
             assert isinstance(agen, AsyncGenerator)
             return self._async_to_chat_completion_chunks(agen)
         else:
-            c = await self.async_generate(inputs, generate_config)
+            c = await self.async_generate(
+                inputs, generate_config, request_id=request_id
+            )
             assert not isinstance(c, AsyncGenerator)
             return self._to_chat_completion(c)
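
For reference, a minimal usage sketch of the reworked log_async decorator from xinference/core/utils.py as introduced by this patch; demo_logger and echo are made-up names for illustration and not part of the change:

# Sketch only, assuming the patch above is applied.
import asyncio
import logging

from xinference.core.utils import log_async

logging.basicConfig(level=logging.INFO)
demo_logger = logging.getLogger("demo")


@log_async(logger=demo_logger, level=logging.INFO, ignore_kwargs=["audio"])
async def echo(prompt: str, **kwargs):
    # The decorator logs "[request <request_id>] Enter echo, ..." before this body runs
    # and a matching "Leave echo, elapsed time: ... s" afterwards (or an error line with
    # the exception when log_exception is left at its default). Each argument is
    # truncated to XINFERENCE_LOG_ARG_MAX_LENGTH (100) characters, and kwargs listed in
    # ignore_kwargs (here "audio") are omitted from the log line.
    return prompt


# request_id is read from kwargs for the log prefix; if it is absent, the decorator
# generates a uuid1 of its own, mirroring what ModelActor now does before delegating
# to async_generate / async_chat.
asyncio.run(echo("hi " * 100, request_id="req-1", audio=b"\x00" * 10_000))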