Skip to content

Commit

Permalink
moving restful into DRUM
Browse files Browse the repository at this point in the history
  • Loading branch information
yakov-g committed Aug 1, 2024
1 parent f9506f9 commit c1fb792
Show file tree
Hide file tree
Showing 41 changed files with 3,122 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from datarobot_drum.resource.components.Python.uwsgi_component.common import constants
from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import (
UwsgiNginxCoreException,
)


class Base(object):
def __init__(self, logger=None):
super(Base, self).__init__()
self._msg_container = []
self.__logger = logger

def name(self):
return self.__class__.__name__

def logger_name(self):
return constants.LOGGER_NAME_PREFIX + "." + self.name()

def set_logger(self, logger):
self.__logger = logger
self._print_acc_messages()

def is_logger_set(self):
return bool(self.__logger)

def _print_acc_messages(self):
if not self.__logger:
raise UwsgiNginxCoreException("None logger! Invalid internal sequence!")

if self._msg_container:
for m in self._msg_container:
self.__logger.info(m)

@property
def _logger(self):
return self.__logger if self.__logger else self

def debug(self, msg):
self._msg_container.append(msg)

def info(self, msg):
self._msg_container.append(msg)

def warning(self, msg):
self._msg_container.append(msg)

def error(self, msg):
self._msg_container.append(msg)

def critical(self, msg):
self._msg_container.append(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import abc
from future.utils import with_metaclass
import threading

from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base
from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import (
UwsgiNginxCoreException,
)


class BgActor(with_metaclass(abc.ABCMeta, Base, threading.Thread)):
def __init__(self, mlops, ml_engine, polling_interval_sec=10.0):
super(BgActor, self).__init__()
self.set_logger(ml_engine.get_engine_logger(self.logger_name()))

if not mlops or not mlops.init_called:
raise UwsgiNginxCoreException("'mlops' was not setup properly!")

self._mlops = mlops
self._polling_interval_sec = polling_interval_sec

self._condition = threading.Condition()
self._stop_gracefully = False

def run(self):
while True:
with self._condition:
self._condition.wait(self._polling_interval_sec)
if self._mlops.done_called or self._stop_gracefully:
break

self._do_repetitive_work()

self._logger.warning("Exiting background actor ...")

def stop_gracefully(self):
with self._condition:
self._finalize()
self._stop_gracefully = True
self._condition.notify_all()

@abc.abstractmethod
def _do_repetitive_work(self):
"""
Implement any desired repetitive functionality that will be called in a background thread
every 'polling_interval_sec'
"""
pass

def _finalize(self):
"""
An overridable method, to let the derived class perform final actions before shutting down
"""
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
class BufferToLines(object):
def __init__(self):
self._acc_buff = ""
self._last_line = ""
self._in_middle_of_line = False

def add(self, buff):
self._acc_buff += buff.decode()
self._in_middle_of_line = False if self._acc_buff[-1] == "\n" else True

def lines(self):
lines = self._acc_buff.split("\n")
up_to_index = len(lines) - 2 if self._in_middle_of_line else len(lines) - 1
self._acc_buff = lines[-1] if self._in_middle_of_line else ""

for iii in range(up_to_index):
yield lines[iii]
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import logging

LOGGER_NAME_PREFIX = ""

LOG_LEVELS = {
"noset": logging.NOTSET,
"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARN,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}


REQUIREMENTS_FILENAME = "requirements.txt"
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from datetime import datetime
import os
import pytz
import subprocess
import tempfile


def tmp_filepath():
tmp_file = tempfile.NamedTemporaryFile()
filepath = tmp_file.name
tmp_file.close()
return filepath


def remove_file_safely(filepath):
if os.path.isfile(filepath):
os.unlink(filepath)


def service_installed(service_name):
try:
subprocess.check_output(
"service {} status".format(service_name),
shell=True,
stderr=subprocess.STDOUT,
)
return True
except subprocess.CalledProcessError:
return False


def utcnow():
return datetime.now(pytz.UTC)
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import logging

from datarobot_drum.resource.components.Python.uwsgi_component.common.base import Base
from datarobot_drum.resource.components.Python.uwsgi_component.common.uwsgi_nginx_core_exception import (
UwsgiNginxCoreException,
)


class TopologicalNode(object):
def __init__(self, node, key, child_keys):
self._node = node
self._key = key
self._child_keys = child_keys

self.temp_visit = False
self.perm_visit = False

@property
def node(self):
return self._node

@property
def key(self):
return self._key

@property
def child_keys(self):
return self._child_keys

def __str__(self):
return "key: {}, childs: {}".format(self.key, self.child_keys)


class TopologicalSort(Base):
"""
Generates topological sort from a list or dict, which represent graphs.
"""

def __init__(self, graph, key_attr_name, ptr_attr_name):
"""
:param graph: a list or dict that contains the whole nodes in the graph
:param key_attr_name: a string that is literally the name of the key accessor
:param ptr_attr_name: a string that is literally the name of the children accessor
"""
super(TopologicalSort, self).__init__(logging.getLogger(self.logger_name()))

self._graph = graph
self._graph_aux = {}

self._key_attr_name = key_attr_name
self._ptr_attr_name = ptr_attr_name

self._perm_visit = {}
self._sorted_graph = []

self._generate_aux_graph(graph, key_attr_name, ptr_attr_name)

def _generate_aux_graph(self, graph, key_attr_name, ptr_attr_name):
self._logger.debug("{}".format(graph))
if isinstance(graph, dict):
for key, node in graph.items():
self._add_node(node, key_attr_name, ptr_attr_name)
elif isinstance(graph, list):
for node in graph:
self._add_node(node, key_attr_name, ptr_attr_name)
else:
raise UwsgiNginxCoreException(
"Invalid graph type for topological sort! Expected 'dict' or 'list'! "
+ "type: {}".format(type(graph))
)

self._logger.debug(self._graph_aux)

def _add_node(self, node, key_attr_name, ptr_attr_name):
key_value = TopologicalSort._call_class_attr(node, key_attr_name)
self._logger.debug("key_value: {}".format(key_value))

if key_value not in self._graph_aux:
ptr_value = TopologicalSort._call_class_attr(node, ptr_attr_name)
self._logger.debug("ptr_value: {}, type: {}".format(ptr_value, type(ptr_value)))

child_keys = []
try:
for child in ptr_value:
if child:
self._add_child_key_name(child, key_attr_name, child_keys)
except TypeError:
if ptr_value:
self._add_child_key_name(child, key_attr_name, child_keys)

self._graph_aux[key_value] = TopologicalNode(node, key_value, child_keys)

def _add_child_key_name(self, child, key_attr_name, child_keys):
key_name = TopologicalSort._call_class_attr(child, key_attr_name)
self._logger.debug("child key name: {}".format(key_name))
child_keys.append(key_name)

@staticmethod
def _call_class_attr(cls, attr_name):
attr = getattr(cls, attr_name, None)
if not attr:
raise UwsgiNginxCoreException(
"The given class does not include the given attribute name! "
+ "class: {}, attr_name: {}".format(cls, attr_name)
)
attr_value = attr() if callable(attr) else attr
return attr_value

def sort(self):
if not self._sorted_graph:
while True:
t_node = self._find_unmarked_node()
if not t_node:
break
self._visit(t_node)

return self._sorted_graph

def _find_unmarked_node(self):
for key, t_node in self._graph_aux.items():
if not t_node.perm_visit:
return t_node
return None

def _visit(self, t_node):
self._logger.debug("Visiting node: {}".format(t_node.key))
if t_node.perm_visit:
return

if t_node.temp_visit:
raise UwsgiNginxCoreException(
"The pipeline has invalid cyclic loop (Not a DAG)! node: {}".format(t_node)
)

t_node.temp_visit = True
for child_key in t_node.child_keys:
if child_key not in self._graph_aux:
raise UwsgiNginxCoreException(
"Child id was not found in the graph! key: {}".format(child_key)
)

self._visit(self._graph_aux[child_key])

t_node.temp_visit = False
t_node.perm_visit = True
self._sorted_graph.append(t_node.node)


if __name__ == "__main__":

class Node(object):
def __init__(self, key, childs):
self._key = key
self._childs = childs

@property
def key(self):
return self._key

@property
def childs(self):
return self._childs

def __str__(self):
child_keys = [c.key for c in self.childs] if self.childs else None
return "key: {}, childs: {}".format(self.key, child_keys)

logging.basicConfig(level=logging.INFO, format="%(name)s %(levelname)s: %(message)s")

n1 = Node("a", None)
n2 = Node("b", [n1])
n3 = Node("c", [n1])
n4 = Node("d", [n2, n3])
n5 = Node("e", [n3])

graph_list = [n3, n1, n2, n4, n5]
sorted_graph = TopologicalSort(graph_list, "key", "childs").sort()
print("Graph1:")
for node in sorted_graph:
print(node)

graph_dict = {n3.key: n3, n1.key: n1, n2.key: n2, n4.key: n4, n5.key: n5}
sorted_graph = TopologicalSort(graph_dict, "key", "childs").sort()
print("\nGraph2:")
for node in sorted_graph:
print(node)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class UwsgiNginxCoreException(Exception):
pass
Loading

0 comments on commit c1fb792

Please sign in to comment.