From 2df98c9717c52dcbd8fed9760bf46e26c948ca7a Mon Sep 17 00:00:00 2001 From: Anton Sidelnikov <53078276+anton-sidelnikov@users.noreply.github.com> Date: Thu, 11 Feb 2021 10:39:01 +0300 Subject: [PATCH] Messaging to socket (#48) Messaging to socket Reviewed-by: None Reviewed-by: Anton Kachurin Reviewed-by: Polina Gubina Reviewed-by: Rodion Gyrbu --- csm_test_utils/loadbalancer/lb_monitor.py | 73 +++++++++++------- csm_test_utils/message.py | 90 +++++++++++++++++++++++ csm_test_utils/parsers.py | 34 ++++++--- tests/unit/test_message.py | 47 ++++++++++++ 4 files changed, 209 insertions(+), 35 deletions(-) create mode 100644 csm_test_utils/message.py create mode 100644 tests/unit/test_message.py diff --git a/csm_test_utils/loadbalancer/lb_monitor.py b/csm_test_utils/loadbalancer/lb_monitor.py index 354e4b4..f2b8c3a 100644 --- a/csm_test_utils/loadbalancer/lb_monitor.py +++ b/csm_test_utils/loadbalancer/lb_monitor.py @@ -1,42 +1,65 @@ import logging import socket +from time import sleep import requests from ocomone.logging import setup_logger +from ..message import push_metric, Metric from ..parsers import AGP_LB_LOAD -LB_TIMING = "lb_timing" -LB_TIMEOUT = "lb_timeout" +LB_TIMING = 'csm_lb_timings' +LB_TIMEOUT = 'csm_lb_timeout' LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) +INSTANCES_AZ = { + 'lb-monitoring-instance0-prod': 'eu-de-01', + 'lb-monitoring-instance1-prod': 'eu-de-02', + 'lb-monitoring-instance2-prod': 'eu-de-03', +} + def main(): args, _ = AGP_LB_LOAD.parse_known_args() - setup_logger(LOGGER, "lb_load", log_dir=args.log_dir, log_format="[%(asctime)s] %(message)s") + setup_logger(LOGGER, 'lb_load', log_dir=args.log_dir, log_format='[%(asctime)s] %(message)s') timeout = 20 - try: - res = requests.get(args.target, headers={"Connection": "close"}, timeout=timeout) - except requests.Timeout as ex: - LOGGER.exception("Timeout sending request to LB") - result = { - "reason": LB_TIMEOUT, - "client": socket.gethostname(), - "timeout": timeout * 1000, - "exception": ex - } - else: - result = { - "reason": LB_TIMING, - "client": socket.gethostname(), - "server": res.headers["Server"], - "elapsed": res.elapsed.microseconds / 1000 - } - - print(result) - - -if __name__ == "__main__": + metrics = [] + for _ in range(9): + try: + res = requests.get(args.target, headers={'Connection': 'close'}, timeout=timeout) + except requests.Timeout as ex: + LOGGER.exception('Timeout sending request to LB') + metrics.append(Metric( + environment=args.environment, + zone=args.zone, + name=LB_TIMEOUT, + value=timeout * 1000, + metric_attrs={ + 'metric_type': 'ms', + 'client': socket.gethostname(), + 'exception': ex, + }) + ) + else: + metrics.append(Metric( + environment=args.environment, + zone=args.zone, + name=LB_TIMING, + value=int(res.elapsed.microseconds / 1000), + metric_attrs={ + 'metric_type': 'ms', + 'client': socket.gethostname(), + 'server': res.headers['Server'], + 'az': INSTANCES_AZ.get(res.headers['Server']), + }) + ) + sleep(2) + if args.socket: + for metric in metrics: + push_metric(metric, args.socket) + + +if __name__ == '__main__': main() diff --git a/csm_test_utils/message.py b/csm_test_utils/message.py new file mode 100644 index 0000000..23b274a --- /dev/null +++ b/csm_test_utils/message.py @@ -0,0 +1,90 @@ +import datetime +import json +import logging +import socket + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +class Base(dict): + """Base metric class""" + + def __init__( + self, + name: str, + environment: str, + zone: str, + timestamp: str = None + ): + super().__init__() + self['name'] = name + self['environment'] = environment + self['zone'] = zone + if timestamp: + self['timestamp'] = timestamp + else: + self['timestamp'] = datetime.datetime.now().isoformat() + + def serialize(self) -> str: + """Serialize data as json string""" + try: + return json.dumps(self, separators=(',', ':')) + except json.JSONDecodeError as err: + return err.msg + + def __bytes__(self) -> bytes: + """Returns bytes interpretation of data""" + data = self.serialize() + return ('%s\n' % data).encode('utf8') + + +class Metric(Base): + """Base metric""" + + def __init__( + self, + name: str, + value: int, + environment: str = None, + zone: str = None, + **kwargs: dict + ): + super().__init__( + name=name, + environment=environment, + zone=zone, + ) + self['__type'] = 'metric' + self['metric_type'] = kwargs.get('metric_type', 'ms') + self['value'] = value + self.update(**kwargs) + + +def get_message(msg): + """Get metric instance from dictionary or string""" + if not isinstance(msg, dict): + try: + msg = json.loads(msg, encoding='utf-8') + except json.JSONDecodeError: + return None + typ = msg.pop('__type') + if typ == 'metric': + return Metric(**msg) + return None + + +def push_metric(data: Metric, message_socket_address): + """push metrics to socket""" + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as _socket: + try: + _socket.connect(message_socket_address) + msg = '%s\n' % data.serialize() + _socket.sendall(msg.encode('utf8')) + return 'success' + except socket.error as err: + LOGGER.exception('Error establishing connection to socket') + raise err + except Exception as ex: + LOGGER.exception('Error writing message to socket') + raise ex diff --git a/csm_test_utils/parsers.py b/csm_test_utils/parsers.py index 24e9c49..5c0ff55 100644 --- a/csm_test_utils/parsers.py +++ b/csm_test_utils/parsers.py @@ -2,17 +2,33 @@ import os from argparse import ArgumentParser +__tgf_default = os.getenv("TGF_ADDRESS", "") +__apimon_socket = os.getenv("APIMON_PROFILER_MESSAGE_SOCKET", "") + _base_parser = ArgumentParser(prog="csm_test_utils", description="Multi-purpose test script") """Base for all parsers""" - _base_parser.add_argument("--target", help="Load balancer address") -__tgf_default = os.getenv("TGF_ADDRESS", "") -_base_parser.add_argument("--telegraf", - help=f"Address of telegraf server for reporting. " - f"Default is taken from TGF_ADDRESS variable ('{__tgf_default}')", - default=__tgf_default) -_base_parser.add_argument("--log-dir", "-l", help="Directory to write log file to.", - default=".") +_base_parser.add_argument( + "--telegraf", + help=f"Address of telegraf server for reporting. " + f"Default is taken from TGF_ADDRESS variable ('{__tgf_default}')", + default=__tgf_default +) +_base_parser.add_argument( + "--socket", + help=f"Address of AF_UNIX type socket for reporting. " + f"Default is taken from APIMON_PROFILER_MESSAGE_SOCKET" + f" variable ('{__apimon_socket}')", + default=__apimon_socket +) +_base_parser.add_argument("--environment", default="prod") +_base_parser.add_argument("--zone", default="eu-de") +_base_parser.add_argument( + "--log-dir", + "-l", + help="Directory to write log file to.", + default="." +) root_parser = ArgumentParser(parents=[_base_parser], add_help=False) """Root `csm_test_utils` parser""" @@ -32,7 +48,6 @@ def _subparser(name): # AS LB AGP_AS_LB = _subparser("as_load") - # LB monitor AGP_LB_MONITOR = _subparser("monitor") @@ -80,7 +95,6 @@ def _subparser(name): AGP_RDS_GENERATE.add_argument("--drivername", default="postgresql+psycopg2") # RDS backup check - AGP_BACKUP_CHECK = _subparser("rds_backup_check") AGP_BACKUP_CHECK.add_argument("--instance_id", help="RDS instance ID") AGP_BACKUP_CHECK.add_argument("--cloud_config", help="Clouds config file") diff --git a/tests/unit/test_message.py b/tests/unit/test_message.py new file mode 100644 index 0000000..2ca371a --- /dev/null +++ b/tests/unit/test_message.py @@ -0,0 +1,47 @@ +import unittest +from unittest import mock + +from csm_test_utils import message +from csm_test_utils.message import get_message + +test_metrics = [ + message.Metric( + environment='prod', + zone='eu-de', + name='lb_load', + value=25, + metric_attrs={ + 'metric_type': 'ms', + 'server': 'instance_0', + 'az': 'eu-de-01', + 'rc': 0 + } + ) +] + + +class TestMessage(unittest.TestCase): + def test_metric_serialize(self): + for metric in test_metrics: + msg = '%s\n' % metric.serialize() + assert isinstance(msg, str), f'{metric["name"]} at {metric["timestamp"]} not serialized' + + def test_metric_deserialize(self): + metric = '{"name":"lb_load","environment":"prod",' \ + '"zone":"eu-de","timestamp":"2021-02-08T14:15:27.578578",' \ + '"__type":"metric","metric_type":"ms","value":25,' \ + '"metric_attrs":{"server":"instance_0","az":"eu-de-01","rc":0}}' + instance = get_message(metric) + + assert isinstance(instance, message.Metric), 'not deserialized' + + def test_push_message_to_socket(self): + server_address = './comm_socket' + with mock.patch('socket.socket'): + for metric in test_metrics: + res = message.push_metric(metric, server_address) + assert res == 'success' + + +if __name__ == "__main__": + unittest.main()