Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yakov/mlpiper restful refactoring #1097

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from datarobot_drum.resource.components.Python.uwsgi_component.common import constants
from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import (
UwsgiNginxCoreException,
)


class Base(object):
    """Common base giving a component a name and a (possibly late-bound) logger.

    While no real logger is attached, the ``_logger`` property returns the
    instance itself, whose ``debug``/``info``/``warning``/``error``/``critical``
    methods only buffer the message. The buffer is replayed into the real
    logger as soon as one is supplied through ``set_logger``.
    """

    def __init__(self, logger=None):
        """
        :param logger: optional logger object; when omitted, messages are
            buffered until ``set_logger`` is called.
        """
        super(Base, self).__init__()
        # Messages emitted before a real logger was attached.
        self._msg_container = []
        self.__logger = logger

    def name(self):
        """Return the name of the concrete class of this instance."""
        return self.__class__.__name__

    def logger_name(self):
        """Return the logger name: the configured prefix, a dot, then ``name()``."""
        return constants.LOGGER_NAME_PREFIX + "." + self.name()

    def set_logger(self, logger):
        """Attach ``logger`` and flush all buffered messages into it.

        :raises UwsgiNginxCoreException: if ``logger`` is falsy (e.g. ``None``).
        """
        self.__logger = logger
        self._print_acc_messages()

    def is_logger_set(self):
        """Return ``True`` when a (truthy) logger object is attached."""
        return bool(self.__logger)

    def _print_acc_messages(self):
        """Replay the buffered messages into the attached logger, then empty the buffer.

        :raises UwsgiNginxCoreException: if no logger is attached.
        """
        if not self.__logger:
            raise UwsgiNginxCoreException("None logger! Invalid internal sequence!")

        # The buffer does not record the original level, so every message is
        # replayed at info level.
        for m in self._msg_container:
            self.__logger.info(m)

        # Drop what was just replayed; otherwise a later ``set_logger`` call
        # would emit the very same messages a second time. Cleared in place so
        # that the list object itself stays the same.
        del self._msg_container[:]

    @property
    def _logger(self):
        """The attached logger, or this instance (buffering) when there is none."""
        return self.__logger if self.__logger else self

    def debug(self, msg):
        """Buffer ``msg`` until a real logger is attached."""
        self._msg_container.append(msg)

    def info(self, msg):
        """Buffer ``msg`` until a real logger is attached."""
        self._msg_container.append(msg)

    def warning(self, msg):
        """Buffer ``msg`` until a real logger is attached."""
        self._msg_container.append(msg)

    def error(self, msg):
        """Buffer ``msg`` until a real logger is attached."""
        self._msg_container.append(msg)

    def critical(self, msg):
        """Buffer ``msg`` until a real logger is attached."""
        self._msg_container.append(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import abc
from future.utils import with_metaclass
import threading

from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base
from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import (
UwsgiNginxCoreException,
)


# Abstract background thread: after every wait on its condition (at most
# 'polling_interval_sec' seconds) it calls '_do_repetitive_work', until either
# 'mlops.done_called' becomes truthy or 'stop_gracefully' has been called.
class BgActor(with_metaclass(abc.ABCMeta, Base, threading.Thread)):
# :param mlops: object exposing 'init_called' and 'done_called'
# :param ml_engine: object exposing 'get_engine_logger(name)'
# :param polling_interval_sec: timeout, in seconds, of each wait in 'run'
# :raises UwsgiNginxCoreException: if 'mlops' is falsy or its 'init_called' is falsy
def __init__(self, mlops, ml_engine, polling_interval_sec=10.0):
super(BgActor, self).__init__()
# The logger is attached before 'mlops' is validated.
self.set_logger(ml_engine.get_engine_logger(self.logger_name()))

if not mlops or not mlops.init_called:
raise UwsgiNginxCoreException("'mlops' was not setup properly!")

self._mlops = mlops
self._polling_interval_sec = polling_interval_sec

# The condition is used both as the timed sleep of 'run' and as the wake-up
# signal sent by 'stop_gracefully'.
self._condition = threading.Condition()
self._stop_gracefully = False

# Thread body: wait first, then check the exit flags, then do the work.
def run(self):
while True:
with self._condition:
# Returns on timeout or as soon as 'stop_gracefully' notifies.
self._condition.wait(self._polling_interval_sec)
if self._mlops.done_called or self._stop_gracefully:
break

# NOTE(review): indentation was lost in this view, so it cannot be told from
# here whether the call below runs while the condition is still held or after
# it was released - confirm against the original file.
self._do_repetitive_work()

self._logger.warning("Exiting background actor ...")

# Ask the thread to exit: while holding the condition, run the '_finalize'
# hook, raise the stop flag and wake up any pending wait in 'run'.
def stop_gracefully(self):
with self._condition:
self._finalize()
self._stop_gracefully = True
self._condition.notify_all()

@abc.abstractmethod
def _do_repetitive_work(self):
"""
Implement any desired repetitive functionality that will be called in a background thread
every 'polling_interval_sec'
"""
pass

def _finalize(self):
"""
An overridable method, to let the derived class perform final actions before shutting down
"""
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
class BufferToLines(object):
    """Accumulate raw byte chunks and hand them back as complete text lines.

    Chunks are appended with ``add``; ``lines`` yields every line that is
    already terminated by a newline and keeps the unterminated tail, if any,
    for the next round.
    """

    def __init__(self):
        self._acc_buff = ""
        self._last_line = ""
        self._in_middle_of_line = False

    def add(self, buff):
        """Decode ``buff`` and append it to the pending text.

        :param buff: a chunk exposing ``decode()`` (e.g. ``bytes``); an empty
            chunk is accepted and leaves the pending text unchanged.
        """
        # NOTE: every chunk is decoded on its own, so a multi-byte character
        # split across two chunks cannot be decoded.
        self._acc_buff += buff.decode()
        # ``endswith`` is safe on an empty buffer, where indexing with [-1]
        # would raise IndexError.
        self._in_middle_of_line = bool(self._acc_buff) and not self._acc_buff.endswith("\n")

    def lines(self):
        """Yield each complete line (without its newline) accumulated so far.

        This is a generator: the pending text is consumed only once iteration
        starts. The trailing partial line, if any, stays buffered.
        """
        parts = self._acc_buff.split("\n")
        # The last piece is never a complete line: it is either the partial
        # tail or the empty string that follows a terminating newline. Every
        # piece before it is complete and must be yielded.
        complete = parts[:-1]
        self._acc_buff = parts[-1] if self._in_middle_of_line else ""

        for line in complete:
            yield line
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import logging

# Prefix used when building logger names.
LOGGER_NAME_PREFIX = ""

# File name of the requirements listing.
REQUIREMENTS_FILENAME = "requirements.txt"

# Textual log-level names mapped to the numeric levels of the ``logging`` module.
LOG_LEVELS = {
    "noset": logging.NOTSET,
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from datetime import datetime
import os
import pytz
import subprocess
import tempfile


def tmp_filepath():
    """Return a unique temporary file path whose file no longer exists.

    A named temporary file is created only to obtain a unique name; it is
    closed (and thereby removed) before the path is handed back.
    """
    with tempfile.NamedTemporaryFile() as handle:
        return handle.name


def remove_file_safely(filepath):
    """Delete ``filepath`` when it is an existing regular file; otherwise do nothing."""
    if not os.path.isfile(filepath):
        return
    os.remove(filepath)


def service_installed(service_name):
    """Tell whether ``service <service_name> status`` completes successfully.

    :param service_name: name handed to the ``service`` command.
    :return: ``True`` when the status command exits with code 0; ``False``
        when it exits with a non-zero code or cannot be started at all.
    """
    try:
        # An argument list (no shell) keeps ``service_name`` a single argument,
        # so shell metacharacters in it are never interpreted as commands.
        subprocess.check_output(
            ["service", "{}".format(service_name), "status"],
            stderr=subprocess.STDOUT,
        )
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: non-zero exit status.
        # OSError: the ``service`` executable is missing or not runnable; a
        # shell would have reported that as a non-zero exit status as well.
        return False
    return True


def utcnow():
    """Return the current time as a timezone-aware datetime expressed in ``pytz.UTC``."""
    utc_zone = pytz.UTC
    return datetime.now(tz=utc_zone)
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import logging

from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base
from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import (
UwsgiNginxCoreException,
)


class TopologicalNode(object):
    """Pairs a graph node with its key, the keys of its children and two visit marks."""

    def __init__(self, node, key, child_keys):
        """
        :param node: the wrapped graph node
        :param key: the key identifying the node
        :param child_keys: the keys of the node's children
        """
        # Visit marks; both start cleared.
        self.temp_visit = False
        self.perm_visit = False

        self._node = node
        self._key = key
        self._child_keys = child_keys

    @property
    def child_keys(self):
        """Keys of the children of the wrapped node."""
        return self._child_keys

    @property
    def key(self):
        """Key of the wrapped node."""
        return self._key

    @property
    def node(self):
        """The wrapped node itself."""
        return self._node

    def __str__(self):
        template = "key: {}, childs: {}"
        return template.format(self.key, self.child_keys)


class TopologicalSort(Base):
    """
    Generates topological sort from a list or dict, which represent graphs.

    Every node exposes its key and its children through two attributes (plain
    attributes, properties or zero-argument methods) whose names are handed to
    the constructor. ``sort`` returns the nodes ordered so that every node
    appears after all of its children.
    """

    # Marker that tells "attribute is missing" apart from "attribute is falsy".
    _MISSING = object()

    def __init__(self, graph, key_attr_name, ptr_attr_name):
        """
        :param graph: a list or dict that contains the whole nodes in the graph
        :param key_attr_name: a string that is literally the name of the key accessor
        :param ptr_attr_name: a string that is literally the name of the children accessor
        :raises UwsgiNginxCoreException: if ``graph`` is neither a dict nor a list, or
            a node lacks one of the two accessors
        """
        super(TopologicalSort, self).__init__(logging.getLogger(self.logger_name()))

        self._graph = graph
        # key -> TopologicalNode, filled by '_generate_aux_graph'
        self._graph_aux = {}

        self._key_attr_name = key_attr_name
        self._ptr_attr_name = ptr_attr_name

        self._perm_visit = {}
        self._sorted_graph = []

        self._generate_aux_graph(graph, key_attr_name, ptr_attr_name)

    def _generate_aux_graph(self, graph, key_attr_name, ptr_attr_name):
        """Wrap every node of ``graph`` in a ``TopologicalNode`` indexed by its key."""
        self._logger.debug("{}".format(graph))
        if isinstance(graph, dict):
            # Only the values are nodes; the dict's own keys are not used.
            nodes = graph.values()
        elif isinstance(graph, list):
            nodes = graph
        else:
            raise UwsgiNginxCoreException(
                "Invalid graph type for topological sort! Expected 'dict' or 'list'! "
                + "type: {}".format(type(graph))
            )

        for node in nodes:
            self._add_node(node, key_attr_name, ptr_attr_name)

        self._logger.debug(self._graph_aux)

    def _add_node(self, node, key_attr_name, ptr_attr_name):
        """Register ``node`` unless a node with the same key is already registered."""
        key_value = TopologicalSort._call_class_attr(node, key_attr_name)
        self._logger.debug("key_value: {}".format(key_value))

        if key_value in self._graph_aux:
            return

        ptr_value = TopologicalSort._call_class_attr(node, ptr_attr_name)
        self._logger.debug("ptr_value: {}, type: {}".format(ptr_value, type(ptr_value)))

        try:
            children = iter(ptr_value)
        except TypeError:
            # Not iterable: the accessor returned a single child (or a falsy
            # "no children" value such as None, filtered out below).
            children = [ptr_value]

        child_keys = []
        for child in children:
            if child:
                self._add_child_key_name(child, key_attr_name, child_keys)

        self._graph_aux[key_value] = TopologicalNode(node, key_value, child_keys)

    def _add_child_key_name(self, child, key_attr_name, child_keys):
        """Append the key of ``child`` to ``child_keys``."""
        key_name = TopologicalSort._call_class_attr(child, key_attr_name)
        self._logger.debug("child key name: {}".format(key_name))
        child_keys.append(key_name)

    @staticmethod
    def _call_class_attr(cls, attr_name):
        """Return the value of attribute ``attr_name`` of ``cls``, calling it if callable.

        A falsy value (``None``, an empty list, ``0`` ...) is a legitimate
        result; only an attribute that does not exist at all is an error.

        :raises UwsgiNginxCoreException: if ``cls`` has no attribute ``attr_name``
        """
        attr = getattr(cls, attr_name, TopologicalSort._MISSING)
        if attr is TopologicalSort._MISSING:
            raise UwsgiNginxCoreException(
                "The given class does not include the given attribute name! "
                + "class: {}, attr_name: {}".format(cls, attr_name)
            )
        attr_value = attr() if callable(attr) else attr
        return attr_value

    def sort(self):
        """Return the list of nodes in topological order (children first).

        The result is computed on the first call and reused afterwards.

        :raises UwsgiNginxCoreException: if the graph has a cycle or a child key
            that is not part of the graph
        """
        if not self._sorted_graph:
            # A single pass is enough: visiting a node permanently marks it and
            # everything reachable from it, so later entries are just skipped.
            for t_node in self._graph_aux.values():
                if not t_node.perm_visit:
                    self._visit(t_node)

        return self._sorted_graph

    def _find_unmarked_node(self):
        """Return the first node not permanently visited yet, or ``None``."""
        for t_node in self._graph_aux.values():
            if not t_node.perm_visit:
                return t_node
        return None

    def _visit(self, t_node):
        """Depth-first visit: emit all children of ``t_node``, then ``t_node`` itself.

        :raises UwsgiNginxCoreException: on a cycle or an unknown child key
        """
        self._logger.debug("Visiting node: {}".format(t_node.key))
        if t_node.perm_visit:
            return

        # Reaching a node that is still being visited means we came back to an
        # ancestor, i.e. the graph has a cycle.
        if t_node.temp_visit:
            raise UwsgiNginxCoreException(
                "The pipeline has invalid cyclic loop (Not a DAG)! node: {}".format(t_node)
            )

        t_node.temp_visit = True
        for child_key in t_node.child_keys:
            if child_key not in self._graph_aux:
                raise UwsgiNginxCoreException(
                    "Child id was not found in the graph! key: {}".format(child_key)
                )

            self._visit(self._graph_aux[child_key])

        t_node.temp_visit = False
        t_node.perm_visit = True
        self._sorted_graph.append(t_node.node)


if __name__ == "__main__":
    # Small manual demo: sort the same five-node graph, once given as a list
    # and once given as a dict, and print both results.

    class Node(object):
        """Minimal graph node exposing 'key' and 'childs' accessors."""

        def __init__(self, key, childs):
            self._key, self._childs = key, childs

        @property
        def key(self):
            return self._key

        @property
        def childs(self):
            return self._childs

        def __str__(self):
            if self.childs:
                child_keys = [child.key for child in self.childs]
            else:
                child_keys = None
            return "key: {}, childs: {}".format(self.key, child_keys)

    logging.basicConfig(level=logging.INFO, format="%(name)s %(levelname)s: %(message)s")

    n1 = Node("a", None)
    n2 = Node("b", [n1])
    n3 = Node("c", [n1])
    n4 = Node("d", [n2, n3])
    n5 = Node("e", [n3])

    graph_list = [n3, n1, n2, n4, n5]
    graph_dict = {n3.key: n3, n1.key: n1, n2.key: n2, n4.key: n4, n5.key: n5}

    for title, graph in (("Graph1:", graph_list), ("\nGraph2:", graph_dict)):
        sorted_graph = TopologicalSort(graph, "key", "childs").sort()
        print(title)
        for node in sorted_graph:
            print(node)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class UwsgiNginxCoreException(Exception):
    """Exception type raised by the uwsgi component's common utilities."""
Loading