diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/__init__.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/base.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/base.py new file mode 100644 index 000000000..3dfc6fdb6 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/base.py @@ -0,0 +1,51 @@ +from datarobot_drum.resource.components.Python.uwsgi_component.common import constants +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) + + +class Base(object): + def __init__(self, logger=None): + super(Base, self).__init__() + self._msg_container = [] + self.__logger = logger + + def name(self): + return self.__class__.__name__ + + def logger_name(self): + return constants.LOGGER_NAME_PREFIX + "." + self.name() + + def set_logger(self, logger): + self.__logger = logger + self._print_acc_messages() + + def is_logger_set(self): + return bool(self.__logger) + + def _print_acc_messages(self): + if not self.__logger: + raise UwsgiNginxCoreException("None logger! Invalid internal sequence!") + + if self._msg_container: + for m in self._msg_container: + self.__logger.info(m) + + @property + def _logger(self): + return self.__logger if self.__logger else self + + def debug(self, msg): + self._msg_container.append(msg) + + def info(self, msg): + self._msg_container.append(msg) + + def warning(self, msg): + self._msg_container.append(msg) + + def error(self, msg): + self._msg_container.append(msg) + + def critical(self, msg): + self._msg_container.append(msg) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/bg_actor.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/bg_actor.py new file mode 100644 index 000000000..2677b4d1d --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/bg_actor.py @@ -0,0 +1,54 @@ +import abc +from future.utils import with_metaclass +import threading + +from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) + + +class BgActor(with_metaclass(abc.ABCMeta, Base, threading.Thread)): + def __init__(self, mlops, ml_engine, polling_interval_sec=10.0): + super(BgActor, self).__init__() + self.set_logger(ml_engine.get_engine_logger(self.logger_name())) + + if not mlops or not mlops.init_called: + raise UwsgiNginxCoreException("'mlops' was not setup properly!") + + self._mlops = mlops + self._polling_interval_sec = polling_interval_sec + + self._condition = threading.Condition() + self._stop_gracefully = False + + def run(self): + while True: + with self._condition: + self._condition.wait(self._polling_interval_sec) + if self._mlops.done_called or self._stop_gracefully: + break + + self._do_repetitive_work() + + self._logger.warning("Exiting background actor ...") + + def stop_gracefully(self): + with self._condition: + self._finalize() + self._stop_gracefully = True + self._condition.notify_all() + + @abc.abstractmethod + def _do_repetitive_work(self): + """ + 
Implement any desired repetitive functionality that will be called in a background thread + every 'polling_interval_sec' + """ + pass + + def _finalize(self): + """ + An overridable method, to let the derived class perform final actions before shutting down + """ + pass diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/buff_to_lines.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/buff_to_lines.py new file mode 100644 index 000000000..406911bb1 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/buff_to_lines.py @@ -0,0 +1,17 @@ +class BufferToLines(object): + def __init__(self): + self._acc_buff = "" + self._last_line = "" + self._in_middle_of_line = False + + def add(self, buff): + self._acc_buff += buff.decode() + self._in_middle_of_line = False if self._acc_buff[-1] == "\n" else True + + def lines(self): + lines = self._acc_buff.split("\n") + up_to_index = len(lines) - 2 if self._in_middle_of_line else len(lines) - 1 + self._acc_buff = lines[-1] if self._in_middle_of_line else "" + + for iii in range(up_to_index): + yield lines[iii] diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/constants.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/constants.py new file mode 100644 index 000000000..28e7d4b9c --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/constants.py @@ -0,0 +1,15 @@ +import logging + +LOGGER_NAME_PREFIX = "" + +LOG_LEVELS = { + "noset": logging.NOTSET, + "debug": logging.DEBUG, + "info": logging.INFO, + "warning": logging.WARN, + "error": logging.ERROR, + "critical": logging.CRITICAL, +} + + +REQUIREMENTS_FILENAME = "requirements.txt" diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/os_util.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/os_util.py new file mode 100644 index 000000000..83f030f70 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/os_util.py @@ -0,0 +1,33 @@ +from datetime import datetime +import os +import pytz +import subprocess +import tempfile + + +def tmp_filepath(): + tmp_file = tempfile.NamedTemporaryFile() + filepath = tmp_file.name + tmp_file.close() + return filepath + + +def remove_file_safely(filepath): + if os.path.isfile(filepath): + os.unlink(filepath) + + +def service_installed(service_name): + try: + subprocess.check_output( + "service {} status".format(service_name), + shell=True, + stderr=subprocess.STDOUT, + ) + return True + except subprocess.CalledProcessError: + return False + + +def utcnow(): + return datetime.now(pytz.UTC) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/topological_sort.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/topological_sort.py new file mode 100644 index 000000000..32ab88497 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/topological_sort.py @@ -0,0 +1,186 @@ +import logging + +from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) + + +class TopologicalNode(object): + def 
__init__(self, node, key, child_keys): + self._node = node + self._key = key + self._child_keys = child_keys + + self.temp_visit = False + self.perm_visit = False + + @property + def node(self): + return self._node + + @property + def key(self): + return self._key + + @property + def child_keys(self): + return self._child_keys + + def __str__(self): + return "key: {}, childs: {}".format(self.key, self.child_keys) + + +class TopologicalSort(Base): + """ + Generates topological sort from a list or dict, which represent graphs. + """ + + def __init__(self, graph, key_attr_name, ptr_attr_name): + """ + :param graph: a list or dict that contains the whole nodes in the graph + :param key_attr_name: a string that is literally the name of the key accessor + :param ptr_attr_name: a string that is literally the name of the children accessor + """ + super(TopologicalSort, self).__init__(logging.getLogger(self.logger_name())) + + self._graph = graph + self._graph_aux = {} + + self._key_attr_name = key_attr_name + self._ptr_attr_name = ptr_attr_name + + self._perm_visit = {} + self._sorted_graph = [] + + self._generate_aux_graph(graph, key_attr_name, ptr_attr_name) + + def _generate_aux_graph(self, graph, key_attr_name, ptr_attr_name): + self._logger.debug("{}".format(graph)) + if isinstance(graph, dict): + for key, node in graph.items(): + self._add_node(node, key_attr_name, ptr_attr_name) + elif isinstance(graph, list): + for node in graph: + self._add_node(node, key_attr_name, ptr_attr_name) + else: + raise UwsgiNginxCoreException( + "Invalid graph type for topological sort! Expected 'dict' or 'list'! " + + "type: {}".format(type(graph)) + ) + + self._logger.debug(self._graph_aux) + + def _add_node(self, node, key_attr_name, ptr_attr_name): + key_value = TopologicalSort._call_class_attr(node, key_attr_name) + self._logger.debug("key_value: {}".format(key_value)) + + if key_value not in self._graph_aux: + ptr_value = TopologicalSort._call_class_attr(node, ptr_attr_name) + self._logger.debug("ptr_value: {}, type: {}".format(ptr_value, type(ptr_value))) + + child_keys = [] + try: + for child in ptr_value: + if child: + self._add_child_key_name(child, key_attr_name, child_keys) + except TypeError: + if ptr_value: + self._add_child_key_name(child, key_attr_name, child_keys) + + self._graph_aux[key_value] = TopologicalNode(node, key_value, child_keys) + + def _add_child_key_name(self, child, key_attr_name, child_keys): + key_name = TopologicalSort._call_class_attr(child, key_attr_name) + self._logger.debug("child key name: {}".format(key_name)) + child_keys.append(key_name) + + @staticmethod + def _call_class_attr(cls, attr_name): + attr = getattr(cls, attr_name, None) + if not attr: + raise UwsgiNginxCoreException( + "The given class does not include the given attribute name! " + + "class: {}, attr_name: {}".format(cls, attr_name) + ) + attr_value = attr() if callable(attr) else attr + return attr_value + + def sort(self): + if not self._sorted_graph: + while True: + t_node = self._find_unmarked_node() + if not t_node: + break + self._visit(t_node) + + return self._sorted_graph + + def _find_unmarked_node(self): + for key, t_node in self._graph_aux.items(): + if not t_node.perm_visit: + return t_node + return None + + def _visit(self, t_node): + self._logger.debug("Visiting node: {}".format(t_node.key)) + if t_node.perm_visit: + return + + if t_node.temp_visit: + raise UwsgiNginxCoreException( + "The pipeline has invalid cyclic loop (Not a DAG)! 
node: {}".format(t_node) + ) + + t_node.temp_visit = True + for child_key in t_node.child_keys: + if child_key not in self._graph_aux: + raise UwsgiNginxCoreException( + "Child id was not found in the graph! key: {}".format(child_key) + ) + + self._visit(self._graph_aux[child_key]) + + t_node.temp_visit = False + t_node.perm_visit = True + self._sorted_graph.append(t_node.node) + + +if __name__ == "__main__": + + class Node(object): + def __init__(self, key, childs): + self._key = key + self._childs = childs + + @property + def key(self): + return self._key + + @property + def childs(self): + return self._childs + + def __str__(self): + child_keys = [c.key for c in self.childs] if self.childs else None + return "key: {}, childs: {}".format(self.key, child_keys) + + logging.basicConfig(level=logging.INFO, format="%(name)s %(levelname)s: %(message)s") + + n1 = Node("a", None) + n2 = Node("b", [n1]) + n3 = Node("c", [n1]) + n4 = Node("d", [n2, n3]) + n5 = Node("e", [n3]) + + graph_list = [n3, n1, n2, n4, n5] + sorted_graph = TopologicalSort(graph_list, "key", "childs").sort() + print("Graph1:") + for node in sorted_graph: + print(node) + + graph_dict = {n3.key: n3, n1.key: n1, n2.key: n2, n4.key: n4, n5.key: n5} + sorted_graph = TopologicalSort(graph_dict, "key", "childs").sort() + print("\nGraph2:") + for node in sorted_graph: + print(node) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/uwsgi_nginx_core_exception.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/uwsgi_nginx_core_exception.py new file mode 100644 index 000000000..59fca9ecd --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/common/uwsgi_nginx_core_exception.py @@ -0,0 +1,2 @@ +class UwsgiNginxCoreException(Exception): + pass diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/__init__.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/ml_engine.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/ml_engine.py new file mode 100644 index 000000000..6e0cfe8f2 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/ml_engine.py @@ -0,0 +1,113 @@ +import abc +from future.utils import with_metaclass + +from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base +from datarobot_drum.resource.components.Python.uwsgi_component.pipeline import json_fields + + +class MLEngine(with_metaclass(abc.ABCMeta, Base)): + """ + An abstract class, which defines the interface of any implemented engine. + An example for such engines are PySpark, Tensorflow and so on. 
+ """ + + def __init__(self, pipeline, standalone): + super(MLEngine, self).__init__() + self._pipeline = pipeline + self._standalone = standalone + self._user_data = {} + self._config = {} + self._uuid = None + + @property + def pipeline_name(self): + return self._pipeline[json_fields.PIPELINE_NAME_FIELD] + + @property + def standalone(self): + return self._standalone + + def set_standalone(self, set_standalone): + self._standalone = set_standalone + return self + + @property + def config(self): + return self._config + + @property + def user_data(self): + return self._user_data + + @property + def session(self): + """ + Returns a session, which represents a single execution. It is totally dependent + on to the engine to determine the meaning of it. + + :return: engine's session + """ + return self._session() + + @property + def context(self): + """ + Returns a context which is relevant to the given engine. (.e.g in the case of + spark it is the spark context) + :return: engine's context + """ + return self._context() + + def run(self, mlops, pipeline): + """ + The given engine can safely run after all initializations have been completed + with success. + """ + pass + + @abc.abstractmethod + def finalize(self): + """ + Will be called only after the component's materialize functions were all called. + It supposed to perform any engine specific and final code to actually drive the pipeline. + """ + pass + + def stop(self): + """ + Will be called after the pipeline's execution is completed and before the mlops is shutdown + """ + pass + + @abc.abstractmethod + def cleanup(self): + """ + Will be called to cleanup remainders after the pipeline's execution is completed, + either by success or failure + """ + pass + + @abc.abstractmethod + def get_engine_logger(self, name): + """ + Returns an engine's specific logger. The logger can be accessed by the given engine + components + + :param name: the logger name + :return: engine's logger + """ + pass + + @abc.abstractmethod + def _session(self): + pass + + @abc.abstractmethod + def _context(self): + pass + + def set_uuid(self, uuid): + self._uuid = uuid + + def get_uuid(self): + return self._uuid diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/python_engine.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/python_engine.py new file mode 100644 index 000000000..aff923204 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/python_engine.py @@ -0,0 +1,30 @@ +import logging + +from datarobot_drum.resource.components.Python.uwsgi_component.ml_engine.ml_engine import MLEngine + + +class PythonEngine(MLEngine): + """ + Implementing the MLEngine API for a python engine. 
+ """ + + def __init__(self, pipeline, mlpiper_jar=None, standalone=False): + super(PythonEngine, self).__init__(pipeline, standalone) + self._config = {"mlpiper_jar": mlpiper_jar} + + self.set_logger(self.get_engine_logger(self.logger_name())) + + def finalize(self): + pass + + def cleanup(self): + pass + + def get_engine_logger(self, name): + return logging.getLogger(name) + + def _session(self): + pass + + def _context(self): + pass diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/rest_model_serving_engine.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/rest_model_serving_engine.py new file mode 100644 index 000000000..fd4cb92bc --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/ml_engine/rest_model_serving_engine.py @@ -0,0 +1,32 @@ +from datarobot_drum.resource.components.Python.uwsgi_component.ml_engine.python_engine import ( + PythonEngine, +) +from datarobot_drum.resource.components.Python.uwsgi_component.model.model_fetcher import ( + ModelFetcher, +) +from datarobot_drum.resource.components.Python.uwsgi_component.pipeline import java_mapping +from datarobot_drum.resource.components.Python.uwsgi_component.pipeline import json_fields + + +class RestModelServingEngine(PythonEngine): + """ + Implementing the MLEngine API for a RestModelServing engine. + """ + + def __init__(self, pipeline, mlpiper_jar=None, standalone=False): + super(RestModelServingEngine, self).__init__(pipeline, mlpiper_jar, standalone) + self._model_fetcher = None + + def run(self, mlops, pipeline): + if not self.standalone: + self._start_background_model_fetcher(mlops, pipeline) + + def _start_background_model_fetcher(self, mlops, pipeline): + system_conf = pipeline[json_fields.PIPELINE_SYSTEM_CONFIG_FIELD] + self._model_fetcher = ModelFetcher( + mlops, system_conf[java_mapping.MODEL_FILE_SOURCE_PATH_KEY], self + ).start() + + def stop(self): + if self._model_fetcher: + self._model_fetcher.stop_gracefully() diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/__init__.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/constants.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/constants.py new file mode 100644 index 000000000..5ee530179 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/constants.py @@ -0,0 +1,9 @@ +# Provides information about the model file path that should be fetched and used by the pipeline +METADATA_FILENAME = "metadata.json" + +# A helper file that is used to signal the uWSGI workers about new models +SYNC_FILENAME = "sync" + +# A dedicated extension that is used to avoid model file paths collisions between the +# pipeline model fetch and the agent +PIPELINE_MODEL_EXT = ".last_approved" diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/metadata.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/metadata.py new file mode 100644 index 000000000..cef363f73 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/metadata.py @@ -0,0 +1,25 @@ +import json + + +class Metadata(object): + 
FILEPATH_KEY_NAME = "model" + + def __init__(self, model_filepath=None): + self._model_filepath = model_filepath + + @property + def model_filepath(self): + return self._model_filepath + + def save(self, metadata_filepath): + with open(metadata_filepath, "w") as f: + json.dump(self._serialize(), f) + + def load(self, metadata_filepath): + with open(metadata_filepath, "r") as f: + content = json.load(f) + self._model_filepath = content[Metadata.FILEPATH_KEY_NAME] + return self + + def _serialize(self): + return {Metadata.FILEPATH_KEY_NAME: self._model_filepath} diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_env.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_env.py new file mode 100644 index 000000000..c5a5070a5 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_env.py @@ -0,0 +1,43 @@ +import os + +from datarobot_drum.resource.components.Python.uwsgi_component.model import constants + + +class ModelEnv(object): + def __init__(self, model_filepath, standalone=False): + self._model_filepath = model_filepath + self._standalone = standalone + if not self._standalone and not self._model_filepath.endswith(constants.PIPELINE_MODEL_EXT): + self._model_filepath += constants.PIPELINE_MODEL_EXT + self._model_root_dir = os.path.dirname(model_filepath) + self._metadata_filepath = os.path.join(self._model_root_dir, constants.METADATA_FILENAME) + self._sync_filepath = os.path.join(self._model_root_dir, constants.SYNC_FILENAME) + + @property + def model_filepath(self): + return self._model_filepath + + @property + def standalone(self): + return self._standalone + + @property + def model_root_dir(self): + return self._model_root_dir + + @property + def sync_filepath(self): + if not self._standalone and not os.path.isfile(self._sync_filepath): + self.touch_sync() + + return self._sync_filepath + + @property + def metadata_filepath(self): + return self._metadata_filepath + + def touch_sync(self): + try: + os.utime(self._sync_filepath, None) + except OSError: + open(self._sync_filepath, "a").close() diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_fetcher.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_fetcher.py new file mode 100644 index 000000000..a36cf84a8 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_fetcher.py @@ -0,0 +1,59 @@ +import os +import traceback +import uuid + +from datarobot_drum.resource.components.Python.uwsgi_component.common.bg_actor import BgActor +from datarobot_drum.resource.components.Python.uwsgi_component.model.metadata import Metadata +from datarobot_drum.resource.components.Python.uwsgi_component.model.model_env import ModelEnv + + +class ModelFetcher(BgActor): + POLLING_INTERVAL_SEC = 10.0 + + def __init__(self, mlops, model_filepath, ml_engine): + super(ModelFetcher, self).__init__(mlops, ml_engine, ModelFetcher.POLLING_INTERVAL_SEC) + + self._uuid = uuid.uuid4() + self._current_model = self._mlops.current_model() + self._logger.info("Current model: {}".format(self._current_model)) + self._model_env = None + self._setup_model_env(model_filepath) + + def _setup_model_env(self, model_filepath): + self._logger.info("Setup model env with: {}".format(model_filepath)) + self._model_env = ModelEnv(model_filepath) + if os.path.isfile(model_filepath): + 
self._logger.info("Rename model file path to {}".format(self._model_env.model_filepath)) + os.rename(model_filepath, self._model_env.model_filepath) + Metadata(self._model_env.model_filepath).save(self._model_env.metadata_filepath) + + # Overloaded function + def _do_repetitive_work(self): + try: + last_approved_model = self._mlops.get_last_approved_model() + self._logger.debug( + "Last approved model: {}, uuid: {}".format( + last_approved_model.get_id() if last_approved_model else None, + self._uuid, + ) + ) + if last_approved_model and ( + not self._current_model or last_approved_model != self._current_model + ): + self._current_model = last_approved_model + self._download_and_signal() + except Exception as ex: + self._logger.error( + "Failed fetch last approved model from server! {}\n{}".format( + traceback.format_exc(), ex + ) + ) + + def _download_and_signal(self): + self._logger.info("New model is about to be downloaded: {}".format(self._current_model)) + self._current_model.download(self._model_env.model_filepath) + Metadata(self._model_env.model_filepath).save(self._model_env.metadata_filepath) + self._signal() + + def _signal(self): + self._model_env.touch_sync() diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_selector.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_selector.py new file mode 100644 index 000000000..34957e758 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/model/model_selector.py @@ -0,0 +1,23 @@ +import os + +from datarobot_drum.resource.components.Python.uwsgi_component.model.metadata import Metadata +from datarobot_drum.resource.components.Python.uwsgi_component.model.model_env import ModelEnv + + +class ModelSelector(object): + def __init__(self, model_filepath, standalone=False): + self._model_env = ModelEnv(model_filepath, standalone) + + @property + def model_env(self): + return self._model_env + + def pick_model_filepath(self): + model_filepath = None + if self._model_env.standalone: + model_filepath = self._model_env.model_filepath + else: + if os.path.isfile(self._model_env.metadata_filepath): + model_filepath = Metadata().load(self._model_env.metadata_filepath).model_filepath + + return model_filepath diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/pipeline/__init__.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/pipeline/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/pipeline/java_mapping.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/pipeline/java_mapping.py new file mode 100644 index 000000000..3fadd85b5 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/pipeline/java_mapping.py @@ -0,0 +1,11 @@ +MODEL_FILE_SINK_PATH_KEY = "modelFileSinkPath" +MODEL_FILE_SOURCE_PATH_KEY = "modelFileSourcePath" + +TAGS = { + "model_dir": MODEL_FILE_SINK_PATH_KEY, + "input_model_path": MODEL_FILE_SOURCE_PATH_KEY, +} + +# The reserved keys ensures that the given attributes will not be manipulated by +# user's components +RESERVED_KEYS = {k: "__{}__tag__".format(k) for (k, v) in TAGS.items()} diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/pipeline/json_fields.py 
b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/pipeline/json_fields.py new file mode 100644 index 000000000..2cd8aa16b --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/pipeline/json_fields.py @@ -0,0 +1,60 @@ +# Constants to access pipeline json fields +PIPELINE_NAME_FIELD = "name" +PIPELINE_ENGINE_TYPE_FIELD = "engineType" +PIPELINE_LANGUAGE_FIELD = "language" +PIPELINE_SYSTEM_CONFIG_FIELD = "systemConfig" +PIPELINE_EE_CONF_FIELD = "executionEnvironment" +PIPELINE_PIPE_FIELD = "pipe" +PIPELINE_COMP_ID_FIELD = "id" +PIPELINE_COMP_TYPE_FIELD = "type" +PIPELINE_COMP_ARGUMENTS_FIELD = "arguments" +PIPELINE_COMP_PARENTS_FIELD = "parents" +PIPELINE_COMP_PARENTS_FIRST_FIELD = "parent" +PIPELINE_COMP_PARENTS_SECOND_FIELD = "output" +PIPELINE_COMP_PARENTS_THIRD_FIELD = "input" + +# Constants to access component description json fields +COMPONENT_DESC_ENGINE_TYPE_FIELD = "engineType" +COMPONENT_DESC_NAME_FIELD = "name" +COMPONENT_DESC_VERSION_FIELD = "version" +COMPONENT_DESC_LABEL_FIELD = "label" +COMPONENT_DESC_PROGRAM_FIELD = "program" +COMPONENT_DESC_LANGUAGE_FIELD = "language" +COMPONENT_DESC_DESCRIPTION_FIELD = "description" +COMPONENT_DESC_CLASS_FIELD = "componentClass" +COMPONENT_DESC_MODEL_BEHAVIOR_FIELD = "modelBehavior" +COMPONENT_DESC_USE_MLOPS_FIELD = "useMLOps" +COMPONENT_DESC_GROUP_FIELD = "group" +COMPONENT_DESC_INPUT_INFO_FIELD = "inputInfo" +COMPONENT_DESC_OUTPUT_INFO_FIELD = "outputInfo" +COMPONENT_DESC_ARGUMENTS = "arguments" +COMPONENT_DESC_USER_STAND_ALONE = "userStandalone" +COMPONENT_DESC_PYTHON_DEPS = "deps" +COMPONENT_DESC_INCLUDE_GLOB_PATTERNS = "includeGlobPatterns" +COMPONENT_DESC_EXCLUDE_GLOB_PATTERNS = "excludeGlobPatterns" + + +COMPONENT_DESC_ARGUMENT_KEY = "key" +COMPONENT_DESC_ARGUMENT_TYPE = "type" +COMPONENT_DESC_ARGUMENT_LABEL = "label" +COMPONENT_DESC_ARGUMENT_DESCRIPTION = "description" +COMPONENT_DESC_ARGUMENT_ENV_VAR = "envVar" +COMPONENT_DESC_ARGUMENT_OPTIONAL = "optional" +COMPONENT_DESC_ARGUMENT_TAG = "tag" +COMPONENT_DESC_ARGUMENT_DEFAULT_VAL = "defaultValue" + + +CONNECTION_DESC_DESCRIPTION_FIELD = "description" +CONNECTION_DESC_LABEL_FIELD = "label" +CONNECTION_DESC_DEFAULT_COMPONENT_FIELD = "defaultComponent" +CONNECTION_DESC_TYPE_FIELD = "type" +CONNECTION_DESC_GROUP_FIELD = "group" + +# Auto-generated internal fields +COMPONENT_DESC_ROOT_PATH_FIELD = "root-path" + +# Component metadata reference json +COMPONENT_METADATA_REF_FILE_NAME_FIELD = "metadataFilename" + + +PIPELINE_SYSTEM_CONFIG_TEST_MODE_PARAM = "__test_mode__" diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/__init__.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/__template__.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/__template__.py new file mode 100644 index 000000000..e69de29bb diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/constants.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/constants.py new file mode 100644 index 000000000..e851757ff --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/constants.py @@ -0,0 +1,137 @@ +from 
datarobot_drum.resource.components.Python.uwsgi_component.common.os_util import ( + service_installed, +) + + +class SharedConstants: + # Keys for shared configuration dict + TARGET_PATH_KEY = "target_path" + SOCK_FILENAME_KEY = "sock_filename" + STATS_SOCK_FILENAME_KEY = "stats_sock_filename" + REST_COMP_ROOT_PATH = "rest_comp_root_path" + STANDALONE = "standalone" + STATS_PATH_FILENAME_KEY = "stats_path_filename" + + +class ComponentConstants: + CONFIGURE_CALLBACK_FUNC_NAME = "configure_callback" + LOAD_MODEL_CALLBACK_FUNC_NAME = "load_model_callback" + CLEANUP_CALLBACK_FUNC_NAME = "cleanup_callback" + POST_FORK_CALLBACK_FUNC_NAME = "post_fork_callback" + + TMP_RESTFUL_ROOT = "/tmp" + TMP_RESTFUL_DIR_PREFIX = "restful_comp_" + INPUT_MODEL_TAG_NAME = "input_model_path" + + # *** RESTful component keys *** + + PORT_KEY = "port" + + HOST_KEY = "host" + DEFAULT_HOST = "localhost" + + LOG_FORMAT_KEY = "log_format" + DEFAULT_LOG_FORMAT = "%(asctime)-15s %(levelname)s [%(module)s:%(lineno)d]: %(message)s" + + LOG_LEVEL_KEY = "log_level" + DEFAULT_LOG_LEVEL = "info" + + # Disable 'uwsgi' requests logging + UWSGI_DISABLE_LOGGING_KEY = "uwsgi_disable_logging" + DEFAULT_UWSGI_DISABLE_LOGGING = True + UWSGI_MAX_WORKERS_KEY = "uwsgi_max_workers" + UWSGI_CHEAPER_RSS_LIMIT_SOFT_KEY = "uwsgi_cheaper_rss_limit_soft" + UWSGI_CHEAPER_RSS_LIMIT_HARD_KEY = "uwsgi_cheaper_rss_limit_hard" + UWSGI_MEMORY_REPORT_KEY = "uwsgi_memory_report" + SINGLE_UWSGI_WORKER_KEY = "single_uwsgi_worker" + UWSGI_THREADS = "uwsgi_threads" + + METRICS_KEY = "metrics" + METRIC_TEMPLATE = "metric = name={},type=counter,initial_value=0,oid=100.{}" + + # Specify the reporting interval as well as the time period that stats metrics are referred to + STATS_REPORTING_INTERVAL_SEC = "stats_reporting_interval_sec" + DEFAULT_STATS_REPORTING_INTERVAL_SEC = 10 + + # The dry run key, is for internal use only. It design to skip the execution of uwsgi & nginx + # applications. 
It is used for debugging, when someone wants to analyse all the configurations + # before actually starting up given processes + DRY_RUN_KEY = "__dry_run__" + DEFAULT_DRY_RUN = False + + +class UwsgiConstants: + DEV_AGAINST_VERSION = "2.0.17.1" + + START_CMD = "uwsgi --ini {filepath}" + STOP_CMD = "uwsgi --stop {pid_filepath}" + VER_CMD = "uwsgi --version" + + DAEMONIZE = False + INI_FILENAME = "uwsgi.ini" + PID_FILENAME = "uwsgi.pid" + ENTRY_POINT_SCRIPT_NAME = "uwsgi_entry_script.py" + SOCK_FILENAME = "restful_mlapp.sock" + STATS_SOCK_FILENAME = "stats.restful_mlapp.sock" + + MONITOR_THREAD_KEY = "monitor_th" + MONITOR_ERROR_KEY = "error" + + MODEL_RELOAD_SIGNAL_NUM = 13 + + # *** Keys for 'uwsgi' configuration dict *** + + RESTFUL_COMP_MODULE_KEY = "restful_comp_module" + RESTFUL_COMP_CLS_KEY = "restful_comp_cls" + PARAMS_KEY = "params" + PIPELINE_NAME_KEY = "pipeline_name" + MODEL_PATH_KEY = "model_path" + DEPUTY_ID_KEY = "deputy_id" + WORKER_ID = "worker_id" + LOGGING_UDP_SOCKET = "logging_udp_socket" + + +class RestfulConstants: + STATS_ROUTE = "statsinternal" + STATS_SYSTEM_INFO = "sys_info" + STATS_WID = "wid" + STATS_WUUID = "wuuid" + STATS_UUID = "uuid2" + STATS_USER = "user" + STATS_SYSTEM = "system" + STATS_SYSTEM_ERROR = "system_error" + STATS_AGGREGATE_FLAG = "AGGREGATE_STATS" + + +class NginxConstants: + DEV_AGAINST_VERSION = "nginx/1.10.3" + + START_CMD = "service nginx start" if service_installed("dbus") else "nginx" + STOP_CMD = "service nginx stop" if service_installed("dbus") else "nginx -s quit" + VER_CMD = "nginx -v" + + SERVER_CONF_FILENAME = "drum.pipeline.restful" + + NGINX_ROOT = "/etc/nginx" + SERVER_CONF_DIR_DEBIAN = NGINX_ROOT + "/sites-available" + SERVER_CONF_DIR_REDHAT = NGINX_ROOT + "/conf.d" + + NGINX_ROOT_MACOS = "/usr/local/etc/nginx" + SERVER_CONF_DIR_MACOS = NGINX_ROOT_MACOS + "/servers" + + SERVER_ENABLED_DIR = NGINX_ROOT + "/sites-enabled" + + DISABLE_ACCESS_LOG_KEY = "disable_access_log" + ACCESS_LOG_OFF_CONFIG = "access_log off;" + + +class StatsConstants: + REQS_PER_WINDOW_TIME_GRAPH_TITLE = "Total Requests / {}sec" + + ACC_REQS_TABLE_NAME = "Accumulated REST requests" + ACC_REQS_NUM_REQS_COL_NAME = "Num Requests" + ACC_REQS_STATUS_COL_NAME = "Status" + ACC_REQS_LAST_ROW_NAME = "Total" + + AVG_RESP_TIME_TABLE_NAME = "Average response time" + AVG_RESP_TIME_COL_NAME = "Time [us]" diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_app_wrapper.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_app_wrapper.py new file mode 100644 index 000000000..4f678a5a2 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_app_wrapper.py @@ -0,0 +1,79 @@ +""" +For internal use only. These are wrappers for the Flask building blocks +""" +import inspect +from flask import Flask +from flask import json +from flask import request +from flask import Response +from flask_cors import CORS + +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) + + +class EndpointAction(object): + """ + A wrapper over a Flask handler. It is called on each incoming request and therefore + should be treated with caution and should be optimized on each part of it. + """ + + def __init__(self, handler, raw): + if not inspect.ismethod(handler): + raise UwsgiNginxCoreException( + "Invalid REST endpoint handler! 
Should be a component's method with the " + "following prototype: (self, url_params, form_params), given: {}".format( + handler + ) + ) + + self._handler = handler + self._raw = raw + + def __call__(self): + try: + if self._raw: + # This option is intended to serve py4j handlers + status, response = self._handler( + request.query_string.decode(), request.get_data(as_text=True) + ) + else: + status, response = self._handler( + request.args.to_dict(), + request.get_json() if request.is_json else request.form.to_dict(), + ) + except ValueError: + raise UwsgiNginxCoreException( + "Invalid returned type from endpoint handler: '{}', ".format(self._handler) + + "Expecting for tuple of two elements: (status, response)" + ) + + if isinstance(response, Response): + return response + + if not isinstance(response, str): + response = json.dumps(response) + + return Response(response=response, status=status, mimetype="application/json") + + +class FlaskAppWrapper(object): + """ + A wrapper over the Flask application. + """ + + app = None + + def __init__(self, pipeline_name): + FlaskAppWrapper.app = Flask(pipeline_name) + CORS(FlaskAppWrapper.app) + + def run(self, host, port): + FlaskAppWrapper.app.run(host, port) + + def add_endpoint(self, url_rule, endpoint, handler, options, raw): + endpoint = handler.__name__ if not endpoint else endpoint + FlaskAppWrapper.app.add_url_rule( + url_rule, endpoint, EndpointAction(handler, raw), None, **options + ) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_custom_json_encoder.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_custom_json_encoder.py new file mode 100644 index 000000000..48b19abb3 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_custom_json_encoder.py @@ -0,0 +1,39 @@ +""" +For internal use only. It is a custom json encoder, which enables to serialize +additional types to json, beyond those that are natively supported. +""" +from flask.json import JSONEncoder +import numpy as np + + +class FlaskCustomJsonEncode(JSONEncoder): + def default(self, obj): + if isinstance(obj, np.int32) or isinstance(obj, np.int64): + return int(obj) + + if isinstance( + obj, + ( + np.int_, + np.intc, + np.intp, + np.int8, + np.int16, + np.int32, + np.int64, + np.uint8, + np.uint16, + np.uint32, + np.uint64, + ), + ): + return int(obj) + + if isinstance(obj, (np.float_, np.float16, np.float32, np.float64)): + return float(obj) + + if isinstance(obj, (np.ndarray,)): + return obj.tolist() + + # Let the base class default method raise the TypeError + return JSONEncoder.default(self, obj) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_route.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_route.py new file mode 100644 index 000000000..9a7d1076c --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/flask_route.py @@ -0,0 +1,34 @@ +""" +For internal use only. A decorator that is used to indicate a REST endpoint function. 
+Reference: http://flask.pocoo.org/docs/0.12/api/#flask.Flask.route +""" +from functools import wraps + + +class FlaskRoute(object): + METHODS_KEY = "methods" + RAW_KEY = "raw" + _routes = [] + + def __init__(self, rule, **options): + self._rule = rule + self._options = options + self._raw = self._options.pop(FlaskRoute.RAW_KEY, False) + + if FlaskRoute.METHODS_KEY not in self._options: + self._options[FlaskRoute.METHODS_KEY] = ["GET", "POST"] + + def __call__(self, f): + @wraps(f) + def wrapper(*args, **kwargs): + return f(*args, **kwargs) + + endpoint = self._options.pop("endpoint", None) + if not any(self._rule == e[0] for e in FlaskRoute._routes): + FlaskRoute._routes.append((self._rule, endpoint, f.__name__, self._options, self._raw)) + + return wrapper + + @staticmethod + def routes(): + return FlaskRoute._routes diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/metric.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/metric.py new file mode 100644 index 000000000..1b49efd19 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/metric.py @@ -0,0 +1,253 @@ +import logging +import six + +from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) + +uwsgi_loaded = False +try: + import uwsgi + + uwsgi_loaded = True +except ImportError: + pass + + +class MetricType: + COUNTER = 1 + COUNTER_PER_TIME_WINDOW = 2 + + +class MetricRelation: + AVG_PER_REQUEST = 1 + DIVIDE_BY = 2 + MULTIPLY_BY = 3 + SUM_OF = 4 + BAR_GRAPH = 5 + + +class Metric(Base): + NAME_SUFFIX = ".__pm1234__" + FLOAT_PRECISION = 100000.0 # 5 digits after the period + + _metrics = {} + + def __init__( + self, + name, + title=None, + hidden=False, + metric_type=MetricType.COUNTER, + value_type=int, + metric_relation=None, + related_metric=None, + ): + super(Metric, self).__init__(logging.getLogger(self.logger_name())) + + self._metric_name = name + Metric.NAME_SUFFIX + self._title = title + self._hidden = hidden + self._metric_type = metric_type + self._value_type = value_type + self._metric_relation = metric_relation + self._metric_already_displayed = False + + if not self._hidden and not self._title: + raise UwsgiNginxCoreException( + "A metric can be seen in the UI only if 'title' is provided! name: {}".format(name) + ) + + if self.metric_relation == MetricRelation.BAR_GRAPH: + if not isinstance(related_metric, list): + raise UwsgiNginxCoreException( + "Bar graph metric should be provided with a list of metrics tuples. " + "Each tuple should contain the related metric and its bar name! " + "name: {}, related_metrics: {}".format(self.name, self.related_metric) + ) + + self._related_metric = [] + for m in related_metric: + self.add_related_metric(m) + else: + self._related_metric = ( + related_metric if isinstance(related_metric, list) else [related_metric] + ) + + if self._related_metric[0] and self._related_metric[0].metric_type != metric_type: + raise UwsgiNginxCoreException( + "Error in metrics relation! Given metric cannot relate to other metric of " + "different type!" 
+                    + " metric: {}, type: {}, related-metric: {}, type: {}".format(
+                        name,
+                        metric_type,
+                        self._related_metric[0].metric_name,
+                        self._related_metric[0].metric_type,
+                    )
+                )
+
+        if name in Metric._metrics:
+            raise UwsgiNginxCoreException("Metric has already been defined! name: {}".format(name))
+
+        self._logger.info("Add new uwsgi metric ... {}".format(self._metric_name))
+        Metric._metrics[self._metric_name] = self
+
+    def __str__(self):
+        return (
+            "name: {}, title: {}, hidden: {}, metric-type: {}, value-type: {}, "
+            "metric-relation: {}, related_metric: {}".format(
+                self.name,
+                self.title,
+                self.hidden,
+                self.metric_type,
+                self.value_type,
+                self.metric_relation,
+                self.related_metric,
+            )
+        )
+
+    @staticmethod
+    def metrics():
+        return Metric._metrics
+
+    @staticmethod
+    def metric_by_name(metric_name):
+        return Metric._metrics[metric_name]
+
+    @property
+    def metric_name(self):
+        return self._metric_name
+
+    @property
+    def title(self):
+        return self._title
+
+    @property
+    def hidden(self):
+        return self._hidden
+
+    @property
+    def value_type(self):
+        return self._value_type
+
+    @property
+    def metric_type(self):
+        return self._metric_type
+
+    @property
+    def metric_relation(self):
+        return self._metric_relation
+
+    @property
+    def related_metric(self):
+        return self._related_metric
+
+    @property
+    def related_metric_meta(self):
+        if isinstance(self.related_metric[0], tuple):
+            return [metric_meta for metric_meta, _ in self.related_metric]
+        else:
+            return self.related_metric
+
+    @property
+    def metric_already_displayed(self):
+        return self._metric_already_displayed
+
+    @metric_already_displayed.setter
+    def metric_already_displayed(self, value):
+        self._metric_already_displayed = value
+
+    def add_related_metric(self, bar_graph_metric):
+        if self.metric_relation != MetricRelation.BAR_GRAPH:
+            raise UwsgiNginxCoreException("Related metric can be added only to bar graph!")
+
+        if not isinstance(bar_graph_metric, tuple) or len(bar_graph_metric) != 2:
+            raise UwsgiNginxCoreException(
+                "Related metric information should be a tuple of the metric itself and a "
+                "bar column label! related_metric: {}".format(bar_graph_metric)
+            )
+
+        if not isinstance(bar_graph_metric[0], Metric):
+            raise UwsgiNginxCoreException(
+                "First element in related bar graph metric should be a Metric! 
" + "provided: {}".format(bar_graph_metric[0]) + ) + + if not isinstance(bar_graph_metric[1], six.string_types): + raise UwsgiNginxCoreException( + "Second element in related bar graph metric should be a string" + "provided: {}".format(bar_graph_metric[1]) + ) + + self._related_metric.append(bar_graph_metric) + + def get(self): + value = 0 + if uwsgi_loaded: + value = uwsgi.metric_get(self._metric_name) + if self._value_type == float: + value /= Metric.FLOAT_PRECISION + return value + return value + + def set(self, value): + if uwsgi_loaded: + if self._value_type == float: + value *= Metric.FLOAT_PRECISION + + uwsgi.metric_set(self._metric_name, value) + + def set_max(self, value): + """ + only set the metric name if the give value is greater than the one currently stored + """ + if uwsgi_loaded: + if self._value_type == float: + value *= Metric.FLOAT_PRECISION + + uwsgi.metric_set_max(self._metric_name, value) + + def set_min(self, value): + """ + only set the metric name if the give value is lower than the one currently stored + """ + if uwsgi_loaded: + if self._value_type == float: + value *= Metric.FLOAT_PRECISION + + uwsgi.metric_set_min(self._metric_name, value) + + def increase(self, delta=1): + """ + increase the metric's value by the given delta + """ + if uwsgi_loaded: + if self._value_type == float: + delta *= Metric.FLOAT_PRECISION + + uwsgi.metric_inc(self._metric_name, int(delta)) + + def decrease(self, delta=1): + """ + decrease the metric's value by the given delta + """ + if uwsgi_loaded: + if self._value_type == float: + delta *= Metric.FLOAT_PRECISION + + uwsgi.metric_dec(self._metric_name, int(delta)) + + def multiply(self, delta): + """ + multiply the metric's value by the given delta + """ + if uwsgi_loaded: + uwsgi.metric_mul(self._metric_name, delta) + + def divide(self, delta): + """ + divide the metric's value by the given delta + """ + if uwsgi_loaded: + uwsgi.metric_div(self._metric_name, delta) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/nginx_broker.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/nginx_broker.py new file mode 100644 index 000000000..274fff774 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/nginx_broker.py @@ -0,0 +1,260 @@ +""" +For internal use only. 
The nginx broker class is designed to handle any 'nginx' related actions, +such as setup, configuration and execution +""" +import os +import subprocess +from pathlib import Path +import platform +import re +import shutil + +from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base +from datarobot_drum.resource.components.Python.uwsgi_component.restful import util +from datarobot_drum.resource.components.Python.uwsgi_component.restful.nginx_conf_template import ( + NGINX_SERVER_CONF_TEMPLATE, + NGINX_CONF_TEMPLATE_NON_ROOT, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.constants import ( + SharedConstants, + ComponentConstants, + NginxConstants, +) +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) + + +class NginxBroker(Base): + origin_nginx_conf_filepath_pattern = "{}/nginx.conf" + new_nginx_conf_filepath_pattern = "{}/nginx.conf.new" + bin_search_paths = [Path("/usr/bin"), Path("/sbin"), Path("/bin"), Path("/usr/sbin")] + + def __init__(self, ml_engine, dry_run=False): + super(NginxBroker, self).__init__() + self.set_logger(ml_engine.get_engine_logger(self.logger_name())) + self._dry_run = dry_run + self._conf_file = None + self._root_user = os.getuid() == 0 + self.__debian_platform = None + self.__redhat_platform = None + + def setup_and_run(self, shared_conf, nginx_conf): + self._logger.info("Setup 'nginx' service ...") + self._verify_dependencies() + self._generate_configuration(shared_conf, nginx_conf) + self._run(shared_conf) + return self + + def quit(self): + if not self._dry_run: + self._logger.info("Stopping 'nginx' service ...") + try: + if self._root_user: + stop_cmd = NginxConstants.STOP_CMD + else: + stop_cmd = "nginx -c {} -s stop".format(self._conf_file) + subprocess.call(stop_cmd, shell=True) + except: # noqa: E722 + # Should catch any exception in order to avoid masking of other important errors + # in the system + pass + + def _verify_dependencies(self): + util.verify_tool_installation( + NginxConstants.VER_CMD, NginxConstants.DEV_AGAINST_VERSION, self._logger + ) + + def _generate_configuration(self, shared_conf, nginx_conf): + access_log_off = ( + NginxConstants.ACCESS_LOG_OFF_CONFIG + if nginx_conf[NginxConstants.DISABLE_ACCESS_LOG_KEY] + else "" + ) + + if not self._root_user and not self._debian_platform(): + self._logger.warning( + "Running as non root was tested only for Ubuntu platform." + "You may need to change permissions to some of nginx folders." + ) + + conf_content = NGINX_SERVER_CONF_TEMPLATE.format( + port=nginx_conf[ComponentConstants.PORT_KEY], + sock_filepath=os.path.join( + shared_conf[SharedConstants.TARGET_PATH_KEY], + shared_conf[SharedConstants.SOCK_FILENAME_KEY], + ), + access_log_off=access_log_off, + uwsgi_params_prefix=NginxConstants.NGINX_ROOT + "/", + ) + + # if user is root configuration will be written to nginx system path + if self._root_user: + self._conf_file = self._server_conf_filepath() + # else it will be written to /tmp folder. + else: + conf_content = NGINX_CONF_TEMPLATE_NON_ROOT.format( + nginx_server_conf_placeholder=conf_content + ) + self._conf_file = os.path.join( + shared_conf[SharedConstants.TARGET_PATH_KEY], NginxConstants.SERVER_CONF_FILENAME + ) + + self._logger.info("Writing nginx server configuration to ... 
{}".format(self._conf_file)) + with open(self._conf_file, "w") as f: + f.write(conf_content) + + if self._root_user: + if self._debian_platform() or self._redhat_platform(): + # Newer versions of nginx requires the folder sites-enabled in their installation + # folder, in order to enable extended server configurations, which are configured + # in conf.d. + # Apparently, on 'redhat' platforms the given folder does not exits after nginx + # installation. + if not os.path.exists(NginxConstants.SERVER_ENABLED_DIR): + NginxBroker._fix_missing_sites_enabled_conf(NginxConstants.NGINX_ROOT) + + sym_link = os.path.join( + NginxConstants.SERVER_ENABLED_DIR, NginxConstants.SERVER_CONF_FILENAME + ) + if not os.path.isfile(sym_link): + self._logger.info("Creating nginx server sym link ... {}".format(sym_link)) + os.symlink(self._conf_file, sym_link) + + self._logger.info("Done with _generate_configuration ...") + + @staticmethod + def _fix_missing_sites_enabled_conf(nginx_root): + origin_nginx_conf_filepath = NginxBroker.origin_nginx_conf_filepath_pattern.format( + nginx_root + ) + new_nginx_conf_filepath = NginxBroker.new_nginx_conf_filepath_pattern.format(nginx_root) + + fix_configuration = True + pattern_conf_d = re.compile(r"^\s*include\s+{}/conf\.d/\*\.conf;\s*$".format(nginx_root)) + pattern_sites_enabled = re.compile( + r"^\s*include\s+{}/sites-enabled/.*;\s*$".format(nginx_root) + ) + line_to_add = " include {}/sites-enabled/*;\n".format(nginx_root) + + with open(origin_nginx_conf_filepath, "r") as fr: + with open(new_nginx_conf_filepath, "w") as fw: + for line in fr: + fw.write(line) + group = pattern_conf_d.match(line) + if group: + fw.write(line_to_add) + elif pattern_sites_enabled.match(line): + # sites-enabled already configured! Close and remove new file" + fix_configuration = False + break + + if fix_configuration: + shutil.copyfile(new_nginx_conf_filepath, origin_nginx_conf_filepath) + + if os.path.exists(new_nginx_conf_filepath): + os.remove(new_nginx_conf_filepath) + + os.mkdir(NginxConstants.SERVER_ENABLED_DIR, 0o644) + + def _server_conf_filepath(self): + if self._debian_platform(): + d = NginxConstants.SERVER_CONF_DIR_DEBIAN + elif self._redhat_platform(): + d = NginxConstants.SERVER_CONF_DIR_REDHAT + elif self._macos_platform(): + if not os.path.isdir(NginxConstants.SERVER_CONF_DIR_MACOS): + if not os.path.isdir(NginxConstants.NGINX_ROOT_MACOS): + raise UwsgiNginxCoreException( + "'{}' does not exist or not a directory. Is nginx installed?".format( + NginxConstants.NGINX_ROOT_MACOS + ) + ) + os.mkdir(NginxConstants.SERVER_CONF_DIR_MACOS) + d = NginxConstants.SERVER_CONF_DIR_MACOS + else: + raise UwsgiNginxCoreException( + "Nginx cannot be configured! Platform is not supported: {}".format(platform.uname()) + ) + + return os.path.join(d, NginxConstants.SERVER_CONF_FILENAME) + + def _redhat_platform(self): + # There are a lot of ways to try and determine the Linux distro. The modern way is to + # parse `/etc/os-release` but that doesn't support very old OSes (i.e. some versions of + # CentOS/RHEL) and also seems more involved than we need for our purposes. Since all we + # care about is how the Nginx application is packaged, we can just try and look for the + # low-level package managers that are well-known. + # + # Note: looking at the kernel (e.g. platform.uname()) would be **incorrect** since we + # support running in a container and it's possible the host OS (kernel) could be RedHat + # but the container is Debian, for example. 
The most correct way to determine the distro + # is to inspect the filesystem in some way (http://0pointer.de/blog/projects/os-release). + if self.__redhat_platform is None: + for path in self.bin_search_paths: + if (path / "rpm").exists(): + self.__redhat_platform = True + break + else: + self.__redhat_platform = False + return self.__redhat_platform + + def _debian_platform(self): + # All Debian derived systems will have `dpkg` installed into the path. + if self.__debian_platform is None: + for path in self.bin_search_paths: + if (path / "dpkg").exists(): + self.__debian_platform = True + break + else: + self.__debian_platform = False + return self.__debian_platform + + def _macos_platform(self): + # MacOS doesn't support containers so we can just look at the kernel to determine + # the platform. + return "Darwin" in platform.system() # platform.system() caches its output + + def _run(self, shared_conf): + self._logger.info("Starting 'nginx' service ... cmd: '{}'".format(NginxConstants.START_CMD)) + if self._dry_run: + return + + if self._root_user: + start_cmd = NginxConstants.START_CMD + else: + start_cmd = "nginx -c {} -p $PWD".format(self._conf_file) + rc = subprocess.check_call(start_cmd, shell=True) + if rc != 0: + raise UwsgiNginxCoreException( + "nginx service failed to start! It is suspected as not being installed!" + ) + + self._logger.info("'nginx' service started successfully!") + + +if __name__ == "__main__": + import logging + import tempfile + + root_dir = "/tmp/nginx-test" + + if not os.path.isdir(root_dir): + os.makedirs(root_dir) + + shared_conf = { + SharedConstants.TARGET_PATH_KEY: tempfile.mkdtemp(prefix="restful-", dir=root_dir), + SharedConstants.SOCK_FILENAME_KEY: "restful_mlapp.sock", + } + + nginx_conf = { + ComponentConstants.HOST_KEY: "localhost", + ComponentConstants.PORT_KEY: 8888, + } + + print("Target path: {}".format(shared_conf[SharedConstants.TARGET_PATH_KEY])) + + logging.basicConfig() + logger = logging.getLogger("NginxBroker") + logger.setLevel(logging.INFO) + NginxBroker(logger, dry_run=True).setup_and_run(shared_conf, nginx_conf) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/nginx_conf_template.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/nginx_conf_template.py new file mode 100644 index 000000000..61a92979b --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/nginx_conf_template.py @@ -0,0 +1,72 @@ +NGINX_SERVER_CONF_TEMPLATE = """ +server {{ + client_max_body_size 0m; + listen {port} default_server; + listen [::]:{port} default_server; + server_name _; + {access_log_off} + + location / {{ + include {uwsgi_params_prefix}uwsgi_params; + + # change this to the location of the uWSGI socket file (set in uwsgi.ini) + uwsgi_pass unix:{sock_filepath}; + uwsgi_ignore_client_abort on; + uwsgi_send_timeout 330s; + uwsgi_read_timeout 600s; + }} +}} + +""" + +NGINX_CONF_TEMPLATE_NON_ROOT = """ +worker_processes auto; +pid /tmp/nginx.pid; +error_log stderr; + +events {{ + worker_connections 768; + # multi_accept on; +}} + +http {{ + + ## + # Basic Settings + ## + + sendfile on; + tcp_nopush on; + tcp_nodelay on; + keepalive_timeout 610s; + client_body_timeout 610s; + client_header_timeout 610s; + types_hash_max_size 2048; + # server_tokens off; + + # server_names_hash_bucket_size 64; + # server_name_in_redirect off; + + include /etc/nginx/mime.types; + default_type application/octet-stream; + + ## + # SSL 
Settings + ## + + ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # Dropping SSLv3, ref: POODLE + ssl_prefer_server_ciphers on; + + ## + # Logging Settings + ## + + ## + # Gzip Settings + ## + + gzip on; + + {nginx_server_conf_placeholder} +}} +""" diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/util.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/util.py new file mode 100644 index 000000000..b2d6aa4de --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/util.py @@ -0,0 +1,24 @@ +""" +For internal use only. A generic utility for the RESTful module. +""" +import subprocess +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) + + +def verify_tool_installation(ver_cmd, dev_against_ver, logger): + try: + tool = ver_cmd.split()[0] + logger.info("Verifying '{tool}' proper installation ...".format(tool=tool)) + + ver_msg = subprocess.check_output(ver_cmd, shell=True) + msg = "'{}', PM developed against: '{}'".format(tool, dev_against_ver) + if ver_msg: + msg += ", runtime: '{}'".format(ver_msg) + logger.info(msg) + except subprocess.CalledProcessError: + raise UwsgiNginxCoreException( + "'{tool}' is not installed! Please make sure to provide a docker container, with" + "proper '{tool}' installation.".format(tool=tool) + ) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_broker.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_broker.py new file mode 100644 index 000000000..d03a2b943 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_broker.py @@ -0,0 +1,408 @@ +""" +For internal use only. 
The uwsgi borker is designed to handle any related 'uWSGI' actions, such as +setup, configuration and execution +""" +import logging +import os +import re +import subprocess +import sys + +from datarobot_drum.resource.components.Python.uwsgi_component.common import constants +from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful import util +from datarobot_drum.resource.components.Python.uwsgi_component.restful.flask_app_wrapper import ( + FlaskAppWrapper, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.flask_route import FlaskRoute +from datarobot_drum.resource.components.Python.uwsgi_component.restful.flask_custom_json_encoder import ( + FlaskCustomJsonEncode, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.constants import ( + SharedConstants, + ComponentConstants, + UwsgiConstants, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_monitor import ( + WsgiMonitor, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_ini_template import ( + WSGI_INI_CONTENT, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_entry_point_script_template import ( + WSGI_ENTRY_SCRIPT, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_cheaper_subsystem import ( + UwsgiCheaperSubSystem, +) +from datarobot_drum.resource.components.Python.uwsgi_component.model.model_selector import ( + ModelSelector, +) + +try: + from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_post_fork import ( + UwsgiPostFork, + ) + import uwsgi +except ImportError: + # You're actually not running under uWSGI + pass + + +class UwsgiBroker(Base): + _restful_comp = None + _model_selector = None + _application = None + _wid = None + w_logger = None + + ####################################################################### + # Methods that are accessed from the Pipeline RESTful component + + def __init__(self, ml_engine, dry_run=False): + super(UwsgiBroker, self).__init__() + self.set_logger(ml_engine.get_engine_logger(self.logger_name())) + self._ml_engine = ml_engine + self._dry_run = dry_run + self._monitor_info = None + self._target_path = None + self._monitor = None + + def setup_and_run(self, shared_conf, entry_point_conf, monitor_info): + self._target_path = shared_conf[SharedConstants.TARGET_PATH_KEY] + self._logger.info("Setup 'uwsgi' server, target path: {}".format(self._target_path)) + self._monitor_info = monitor_info + self._verify_dependencies() + self._generate_entry_point_script(shared_conf, entry_point_conf) + ini_filepath = self._generate_ini_file(shared_conf, entry_point_conf) + self._run(shared_conf, entry_point_conf, ini_filepath) + return self + + def quit(self): + if not self._dry_run and self._target_path: + pid_filepath = os.path.join(self._target_path, UwsgiConstants.PID_FILENAME) + if os.path.isfile(pid_filepath): + self._logger.info("Stopping uwsgi process ...") + try: + cmd = UwsgiConstants.STOP_CMD.format(pid_filepath=pid_filepath) + subprocess.call(cmd, shell=True) + except subprocess.CalledProcessError as ex: + self._logger.info( + "'uwsgi' process stopping issue! 
output: {}, return-code: {}".format( + ex.output, ex.returncode + ) + ) + + def _verify_dependencies(self): + util.verify_tool_installation( + ver_cmd=UwsgiConstants.VER_CMD, + dev_against_ver=UwsgiConstants.DEV_AGAINST_VERSION, + logger=self._logger, + ) + + def _generate_entry_point_script(self, shared_conf, entry_point_conf): + self._logger.debug("shared conf {}".format(shared_conf)) + self._logger.debug("entry conf {}".format(entry_point_conf)) + + wsgi_entry_script_code = WSGI_ENTRY_SCRIPT.format( + module=self.__class__.__module__, + cls=self.__class__.__name__, + restful_comp_module=entry_point_conf[UwsgiConstants.RESTFUL_COMP_MODULE_KEY], + restful_comp_cls=entry_point_conf[UwsgiConstants.RESTFUL_COMP_CLS_KEY], + root_logger_name=constants.LOGGER_NAME_PREFIX, + log_format=entry_point_conf[ComponentConstants.LOG_FORMAT_KEY], + log_level=entry_point_conf[ComponentConstants.LOG_LEVEL_KEY], + params=entry_point_conf[UwsgiConstants.PARAMS_KEY], + pipeline_name=entry_point_conf[UwsgiConstants.PIPELINE_NAME_KEY], + model_path=entry_point_conf[UwsgiConstants.MODEL_PATH_KEY], + standalone=shared_conf[SharedConstants.STANDALONE], + deputy_id=entry_point_conf[UwsgiConstants.DEPUTY_ID_KEY], + stats_path_filename=shared_conf[SharedConstants.STATS_PATH_FILENAME_KEY], + within_uwsgi_context=(not entry_point_conf[ComponentConstants.SINGLE_UWSGI_WORKER_KEY]), + ) + + uwsgi_script_filepath = os.path.join( + self._target_path, UwsgiConstants.ENTRY_POINT_SCRIPT_NAME + ) + self._logger.info("Writing uWSGI entry point to: {}".format(uwsgi_script_filepath)) + + with open(uwsgi_script_filepath, "w") as f: + f.write(wsgi_entry_script_code) + + def _generate_ini_file(self, shared_conf, entry_point_conf): + python_paths = [p for p in sys.path if UwsgiBroker._include_py_path(p)] + python_paths = ":".join(python_paths) + + cheaper_conf = UwsgiCheaperSubSystem.get_config() + self._logger.info("Cheaper subsystem: {}".format(cheaper_conf)) + + metrics = entry_point_conf[ComponentConstants.METRICS_KEY] + + single_uwsgi_worker = entry_point_conf[ComponentConstants.SINGLE_UWSGI_WORKER_KEY] + comment_out_master = "# " if single_uwsgi_worker else "" + comment_out_non_master = "" if single_uwsgi_worker else "# " + comment_out_cheaper = "# " if single_uwsgi_worker else "" + + # define cheaper system memory limits + cheaper_memory_limits = "" + cheaper_rss_limit_soft = entry_point_conf[UwsgiConstants.PARAMS_KEY].get( + ComponentConstants.UWSGI_CHEAPER_RSS_LIMIT_SOFT_KEY, None + ) + cheaper_rss_limit_hard = entry_point_conf[UwsgiConstants.PARAMS_KEY].get( + ComponentConstants.UWSGI_CHEAPER_RSS_LIMIT_HARD_KEY, None + ) + memory_report = entry_point_conf[UwsgiConstants.PARAMS_KEY].get( + ComponentConstants.UWSGI_MEMORY_REPORT_KEY, False + ) + + if cheaper_rss_limit_soft: + cheaper_memory_limits += "cheaper-rss-limit-soft = {}\n".format(cheaper_rss_limit_soft) + if cheaper_rss_limit_hard: + cheaper_memory_limits += "cheaper-rss-limit-hard = {}\n".format(cheaper_rss_limit_hard) + if memory_report or len(cheaper_memory_limits): + cheaper_memory_limits += "memory-report = true\n" + max_workers_from_config = entry_point_conf[UwsgiConstants.PARAMS_KEY].get( + ComponentConstants.UWSGI_MAX_WORKERS_KEY, None + ) + + workers = 1 if single_uwsgi_worker else cheaper_conf[UwsgiCheaperSubSystem.WORKERS] + threads = entry_point_conf[UwsgiConstants.PARAMS_KEY].get( + ComponentConstants.UWSGI_THREADS, "1" + ) + cheaper = cheaper_conf[UwsgiCheaperSubSystem.CHEAPER] + cheaper_initial = cheaper_conf[UwsgiCheaperSubSystem.CHEAPER_INITIAL] + 
cheaper_step = cheaper_conf[UwsgiCheaperSubSystem.CHEAPER_STEP] + + if max_workers_from_config: + workers = min(max_workers_from_config, workers) + # cheaper param must be less than workers + cheaper = min(cheaper, workers - 1) + cheaper_initial = min(max_workers_from_config, cheaper_initial) + cheaper_step = min(max_workers_from_config, cheaper_step) + + logging_udp_port = entry_point_conf[UwsgiConstants.LOGGING_UDP_SOCKET].getsockname()[1] + logto = "localhost:{}".format(logging_udp_port) + log_socket = "socket:" + logto + + ini_content = WSGI_INI_CONTENT.format( + restful_app_folder=self._target_path, + pid_filename=UwsgiConstants.PID_FILENAME, + sock_filename=shared_conf[SharedConstants.SOCK_FILENAME_KEY], + stats_sock_filename=shared_conf[SharedConstants.STATS_SOCK_FILENAME_KEY], + restful_app_file=UwsgiConstants.ENTRY_POINT_SCRIPT_NAME, + callable_app="application", + python_paths=python_paths, + disable_logging=entry_point_conf[ComponentConstants.UWSGI_DISABLE_LOGGING_KEY], + cheaper_memory_limits=cheaper_memory_limits, + workers=workers, + threads=threads, + cheaper=cheaper, + cheaper_initial=cheaper_initial, + cheaper_step=cheaper_step, + enable_metrics="true" if metrics else "false", + metrics=self._get_metrics_configuration(metrics), + log_socket=log_socket, + logto=logto, + master="false" if single_uwsgi_worker else "true", + log_master="false" if single_uwsgi_worker else "true", + comment_out_master=comment_out_master, + comment_out_non_master=comment_out_non_master, + comment_out_cheaper=comment_out_cheaper, + ) + + ini_filepath = os.path.join(self._target_path, UwsgiConstants.INI_FILENAME) + self._logger.info("Writing uWSGI ini file to: {}".format(ini_filepath)) + + with open(ini_filepath, "w") as f: + f.write(ini_content) + + return ini_filepath + + @staticmethod + def _include_py_path(filename): + if filename.endswith(".egg"): + py_ver = "py{}".format(sys.version_info[0]) + if py_ver in filename: + return True + elif not re.search("py[2-3]", filename): + # 'py2' or 'py3' are not in the egg filename, so include it + return True + else: + return False + return True + + def _get_metrics_configuration(self, metrics): + metrics_conf = "" + if metrics: + for index, metric_name in enumerate(metrics): + metrics_conf += ( + ComponentConstants.METRIC_TEMPLATE.format(metric_name, index + 1) + "\n" + ) + return metrics_conf + + def _run(self, shared_conf, entry_point_conf, ini_filepath): + uwsgi_start_cmd = UwsgiConstants.START_CMD.format(filepath=ini_filepath) + self._logger.info("Running 'uwsgi' server, cmd: '{}'".format(uwsgi_start_cmd)) + + if self._dry_run: + return + + self._monitor = WsgiMonitor( + self._ml_engine, self._monitor_info, shared_conf, entry_point_conf + ) + self._logger.debug("Broker uuid: {}".format(self._ml_engine.get_uuid())) + + subprocess.Popen( + uwsgi_start_cmd, + shell=True, + ) + self._logger.info("uwsgi was launched, waiting for it to run ...") + self._monitor.verify_proper_startup() + + if not UwsgiConstants.DAEMONIZE: + # The output should be read by a thread + self._monitor.start() + + self._logger.info("'uwsgi' started successfully") + + ####################################################################### + # Methods that are accessed from uWSGI worker + + @classmethod + def uwsgi_entry_point( + cls, + restful_comp, + pipeline_name, + model_path, + deputy_id, + stats_path_filename, + within_uwsgi_context, + standalone, + ): + cls._wid = 0 if not within_uwsgi_context else uwsgi.worker_id() + + cls.w_logger = 
logging.getLogger("{}.{}".format(cls.__module__, cls.__name__)) + + cls.w_logger.info( + "Entered to uWSGI entry point ... (wid: {}, pid: {}, ppid:{})".format( + cls._wid, os.getpid(), os.getppid() + ) + ) + + cls.w_logger.info( + "Restful comp (wid: {}, pid: {}, ppid:{}): {}".format( + cls._wid, os.getpid(), os.getppid(), restful_comp + ) + ) + + cls.w_logger.info("Standalone mode (wid: {}): {}".format(cls._wid, standalone)) + restful_comp._ml_engine.set_standalone(standalone) + cls._restful_comp = restful_comp + cls._restful_comp._uuid_engine = deputy_id + cls._restful_comp._stats_path_filename = stats_path_filename + + if within_uwsgi_context: + UwsgiPostFork.init(cls) + + cls._setup_flask_app(pipeline_name, cls.w_logger) + + cls.register_model_load_handler(model_path, cls.w_logger, within_uwsgi_context) + + print("Deputy id {}".format(deputy_id)) + print("Stats path {}".format(stats_path_filename)) + # This should be the last printout in the uwsgi entry point function!!! + print(WsgiMonitor.SUCCESS_RUN_INDICATION_MSG) + + @classmethod + def restful_comp(cls): + return cls._restful_comp + + @classmethod + def _setup_flask_app(cls, pipeline_name, logger): + app = FlaskAppWrapper(pipeline_name) + for endpoint_entry in FlaskRoute.routes(): + logger.info("Add routing endpoint: {}".format(endpoint_entry)) + app.add_endpoint( + url_rule=endpoint_entry[0], + endpoint=endpoint_entry[1], + handler=getattr(cls._restful_comp, endpoint_entry[2]), + options=endpoint_entry[3], + raw=endpoint_entry[4], + ) + + cls._application = FlaskAppWrapper.app + cls._application.json_encoder = FlaskCustomJsonEncode + cls._application.config["PROPAGATE_EXCEPTIONS"] = None + + @classmethod + def register_model_load_handler(cls, model_path, logger, within_uwsgi_context): + logger.info("Model file path: {}".format(model_path)) + + if within_uwsgi_context: + cls._model_selector = ModelSelector(model_path, cls._restful_comp._ml_engine.standalone) + + logger.info( + "Register signal (model reloading): {} (worker, wid: {}, {})".format( + UwsgiConstants.MODEL_RELOAD_SIGNAL_NUM, + cls._wid, + UwsgiBroker._model_selector, + ) + ) + uwsgi.register_signal( + UwsgiConstants.MODEL_RELOAD_SIGNAL_NUM, + "workers", + cls._model_reload_signal, + ) + + logger.info( + "Model path to monitor (signal {}, wid: {}): {}".format( + UwsgiConstants.MODEL_RELOAD_SIGNAL_NUM, + cls._wid, + cls._model_selector.model_env.sync_filepath, + ) + ) + uwsgi.add_file_monitor( + UwsgiConstants.MODEL_RELOAD_SIGNAL_NUM, + cls._model_selector.model_env.sync_filepath, + ) + else: + if os.path.isfile(model_path): + UwsgiBroker._restful_comp.load_model_callback(model_path, stream=None, version=None) + + @staticmethod + def _model_reload_signal(num): + UwsgiBroker.w_logger.info( + "Received model reload signal ... wid: {}, pid: {}".format( + UwsgiBroker._wid, os.getpid() + ) + ) + UwsgiBroker._reload_last_approved_model() + + @staticmethod + def _reload_last_approved_model(): + if not UwsgiBroker._restful_comp or not UwsgiBroker._model_selector: + raise UwsgiNginxCoreException( + "Unexpected RESTful comp invariants! 
_restful_comp={}, _model_selector={}".format( + UwsgiBroker._restful_comp, UwsgiBroker._model_selector + ) + ) + + model_filepath = UwsgiBroker._model_selector.pick_model_filepath() + UwsgiBroker.w_logger.debug( + "Picking model (wid: {}, pid: {}, {}): {}".format( + UwsgiBroker._wid, + os.getpid(), + UwsgiBroker._model_selector, + model_filepath, + ) + ) + + # The following condition is here to handle an initial state, where a model was not set + # for the given pipeline + if model_filepath is not None and os.path.isfile(model_filepath): + UwsgiBroker._restful_comp.load_model_callback(model_filepath, stream=None, version=None) + else: + UwsgiBroker.w_logger.info("Model file does not exist: {}".format(model_filepath)) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_cheaper_subsystem.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_cheaper_subsystem.py new file mode 100644 index 000000000..b0e4435da --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_cheaper_subsystem.py @@ -0,0 +1,67 @@ +""" +For internal use only. The uwsgi cheaper sub system provides the ability to dynamically +scale the number of running workers via pluggable algorithms + +Reference: https://uwsgi-docs.readthedocs.io/en/latest/Cheaper.html +""" +import math +import multiprocessing + + +class UwsgiCheaperSubSystem: + CPU_COUNT = multiprocessing.cpu_count() + + # workers - maximum number of workers that can be spawned + WORKERS = "workers" + + # cheaper - minimum number of workers to keep at all times + CHEAPER = "cheaper" + + # cheaper-initial - number of workers to spawn at startup + CHEAPER_INITIAL = "cheaper-initial" + + # cheaper-step - how many workers should be spawned at a time + CHEAPER_STEP = "cheaper-step" + + CONF = [ + { + WORKERS: 2, + CHEAPER: 1, + CHEAPER_INITIAL: 2, + CHEAPER_STEP: 1, + }, # CPU_COUNT: 1 ~ 3 + { + WORKERS: 3, + CHEAPER: 2, + CHEAPER_INITIAL: 2, + CHEAPER_STEP: 1, + }, # CPU_COUNT: 4 ~ 7 + { + WORKERS: CPU_COUNT - 4, + CHEAPER: 2, + CHEAPER_INITIAL: 3, + CHEAPER_STEP: 2, + }, # CPU_COUNT: 8 ~ 15 + { + WORKERS: CPU_COUNT - 4, + CHEAPER: 5, + CHEAPER_INITIAL: 5, + CHEAPER_STEP: 3, + }, # CPU_COUNT: 16 ~ 23 + { + WORKERS: CPU_COUNT - 4, + CHEAPER: 5, + CHEAPER_INITIAL: 5, + CHEAPER_STEP: 5, + }, # CPU_COUNT: > 24 + ] + + @staticmethod + def get_config(): + entry = int(math.log(UwsgiCheaperSubSystem.CPU_COUNT, 2)) - 1 + if entry < 0: + entry = 0 + elif entry > (len(UwsgiCheaperSubSystem.CONF) - 1): + entry = len(UwsgiCheaperSubSystem.CONF) - 1 + + return UwsgiCheaperSubSystem.CONF[entry] diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_entry_point_script_template.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_entry_point_script_template.py new file mode 100644 index 000000000..ce6cdc6ee --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_entry_point_script_template.py @@ -0,0 +1,26 @@ +WSGI_ENTRY_SCRIPT = """ +import logging + +from datarobot_drum.resource.components.Python.uwsgi_component.ml_engine.rest_model_serving_engine import RestModelServingEngine +from {module} import {cls} +from {restful_comp_module} import {restful_comp_cls} + + +logging.basicConfig(format='{log_format}') +logging.getLogger('{root_logger_name}').setLevel({log_level}) + +comp = 
{restful_comp_cls}(None) +comp.configure({params}) + +{cls}.uwsgi_entry_point( + comp, + '{pipeline_name}', + '{model_path}', + '{deputy_id}', + '{stats_path_filename}', + within_uwsgi_context={within_uwsgi_context}, + standalone={standalone} +) + +application = {cls}._application +""" diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_ini_template.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_ini_template.py new file mode 100644 index 000000000..2e35a2c71 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_ini_template.py @@ -0,0 +1,113 @@ +WSGI_INI_CONTENT = """ +[uwsgi] + +# If VIRTAL_ENV is set then use its value to specify the virtualenv directory +if-env = VIRTUAL_ENV +print = Your virtualenv is %(_) +virtualenv = %(_) +endif = + +# placeholders that you have to change +restful_app_folder = {restful_app_folder} +# logs_folder = %(restful_app_folder)/logs + +my_user = root + +# exec-asap = mkdir -p %(logs_folder) +# daemonize uWSGI after app loading. Note that this option is mandatory +# and should not be changed! See comment in 'uwsgi_broker.py' +# daemonize2 = %(logs_folder)/uwsgi-@(exec://date +%%Y-%%m-%%d).log +# logto = %(logs_folder)/uwsgi-@(exec://date +%%Y-%%m-%%d).log + +{comment_out_master}logger = {log_socket} +{comment_out_non_master}logto = {logto} + +# exit if no app can be loaded +need-app = true + +disable-logging = {disable_logging} + +# log max size is 1MB. +# log-maxsize = 1048576 +# log-reopen = true + +#Cron: +# - check if logs size bigger then 10 MB: +# du -sb /tmp/restful_comp_TbVOX1/logs | [[ $(awk '{{print $1}}') > 70000 ]] && echo bigger +# - delete 10 oldest files: +# find /tmp/restful_comp_TbVOX1/logs -type f -printf '%Ts\\t%p\\n' | \ +# sort -nr | cut -f2 | tail -n 10 | xargs -n 1 -I '_' rm _ + +pidfile = %(restful_app_folder)/{pid_filename} +socket = %(restful_app_folder)/{sock_filename} +chdir = %(restful_app_folder) +file = {restful_app_file} +callable = {callable_app} + +# environment variables +env = CUDA_VISIBLE_DEVICES=-1 +env = PYTHONPATH=%(restful_app_folder):{python_paths} + +master = {master} +log-master = {master} +processes = {workers} +threads = {threads} + +# allows nginx (and all users) to read and write on this socket +chmod-socket = 666 + +# remove the socket when the process stops +vacuum = true + +# loads your application one time per worker +# will very probably consume more memory, +# but will run in a more consistent and clean environment. 
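+# 'false' keeps uWSGI's default preforking model: the app is imported once in the master and shared with the workers via copy-on-write; 'true' would make every worker import the app itself after fork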
+lazy-apps = false + +uid = %U +gid = %G + +# uWSGI will kill the process instead of reloading it +die-on-term = true + +# socket file for getting stats about the workers +stats = %(restful_app_folder)/{stats_sock_filename} + +# Cheaper memory limits +# cheaper-rss-limit-soft - soft limit will prevent cheaper from spawning new workers +# if workers total rss memory is equal or higher +# +# cheaper-rss-limit-hard limit will force cheaper to cheap single worker +# if workers total rss memory is equal or higher +{cheaper_memory_limits} + +# Scaling the server with the Cheaper subsystem + +# set cheaper algorithm to use, if not set default will be used +cheaper-algo = spare + +# minimum number of workers to keep at all times +{comment_out_cheaper}cheaper = {cheaper} + +# number of workers to spawn at startup +{comment_out_cheaper}cheaper-initial = {cheaper_initial} + +# maximum number of workers that can be spawned +{comment_out_cheaper}workers = {workers} + +# how many workers should be spawned at a time +{comment_out_cheaper}cheaper-step = {cheaper_step} + +# skip atexit hooks (ignored by the master) +skip-atexit = false + +# skip atexit teardown (ignored by the master) +skip-atexit-teardown = true + +# enable metrics subsystem +enable-metrics = {enable_metrics} + +# oid prefix: 1.3.6.1.4.1.35156.17.3 +# metric = name=pm-counter1,type=counter,initial_value=0,oid=100.1 +{metrics} +""" diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_monitor.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_monitor.py new file mode 100644 index 000000000..c733e3f94 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_monitor.py @@ -0,0 +1,118 @@ +""" +For internal use only. The WsgiMonitor is designed to monitor the standard output/error of the +'uWSGI'processes, as well as reading the statistics from the uwsgi master process. 
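+Worker log lines arrive over a UDP socket and are echoed to stdout, while statistics are read as JSON from the uWSGI stats socket once per reporting interval.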
+""" +import select +import time +import threading +import traceback + +from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) +from datarobot_drum.resource.components.Python.uwsgi_component.common.buff_to_lines import ( + BufferToLines, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.constants import ( + UwsgiConstants, + ComponentConstants, + SharedConstants, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_statistics import ( + UwsgiStatistics, +) + + +class WsgiMonitor(Base): + SUCCESS_RUN_INDICATION_MSG = "uwsgi entry point run successfully" + STATS_JSON_MAX_SIZE_BYTES = 64 * 2048 # max 64 cores, 2k per core + + def __init__(self, ml_engine, monitor_info, shared_conf, uwsgi_entry_point_conf): + super(self.__class__, self).__init__() + self.set_logger(ml_engine.get_engine_logger(self.logger_name())) + + self._stats_reporting_interval_sec = uwsgi_entry_point_conf[ + ComponentConstants.STATS_REPORTING_INTERVAL_SEC + ] + + self._logging_udp_socket = uwsgi_entry_point_conf[UwsgiConstants.LOGGING_UDP_SOCKET] + + self._stats = None + if not shared_conf.get(SharedConstants.STANDALONE): + self._stats = UwsgiStatistics( + self._stats_reporting_interval_sec, + shared_conf["target_path"], + shared_conf["stats_sock_filename"], + self._logger, + ) + + self._monitor_info = monitor_info + self._shared_conf = shared_conf + + def verify_proper_startup(self): + # Important note: the following code snippet assumes that the application's startup is not + # demonized and consequently the logs can be read. Therefor, it is important to use + # the 'daemonize2' option in 'uwsgi.ini' file + self._monitor_uwsgi_proc(stop_msg=WsgiMonitor.SUCCESS_RUN_INDICATION_MSG) + + def start(self): + # Set as daemon thread because if main thread is about to exit, we do **not** need to + # block on this thread for a clean exit. 
+ th = threading.Thread(name="uWSGI Monitor", target=self._run, daemon=True) + self._monitor_info[UwsgiConstants.MONITOR_THREAD_KEY] = th + th.start() + + def _run(self): + self._logger.info("Starting logging monitoring in the background ...") + try: + self._monitor_uwsgi_proc() + except: # noqa: E722 + self._monitor_info[UwsgiConstants.MONITOR_ERROR_KEY] = traceback.format_exc() + finally: + if self._logging_udp_socket: + self._logging_udp_socket.close() + self._logger.info("Exited logging monitoring!") + + def _monitor_uwsgi_proc(self, stop_msg=None): + try: + monitor_stats = not stop_msg + block_size = 2048 + stdout_buff2lines = BufferToLines() + + keep_reading = True + last_stats_read = time.time() + while keep_reading: + read_fs = [self._logging_udp_socket] + + # Sleep the exact time left within a 1 sec interval + if monitor_stats: + sleep_time = self._stats_reporting_interval_sec - ( + time.time() - last_stats_read + ) + if sleep_time < 0: + sleep_time = 0 + else: + sleep_time = self._stats_reporting_interval_sec + + readable_fd = select.select(read_fs, [], [], sleep_time)[0] + + if monitor_stats: + wakeup_time = time.time() + if wakeup_time - last_stats_read > self._stats_reporting_interval_sec: + last_stats_read = wakeup_time + if self._stats: + self._stats.report() + + if readable_fd: + buff, _ = readable_fd[0].recvfrom(block_size) + stdout_buff2lines.add(buff) + for line in stdout_buff2lines.lines(): + print(line) + + if stop_msg and stop_msg.encode() in buff: + keep_reading = False + except UwsgiNginxCoreException: + if self._logging_udp_socket: + self._logging_udp_socket.close() + raise diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_post_fork.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_post_fork.py new file mode 100644 index 000000000..22da70379 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_post_fork.py @@ -0,0 +1,83 @@ +import logging +import uuid +import uwsgi + +from datarobot_drum.resource.components.Python.uwsgi_component.restful.constants import ( + ComponentConstants, +) + + +class UwsgiPostFork: + _uwsgi_broker = None + _configure_op = None + _postfork_op = None + _verbose = False + + @staticmethod + def init(uwsgi_borker): + UwsgiPostFork._uwsgi_broker = uwsgi_borker + + UwsgiPostFork._configure_op = getattr( + uwsgi_borker.restful_comp(), + ComponentConstants.CONFIGURE_CALLBACK_FUNC_NAME, + None, + ) + UwsgiPostFork._postfork_op = getattr( + uwsgi_borker.restful_comp(), + ComponentConstants.POST_FORK_CALLBACK_FUNC_NAME, + None, + ) + UwsgiPostFork._verbose = uwsgi_borker.w_logger.isEnabledFor(logging.DEBUG) + + # Note: it is necessary to enable the uWSGI master process to use + # 'uwsgidecorators' module + import uwsgidecorators + + uwsgidecorators.postfork(UwsgiPostFork.do_post_fork) + + @staticmethod + def do_post_fork(): + if UwsgiPostFork._verbose: + UwsgiPostFork._uwsgi_broker.w_logger.debug( + "wid: {}, postfork hook called".format(uwsgi.worker_id()) + ) + + UwsgiPostFork._uwsgi_broker.restful_comp().set_wid(uwsgi.worker_id()) + UwsgiPostFork._uwsgi_broker.restful_comp().set_wuuid(str(uuid.uuid4())) + msg_prefix = "wid: {}, ".format(uwsgi.worker_id()) + + uwsgi.atexit = UwsgiPostFork._uwsgi_broker.restful_comp()._on_exit + + if callable(UwsgiPostFork._postfork_op): + UwsgiPostFork._postfork_op() + else: + if UwsgiPostFork._verbose: + UwsgiPostFork._uwsgi_broker.w_logger.debug( + 
msg_prefix + + "'{}' is not defined by {}".format( + ComponentConstants.POST_FORK_CALLBACK_FUNC_NAME, + UwsgiPostFork._uwsgi_broker.restful_comp(), + ) + ) + + if callable(UwsgiPostFork._configure_op): + if UwsgiPostFork._verbose: + UwsgiPostFork._uwsgi_broker.w_logger.debug( + msg_prefix + "calling configure callback ..." + ) + UwsgiPostFork._configure_op() + else: + if UwsgiPostFork._verbose: + UwsgiPostFork._uwsgi_broker.w_logger.debug( + msg_prefix + + "'{}' is not defined by {}".format( + ComponentConstants.CONFIGURE_CALLBACK_FUNC_NAME, + UwsgiPostFork._uwsgi_broker.restful_comp(), + ) + ) + + if UwsgiPostFork._verbose: + UwsgiPostFork._uwsgi_broker.w_logger.debug( + msg_prefix + "calling model load callback ..." + ) + UwsgiPostFork._uwsgi_broker._reload_last_approved_model() diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_statistics.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_statistics.py new file mode 100644 index 000000000..caff53a89 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_statistics.py @@ -0,0 +1,71 @@ +import json +import logging +import os +import socket + +from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_stats_snapshot import ( + UwsiStatsSnapshot, +) + + +class UwsgiStatistics(object): + STATS_JSON_MAX_SIZE_BYTES = 64 * 2048 # max 64 cores, 2k per core + + def __init__( + self, + reporting_interval_sec, + target_path, + sock_filename, + logger, + ): + self._logger = logger + self._reporting_interval_sec = reporting_interval_sec + self._server_address = os.path.join(target_path, sock_filename) + self._stats_sock = None + + self._curr_stats_snapshot = None + self._prev_stats_snapshot = None + self._prev_metrics_snapshot = None + + def report(self): + raw_stats = self._read_raw_statistics() + if not raw_stats: + return + + self._curr_stats_snapshot = UwsiStatsSnapshot(raw_stats, self._prev_stats_snapshot) + + self._logger.info(self._curr_stats_snapshot) + self._prev_stats_snapshot = self._curr_stats_snapshot + + def _read_raw_statistics(self): + sock = self._setup_stats_connection() + if sock: + try: + data = sock.recv(UwsgiStatistics.STATS_JSON_MAX_SIZE_BYTES) + return json.loads(data.decode("utf-8")) + except ValueError as e: + self._logger.error( + "Invalid statistics json format! {}, data:\n{}\n".format(e.message, data) + ) + finally: + if sock: + sock.close() + return None + + def _setup_stats_connection(self): + if self._logger.isEnabledFor(logging.DEBUG): + self._logger.debug( + "Connecting to uWSGI statistics server via unix socket: {}".format( + self._server_address + ) + ) + + stats_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + stats_sock.connect(self._server_address) + except socket.error as ex: + self._logger.warning( + "Failed to open connection to uWSI statistics server! 
{}".format(ex) + ) + return None + return stats_sock diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_stats_snapshot.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_stats_snapshot.py new file mode 100644 index 000000000..53862afdf --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful/uwsgi_stats_snapshot.py @@ -0,0 +1,210 @@ +from datarobot_drum.resource.components.Python.uwsgi_component.common.topological_sort import ( + TopologicalSort, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.metric import ( + Metric, + MetricType, + MetricRelation, +) + + +class UwsiStatsSnapshot(object): + def __init__(self, raw_stats, prev_stats_snapshot): + self._raw_stats = raw_stats + self._total_requests = None + self._total_requests_diff = None + self._sorted_worker_stats = None + self._avg_rt = None + self._uwsgi_pm_metrics = None + self._uwsgi_pm_metrics_accumulation = {} + self._uwsgi_pm_metrics_per_window = None + self._metrics_execution_order = [] + self._worker_ids = None + self._extract_relevant_stats(prev_stats_snapshot) + + def _extract_relevant_stats(self, prev_stats_snapshot): + self._extract_workers_stats(prev_stats_snapshot) + self._extract_metrics_stats(prev_stats_snapshot) + + def _extract_workers_stats(self, prev_stats_snapshot): + self._avg_rt = sorted( + [ + ("wid-{:02d}".format(w["id"]), [w["avg_rt"]]) + for w in self._raw_stats["workers"] + if w["id"] != 0 + ] + ) + + worker_requests = { + "wid-{:02d}".format(w["id"]): (w["requests"], w["status"].encode("utf8")) + for w in self._raw_stats["workers"] + if w["id"] != 0 + } + self._worker_ids = sorted([w["id"] for w in self._raw_stats["workers"] if w["id"] != 0]) + + self._sorted_worker_stats = sorted( + [(k, v[0], v[1].decode()) for k, v in worker_requests.items()] + ) + + self._total_requests = sum(v for _, v, _ in self._sorted_worker_stats) + + self._total_requests_diff = ( + self._total_requests - prev_stats_snapshot.total_requests + if prev_stats_snapshot is not None + else self._total_requests + ) + + def _extract_metrics_stats(self, prev_stats_snapshot): + raw_metrics = self._raw_stats.get("metrics", None) + if not raw_metrics: + return + + self._uwsgi_pm_metrics = self._extract_relevant_raw_metrics(raw_metrics) + if not self._uwsgi_pm_metrics: + return + + if not self._metrics_execution_order: + # arrange the metrics in a topological order because it could be a DAG, where one + # metric is dependant on a metric that depends on others. Using the topological + # mechanism we can also allow more then one reference in a single metric definition. 
+ self._metrics_execution_order = TopologicalSort( + Metric.metrics(), "metric_name", "related_metric_meta" + ).sort() + + self._uwsgi_pm_metrics_per_window = {} + + for metric_meta in self._metrics_execution_order: + metric_name = metric_meta.metric_name + metric_value = self.uwsgi_pm_metrics[metric_name] + + if metric_meta.metric_type == MetricType.COUNTER_PER_TIME_WINDOW: + if prev_stats_snapshot: + metric_value -= prev_stats_snapshot.uwsgi_pm_metric_by_name(metric_name) + + self._calculate_metric_value( + metric_value, + metric_meta, + self.total_requests_diff, + self.uwsgi_pm_metrics_per_window, + ) + else: + self._calculate_metric_value( + metric_value, + metric_meta, + self.total_requests, + self._uwsgi_pm_metrics, + ) + self._uwsgi_pm_metrics_accumulation[metric_name] = self._uwsgi_pm_metrics[ + metric_name + ] + + def _extract_relevant_raw_metrics(self, raw_metrics): + uwsgi_pm_metrics = {} + # Set values according their types + for name, body in raw_metrics.items(): + if Metric.NAME_SUFFIX in name: + value = body["value"] + if Metric.metric_by_name(name).value_type == float: + value /= Metric.FLOAT_PRECISION + uwsgi_pm_metrics[name] = value + + return uwsgi_pm_metrics + + def _calculate_metric_value(self, metric_value, metric_meta, total_requests, related_metrics): + metric_name = metric_meta.metric_name + + if metric_meta.metric_relation == MetricRelation.BAR_GRAPH: + # A graph bar is only a place holder for other metrics. It does not have a value by + # itself. + pass + + elif metric_meta.metric_relation == MetricRelation.AVG_PER_REQUEST: + if total_requests: + metric_value /= total_requests + + elif metric_meta.related_metric and metric_meta.related_metric[0]: + related_metric_name = metric_meta.related_metric[0].metric_name + if metric_meta.metric_relation == MetricRelation.DIVIDE_BY: + if metric_value and related_metrics[related_metric_name]: + metric_value /= related_metrics[related_metric_name] + elif metric_meta.metric_relation == MetricRelation.MULTIPLY_BY: + metric_value *= related_metrics[related_metric_name] + elif metric_meta.metric_relation == MetricRelation.SUM_OF: + metric_value += related_metrics[related_metric_name] + + related_metrics[metric_name] = metric_value + + def __str__(self): + return ( + "Total_requests: {}, diff-requests: {}, requests + status: {}, " + "avg response time: {}, metrics: {}".format( + self.total_requests, + self.total_requests_diff, + self.sorted_worker_stats, + self.avg_workers_response_time, + self.uwsgi_pm_metrics, + ) + ) + + @property + def total_requests(self): + return self._total_requests + + @property + def total_requests_diff(self): + return self._total_requests_diff + + @property + def sorted_worker_stats(self): + return self._sorted_worker_stats + + @property + def avg_workers_response_time(self): + return self._avg_rt + + @property + def worker_ids(self): + return self._worker_ids + + @property + def uwsgi_pm_metrics(self): + return self._uwsgi_pm_metrics + + def uwsgi_pm_metric_by_name(self, name): + return self._uwsgi_pm_metrics[name] + + @property + def uwsgi_pm_metrics_accumulation(self): + return self._uwsgi_pm_metrics_accumulation + + @property + def uwsgi_pm_metrics_per_window(self): + return self._uwsgi_pm_metrics_per_window + + def should_report_requests_per_window_time(self, stats_snapshot): + return stats_snapshot is None or ( + self.total_requests_diff != stats_snapshot.total_requests_diff + ) + + def should_report_average_response_time(self, stats_snapshot): + return stats_snapshot is None or ( + 
self.avg_workers_response_time != stats_snapshot.avg_workers_response_time + ) + + def should_report_worker_status(self, stats_snapshot): + return stats_snapshot is None or ( + self.sorted_worker_stats != stats_snapshot.sorted_worker_stats + ) + + def should_report_metrics_accumulation(self, stats_snapshot): + return stats_snapshot is None or ( + self.uwsgi_pm_metrics_accumulation != stats_snapshot.uwsgi_pm_metrics_accumulation + ) + + def should_report_metrics_per_time_window(self, stats_snapshot): + if self.uwsgi_pm_metrics_per_window is None: + return False + + return stats_snapshot is None or ( + self.uwsgi_pm_metrics_per_window != stats_snapshot.uwsgi_pm_metrics_per_window + ) diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful_component.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful_component.py new file mode 100644 index 000000000..86e102683 --- /dev/null +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/restful_component.py @@ -0,0 +1,356 @@ +""" +The RESTfulComponent class is used to create RESTful model serving components. +It is an abstract class that is supposed to be extended and the sub-class is +supposed to implement its abstract methods. + +The class provide the core functionality of the model restful serving, which +includes setup and preparation of the environment, configurations as well as +executing of required services (.e.g uwsig, nginx) +""" + +import abc +import os +import socket +import tempfile +import time +import logging +import subprocess +import sys + +from datarobot_drum.resource.components.Python.uwsgi_component.pipeline import java_mapping +from datarobot_drum.resource.components.Python.uwsgi_component.model.model_env import ModelEnv +from datarobot_drum.resource.components.Python.uwsgi_component.ml_engine.rest_model_serving_engine import ( + RestModelServingEngine, +) +from datarobot_drum.resource.components.Python.uwsgi_component.common import constants +from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import ( + UwsgiNginxCoreException, +) +from mlpiper.components.connectable_component import ConnectableComponent +from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_broker import ( + UwsgiBroker, +) + +from datarobot_drum.resource.components.Python.uwsgi_component.restful.constants import ( + SharedConstants, + ComponentConstants, + UwsgiConstants, + NginxConstants, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.constants import ( + RestfulConstants, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.nginx_broker import ( + NginxBroker, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.flask_route import FlaskRoute +from datarobot_drum.resource.components.Python.uwsgi_component.restful.metric import ( + Metric, + MetricType, + MetricRelation, +) + + +def str2bool(p): + if isinstance(p, bool): + return p + + return p.lower() in ["true", "yes", "1"] + + +class RESTfulComponent(ConnectableComponent): + _uuid_engine = None + _stats_path_filename = None + _stats_count = 0 + + def __init__(self, engine): + # When this component is created from uwsgi entry point, engine param is None. + # So check for uwsgi installation only once from pipeline/component creation, + # when engine is not None. 
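+ # The check below simply runs "python -m pip show uwsgi" and treats a non-zero exit code as "uwsgi is not installed".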
+ if engine is not None: + if ( + subprocess.run( + [sys.executable, "-m", "pip", "show", "uwsgi"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ).returncode + != 0 + ): + err_msg = ( + "'uwsgi' package is required. " + "If you are running on a Windows system, " + "you may not be able to use RESTful component. " + "Check how to install uwsgi on Windows. " + "To install uwsgi on a Mac/Linux system, call " + "`pip install uwsgi` or `pip install drum[uwsgi]`" + ) + raise UwsgiNginxCoreException(err_msg) + + super(RESTfulComponent, self).__init__( + engine if engine else RestModelServingEngine("uwsgi-context") + ) + self._dry_run = False + self._wsgi_broker = None + self._nginx_broker = None + self._wid = None + self._wuuid = None + + self._total_stat_requests = Metric( + "mlpiper.restful.stat_requests", + title="Total number of stat requests", + metric_type=MetricType.COUNTER, + value_type=int, + metric_relation=MetricRelation.SUM_OF, + ) + + def set_wid(self, wid): + self._wid = wid + + def get_wid(self): + return self._wid + + def set_wuuid(self, wuuid): + self._wuuid = wuuid + + def _validate_output(self, objs): + pass + + def _post_validation(self, objs): + pass + + def configure(self, params): + super(RESTfulComponent, self).configure(params) + + def _materialize(self, parent_data_objs, user_data): + monitor_info = { + UwsgiConstants.MONITOR_ERROR_KEY: None, + UwsgiConstants.MONITOR_THREAD_KEY: None, + } + self._setup(self._ml_engine.pipeline_name, monitor_info) + self._wait_and_monitor_errors(monitor_info) + + def _setup(self, pipeline_name, monitor_info): + target_path = tempfile.mkdtemp( + dir=ComponentConstants.TMP_RESTFUL_ROOT, + prefix=ComponentConstants.TMP_RESTFUL_DIR_PREFIX, + ) + os.chmod(target_path, 0o777) + + fd, stats_path_filename = tempfile.mkstemp( + dir=ComponentConstants.TMP_RESTFUL_ROOT, + prefix=ComponentConstants.TMP_RESTFUL_DIR_PREFIX, + ) + os.chmod(stats_path_filename, 0o777) + + self._logger.debug("Path for stats {}".format(stats_path_filename)) + + shared_conf = { + SharedConstants.TARGET_PATH_KEY: target_path, + SharedConstants.SOCK_FILENAME_KEY: UwsgiConstants.SOCK_FILENAME, + SharedConstants.STATS_SOCK_FILENAME_KEY: UwsgiConstants.STATS_SOCK_FILENAME, + SharedConstants.STANDALONE: self._ml_engine.standalone, + SharedConstants.STATS_PATH_FILENAME_KEY: stats_path_filename, + } + + log_format = self._params.get( + ComponentConstants.LOG_FORMAT_KEY, ComponentConstants.DEFAULT_LOG_FORMAT + ) + + log_level_param = self._params.get( + ComponentConstants.LOG_LEVEL_KEY, ComponentConstants.DEFAULT_LOG_LEVEL + ).lower() + log_level = constants.LOG_LEVELS.get(log_level_param, logging.INFO) + self._logger.debug( + "log_level_param: {}, log_level: {}, level_constants: {}".format( + log_level_param, log_level, constants.LOG_LEVELS + ) + ) + + stats_reporting_interval_sec = self._params.get( + ComponentConstants.STATS_REPORTING_INTERVAL_SEC, + ComponentConstants.DEFAULT_STATS_REPORTING_INTERVAL_SEC, + ) + + model_filepath_key = java_mapping.RESERVED_KEYS[ComponentConstants.INPUT_MODEL_TAG_NAME] + self._params[model_filepath_key] = ModelEnv( + self._params[model_filepath_key], self._ml_engine.standalone + ).model_filepath + + single_uwsgi_worker = str2bool( + self._params.get(ComponentConstants.SINGLE_UWSGI_WORKER_KEY, False) + ) + + # Used for uwsgi UDP logging + logging_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + logging_socket.bind(("", 0)) + logging_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + uwsgi_entry_point_conf = { 
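+ # everything the generated uWSGI entry-point script and ini file need at startup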
+ UwsgiConstants.RESTFUL_COMP_MODULE_KEY: self.__module__, + UwsgiConstants.RESTFUL_COMP_CLS_KEY: self.__class__.__name__, + UwsgiConstants.LOGGING_UDP_SOCKET: logging_socket, + ComponentConstants.LOG_FORMAT_KEY: log_format, + ComponentConstants.LOG_LEVEL_KEY: log_level, + ComponentConstants.STATS_REPORTING_INTERVAL_SEC: stats_reporting_interval_sec, + UwsgiConstants.PARAMS_KEY: self._params, + UwsgiConstants.PIPELINE_NAME_KEY: pipeline_name, + UwsgiConstants.MODEL_PATH_KEY: self._params[model_filepath_key], + UwsgiConstants.DEPUTY_ID_KEY: self._ml_engine.get_uuid(), + ComponentConstants.UWSGI_DISABLE_LOGGING_KEY: str2bool( + self._params.get( + ComponentConstants.UWSGI_DISABLE_LOGGING_KEY, + ComponentConstants.DEFAULT_UWSGI_DISABLE_LOGGING, + ) + ), + ComponentConstants.METRICS_KEY: Metric.metrics(), + ComponentConstants.SINGLE_UWSGI_WORKER_KEY: single_uwsgi_worker, + } + self._logger.debug("uwsgi_entry_point_conf: {}".format(uwsgi_entry_point_conf)) + + nginx_conf = { + ComponentConstants.HOST_KEY: ComponentConstants.DEFAULT_HOST, + ComponentConstants.PORT_KEY: self._params[ComponentConstants.PORT_KEY], + NginxConstants.DISABLE_ACCESS_LOG_KEY: log_level != logging.DEBUG, + } + self._logger.debug("nginx_conf: {}".format(nginx_conf)) + + self._dry_run = str2bool( + self._params.get(ComponentConstants.DRY_RUN_KEY, ComponentConstants.DEFAULT_DRY_RUN) + ) + if self._dry_run: + self._logger.warning( + "\n\n" + 80 * "#" + "\n" + 25 * " " + "Running in DRY RUN mode\n" + 80 * "#" + ) + + self._dry_run = str2bool( + self._params.get(ComponentConstants.DRY_RUN_KEY, ComponentConstants.DEFAULT_DRY_RUN) + ) + + self._wsgi_broker = UwsgiBroker(self._ml_engine, self._dry_run).setup_and_run( + shared_conf, uwsgi_entry_point_conf, monitor_info + ) + + self._nginx_broker = NginxBroker(self._ml_engine, self._dry_run).setup_and_run( + shared_conf, nginx_conf + ) + + def _wait_and_monitor_errors(self, monitor_info): + self._logger.info( + "Going to read model / stop events ... (kidding, going to sleep forever ...)" + ) + + if not self._dry_run and monitor_info[UwsgiConstants.MONITOR_THREAD_KEY]: + try: + monitor_info[UwsgiConstants.MONITOR_THREAD_KEY].join() + + if monitor_info[UwsgiConstants.MONITOR_ERROR_KEY]: + self._logger.error(monitor_info[UwsgiConstants.MONITOR_ERROR_KEY]) + raise UwsgiNginxCoreException(monitor_info[UwsgiConstants.MONITOR_ERROR_KEY]) + except KeyboardInterrupt: + # When running from mlpiper tool (standalone) + pass + finally: + self._nginx_broker.quit() + self._wsgi_broker.quit() + else: + while True: + time.sleep(3600 * 24 * 365) + + @abc.abstractmethod + def load_model_callback(self, model_path, stream, version): + """ + This abstract method is called whenever a new model is supposed to be loaded. 
The user + is responsible to reload the model and start using it in any consequent predictions + + :param model_path: an absolute file path to the model + """ + pass + + def _on_exit(self): + cleanup_op = getattr(self, ComponentConstants.CLEANUP_CALLBACK_FUNC_NAME, None) + if callable(cleanup_op): + cleanup_op() + else: + self._logger.info( + "'{}' function is not defined by the restful child component!".format( + ComponentConstants.CLEANUP_CALLBACK_FUNC_NAME + ) + ) + + @classmethod + def run(cls, port, model_path): + raise UwsgiNginxCoreException( + "Running restful components from CLI is not allowed without mlpiper" + ) + + def _get_stats_dict(self): + return None + + # NOTE: do not rename this route or over-ride it + @FlaskRoute("/{}/".format(RestfulConstants.STATS_ROUTE)) + def stats(self, url_params, form_params): + status_code = 200 + + import os + import json + + stats_dict = {} + self._stats_count += 1 + self._total_stat_requests.increase() + + if self._stats_path_filename is None or os.stat(self._stats_path_filename).st_size == 0: + pass + else: + with open(self._stats_path_filename, "r") as input: + dict_json = "" + for line in input: + dict_json += line + try: + stats_dict = json.loads(dict_json) + except Exception as e: + stats_dict[RestfulConstants.STATS_SYSTEM_ERROR] = str(e) + # stats_dict['system_error'] = str(e) + print("Unexpected error: {}", str(e)) + + stats_dict[RestfulConstants.STATS_SYSTEM_INFO] = {} + stats_dict[RestfulConstants.STATS_SYSTEM_INFO][RestfulConstants.STATS_WID] = self._wid + stats_dict[RestfulConstants.STATS_SYSTEM_INFO][ + RestfulConstants.STATS_UUID + ] = self._uuid_engine + stats_dict[RestfulConstants.STATS_SYSTEM_INFO][RestfulConstants.STATS_WUUID] = self._wuuid + + custom_stats = self._get_stats_dict() + if custom_stats: + stats_dict.update(custom_stats) + + return status_code, stats_dict + + +if __name__ == "__main__": + + class MyResfulComp(RESTfulComponent): + def __init__(self, engine): + super(RESTfulComponent, self).__init__(engine) + + def load_model(self, model_path): + print("Model is reloading, path: {}".format(model_path)) + + @FlaskRoute("/v1/predict") + def predict_v1(self, url_params, form_params): + print("url_params: {}".format(url_params)) + print("form_params: {}".format(form_params)) + + status_code = 200 + dict_response = {"user": "jon", "age": 100} + return (status_code, dict_response) + + @FlaskRoute("/v2/predict") + def predict_v2(self, url_params, form_params): + print("url_params: {}".format(url_params)) + print("form_params: {}".format(form_params)) + + status_code = 200 + dict_response = {"user": "jon2", "age": 200} + return (status_code, dict_response) + + MyResfulComp.run(9999, "/tmp/model_path/xxx.txt") diff --git a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/uwsgi_serving.py b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/uwsgi_serving.py index e818198dc..515d538c5 100644 --- a/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/uwsgi_serving.py +++ b/custom_model_runner/datarobot_drum/resource/components/Python/uwsgi_component/uwsgi_serving.py @@ -8,10 +8,15 @@ import os import sys -from mlpiper.components.restful.flask_route import FlaskRoute -from mlpiper.components.restful_component import RESTfulComponent -from mlpiper.components.restful.metric import Metric, MetricType, MetricRelation - +from datarobot_drum.resource.components.Python.uwsgi_component.restful.flask_route import FlaskRoute +from 
datarobot_drum.resource.components.Python.uwsgi_component.restful_component import ( + RESTfulComponent, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.metric import ( + Metric, + MetricType, + MetricRelation, +) from datarobot_drum.drum.common import ( make_predictor_capabilities, diff --git a/requirements_test_unit.txt b/requirements_test_unit.txt index 7fa1d498d..46890cf64 100644 --- a/requirements_test_unit.txt +++ b/requirements_test_unit.txt @@ -1,3 +1,4 @@ pytest responses scikit-learn==1.3.2 +parameterized diff --git a/tests/unit/datarobot_drum/resource/components/test_mlpiper_uwsgi_broker.py b/tests/unit/datarobot_drum/resource/components/test_mlpiper_uwsgi_broker.py new file mode 100644 index 000000000..7a184e0af --- /dev/null +++ b/tests/unit/datarobot_drum/resource/components/test_mlpiper_uwsgi_broker.py @@ -0,0 +1,57 @@ +from configparser import ConfigParser + +import tempfile + +import socket + +from unittest.mock import Mock +from parameterized import parameterized + + +from datarobot_drum.resource.components.Python.uwsgi_component.restful.uwsgi_broker import ( + UwsgiBroker, +) +from datarobot_drum.resource.components.Python.uwsgi_component.restful.constants import ( + SharedConstants, + ComponentConstants, + UwsgiConstants, +) + + +class TestUwsgiBroker: + @parameterized.expand( + [ + ["7", 7], + ["1", None], + ] + ) + def test_generate_ini_file_(self, expected_amount_of_uwsgi_threads, uwsgi_threads): + ml_engine = Mock() + uwsgi_broker = UwsgiBroker(ml_engine) + params = {} + if uwsgi_threads: + params[ComponentConstants.UWSGI_THREADS] = uwsgi_threads + + logging_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + entry_point_conf = { + UwsgiConstants.PARAMS_KEY: params, + ComponentConstants.METRICS_KEY: "total", + ComponentConstants.SINGLE_UWSGI_WORKER_KEY: True, + UwsgiConstants.LOGGING_UDP_SOCKET: logging_socket, + ComponentConstants.UWSGI_DISABLE_LOGGING_KEY: True, + } + + shared_conf = { + SharedConstants.SOCK_FILENAME_KEY: "SOCK_FILENAME_KEY", + SharedConstants.STATS_SOCK_FILENAME_KEY: "STATS_SOCK_FILENAME_KEY", + } + + with tempfile.TemporaryDirectory() as tmp_dir: + uwsgi_broker._target_path = tmp_dir + uwsgi_ini = uwsgi_broker._generate_ini_file(shared_conf, entry_point_conf) + with open(uwsgi_ini) as f: + print(f.read()) + + config = ConfigParser(strict=False) + config.read(uwsgi_ini) + assert expected_amount_of_uwsgi_threads == config["uwsgi"]["threads"]
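For reference, the worker counts written into the generated ini (alongside the 'threads' value asserted above) come from UwsgiCheaperSubSystem.get_config() combined with the optional max-workers clamp in UwsgiBroker._generate_ini_file(). A rough worked example of that arithmetic, assuming a 16-core host and an illustrative uwsgi_max_workers value of 4 (neither figure comes from this change):

    import math

    CPU_COUNT = 16                                   # multiprocessing.cpu_count() on the assumed host
    entry = int(math.log(CPU_COUNT, 2)) - 1          # -> 3, i.e. the "16 ~ 23 cores" row of CONF
    conf = {"workers": CPU_COUNT - 4, "cheaper": 5,  # -> workers defaults to 12
            "cheaper-initial": 5, "cheaper-step": 3}

    max_workers = 4                                  # illustrative UWSGI_MAX_WORKERS_KEY value
    workers = min(max_workers, conf["workers"])                  # 4
    cheaper = min(conf["cheaper"], workers - 1)                  # 3 (cheaper must stay below workers)
    cheaper_initial = min(max_workers, conf["cheaper-initial"])  # 4
    cheaper_step = min(max_workers, conf["cheaper-step"])        # 3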