Skip to content

Commit

Permalink
Add support for resource_applied() callback in k8s glue
Browse files Browse the repository at this point in the history
Add support for sending log events with k8s-provided timestamps
Refactor env vars infrastructure
  • Loading branch information
allegroai committed Nov 1, 2023
1 parent d2384a9 commit 0131db8
Show file tree
Hide file tree
Showing 11 changed files with 354 additions and 242 deletions.
4 changes: 2 additions & 2 deletions clearml_agent/backend_api/session/defs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ...backend_config.converters import safe_text_to_bool
from ...backend_config.environment import EnvEntry
from clearml_agent.helper.environment import EnvEntry
from clearml_agent.helper.environment.converters import safe_text_to_bool


ENV_HOST = EnvEntry("CLEARML_API_HOST", "TRAINS_API_HOST")
Expand Down
77 changes: 8 additions & 69 deletions clearml_agent/backend_config/converters.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,8 @@
import base64
from distutils.util import strtobool
from typing import Union, Optional, Any, TypeVar, Callable, Tuple

import six

try:
from typing import Text
except ImportError:
# windows conda-less hack
Text = Any


ConverterType = TypeVar("ConverterType", bound=Callable[[Any], Any])


def text_to_int(value, default=0):
# type: (Any, int) -> int
try:
return int(value)
except (ValueError, TypeError):
return default


def base64_to_text(value):
# type: (Any) -> Text
return base64.b64decode(value).decode("utf-8")


def text_to_bool(value):
# type: (Text) -> bool
return bool(strtobool(value))


def safe_text_to_bool(value):
# type: (Text) -> bool
try:
return text_to_bool(value)
except ValueError:
return bool(value)


def any_to_bool(value):
# type: (Optional[Union[int, float, Text]]) -> bool
if isinstance(value, six.text_type):
return text_to_bool(value)
return bool(value)


def or_(*converters, **kwargs):
# type: (ConverterType, Tuple[Exception, ...]) -> ConverterType
"""
Wrapper that implements an "optional converter" pattern. Allows specifying a converter
for which a set of exceptions is ignored (and the original value is returned)
:param converters: A converter callable
:param exceptions: A tuple of exception types to ignore
"""
# noinspection PyUnresolvedReferences
exceptions = kwargs.get("exceptions", (ValueError, TypeError))

def wrapper(value):
for converter in converters:
try:
return converter(value)
except exceptions:
pass
return value

return wrapper
from clearml_agent.helper.environment.converters import (
base64_to_text,
text_to_bool,
text_to_int,
safe_text_to_bool,
any_to_bool,
or_,
)
115 changes: 5 additions & 110 deletions clearml_agent/backend_config/entry.py
Original file line number Diff line number Diff line change
@@ -1,111 +1,6 @@
import abc
from typing import Optional, Any, Tuple, Callable, Dict
from clearml_agent.helper.environment import Entry, NotSet

import six

from .converters import any_to_bool

try:
from typing import Text
except ImportError:
# windows conda-less hack
Text = Any


NotSet = object()

Converter = Callable[[Any], Any]


@six.add_metaclass(abc.ABCMeta)
class Entry(object):
"""
Configuration entry definition
"""

@classmethod
def default_conversions(cls):
# type: () -> Dict[Any, Converter]
return {
bool: any_to_bool,
six.text_type: lambda s: six.text_type(s).strip(),
}

def __init__(self, key, *more_keys, **kwargs):
# type: (Text, Text, Any) -> None
"""
:param key: Entry's key (at least one).
:param more_keys: More alternate keys for this entry.
:param type: Value type. If provided, will be used choosing a default conversion or
(if none exists) for casting the environment value.
:param converter: Value converter. If provided, will be used to convert the environment value.
:param default: Default value. If provided, will be used as the default value on calls to get() and get_pair()
in case no value is found for any key and no specific default value was provided in the call.
Default value is None.
:param help: Help text describing this entry
"""
self.keys = (key,) + more_keys
self.type = kwargs.pop("type", six.text_type)
self.converter = kwargs.pop("converter", None)
self.default = kwargs.pop("default", None)
self.help = kwargs.pop("help", None)

def __str__(self):
return str(self.key)

@property
def key(self):
return self.keys[0]

def convert(self, value, converter=None):
# type: (Any, Converter) -> Optional[Any]
converter = converter or self.converter
if not converter:
converter = self.default_conversions().get(self.type, self.type)
return converter(value)

def get_pair(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Tuple[Text, Any]]
for key in self.keys:
value = self._get(key)
if value is NotSet:
continue
try:
value = self.convert(value, converter)
except Exception as ex:
self.error("invalid value {key}={value}: {ex}".format(**locals()))
break
# noinspection PyBroadException
try:
if value_cb:
value_cb(key, value)
except Exception:
pass
return key, value

result = self.default if default is NotSet else default
return self.key, result

def get(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Any]
return self.get_pair(default=default, converter=converter, value_cb=value_cb)[1]

def set(self, value):
# type: (Any, Any) -> (Text, Any)
# key, _ = self.get_pair(default=None, converter=None)
for k in self.keys:
self._set(k, str(value))

def _set(self, key, value):
# type: (Text, Text) -> None
pass

@abc.abstractmethod
def _get(self, key):
# type: (Text) -> Any
pass

@abc.abstractmethod
def error(self, message):
# type: (Text) -> None
pass
__all__ = [
"Entry",
"NotSet"
]
38 changes: 10 additions & 28 deletions clearml_agent/backend_config/environment.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,14 @@
from os import getenv, environ
from os import environ

from .converters import text_to_bool
from .entry import Entry, NotSet


class EnvEntry(Entry):
@classmethod
def default_conversions(cls):
conversions = super(EnvEntry, cls).default_conversions().copy()
conversions[bool] = text_to_bool
return conversions

def pop(self):
for k in self.keys:
environ.pop(k, None)

def _get(self, key):
value = getenv(key, "").strip()
return value or NotSet

def _set(self, key, value):
environ[key] = value

def __str__(self):
return "env:{}".format(super(EnvEntry, self).__str__())

def error(self, message):
print("Environment configuration: {}".format(message))
from clearml_agent.helper.environment import EnvEntry


def backward_compatibility_support():
from ..definitions import ENVIRONMENT_CONFIG, ENVIRONMENT_SDK_PARAMS, ENVIRONMENT_BACKWARD_COMPATIBLE
if ENVIRONMENT_BACKWARD_COMPATIBLE.get():
# Add TRAINS_ prefix on every CLEARML_ os environment we support
for k, v in ENVIRONMENT_CONFIG.items():
# noinspection PyBroadException
try:
trains_vars = [var for var in v.vars if var.startswith('CLEARML_')]
if not trains_vars:
Expand All @@ -44,6 +19,7 @@ def backward_compatibility_support():
except:
continue
for k, v in ENVIRONMENT_SDK_PARAMS.items():
# noinspection PyBroadException
try:
trains_vars = [var for var in v if var.startswith('CLEARML_')]
if not trains_vars:
Expand All @@ -62,3 +38,9 @@ def backward_compatibility_support():
backwards_k = k.replace('CLEARML_', 'TRAINS_', 1)
if backwards_k not in keys:
environ[backwards_k] = environ[k]


__all__ = [
"EnvEntry",
"backward_compatibility_support"
]
37 changes: 37 additions & 0 deletions clearml_agent/commands/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import time
from typing import List, Tuple

from clearml_agent.commands.base import ServiceCommandSection
from clearml_agent.helper.base import return_list
Expand Down Expand Up @@ -57,6 +58,42 @@ def send_packet(jsonlines):
# print('Sending events done: %d / %d events sent' % (sent_events, len(list_events)))
return sent_events

def send_log_events_with_timestamps(
self, worker_id, task_id, lines_with_ts: List[Tuple[str, str]], level="DEBUG", session=None
):
log_events = []

# break log lines into event packets
for ts, line in return_list(lines_with_ts):
# HACK ignore terminal reset ANSI code
if line == '\x1b[0m':
continue
while line:
if len(line) <= self.max_event_size:
msg = line
line = None
else:
msg = line[:self.max_event_size]
line = line[self.max_event_size:]

log_events.append(
{
"type": "log",
"level": level,
"task": task_id,
"worker": worker_id,
"msg": msg,
"timestamp": ts,
}
)

if line and ts is not None:
# advance timestamp in case we break a line to more than one part
ts += 1

# now send the events
return self.send_events(list_events=log_events, session=session)

def send_log_events(self, worker_id, task_id, lines, level='DEBUG', session=None):
log_events = []
base_timestamp = int(time.time() * 1000)
Expand Down
8 changes: 6 additions & 2 deletions clearml_agent/glue/definitions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from clearml_agent.definitions import EnvironmentConfig
from clearml_agent.helper.environment import EnvEntry

ENV_START_AGENT_SCRIPT_PATH = EnvironmentConfig('CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH')
ENV_START_AGENT_SCRIPT_PATH = EnvEntry("CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH", default="~/__start_agent__.sh")
"""
Script path to use when creating the bash script to run the agent inside the scheduled pod's docker container.
Script will be appended to the specified file.
"""

ENV_DEFAULT_EXECUTION_AGENT_ARGS = EnvEntry("K8S_GLUE_DEF_EXEC_AGENT_ARGS", default="--full-monitoring --require-queue")
ENV_POD_AGENT_INSTALL_ARGS = EnvEntry("K8S_GLUE_POD_AGENT_INSTALL_ARGS", default="", lstrip=False)
ENV_POD_MONITOR_LOG_BATCH_SIZE = EnvEntry("K8S_GLUE_POD_MONITOR_LOG_BATCH_SIZE", default=5, converter=int)
Loading

0 comments on commit 0131db8

Please sign in to comment.