Skip to content

Commit

Permalink
Messaging to socket (#48)
Browse files Browse the repository at this point in the history
Messaging to socket

Reviewed-by: None <None>
Reviewed-by: Anton Kachurin <[email protected]>
Reviewed-by: Polina Gubina <None>
Reviewed-by: Rodion Gyrbu <[email protected]>
  • Loading branch information
anton-sidelnikov authored Feb 11, 2021
1 parent d6754c7 commit 2df98c9
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 35 deletions.
73 changes: 48 additions & 25 deletions csm_test_utils/loadbalancer/lb_monitor.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,65 @@
import logging
import socket
from time import sleep

import requests
from ocomone.logging import setup_logger

from ..message import push_metric, Metric
from ..parsers import AGP_LB_LOAD

LB_TIMING = "lb_timing"
LB_TIMEOUT = "lb_timeout"
LB_TIMING = 'csm_lb_timings'
LB_TIMEOUT = 'csm_lb_timeout'

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

INSTANCES_AZ = {
'lb-monitoring-instance0-prod': 'eu-de-01',
'lb-monitoring-instance1-prod': 'eu-de-02',
'lb-monitoring-instance2-prod': 'eu-de-03',
}


def main():
args, _ = AGP_LB_LOAD.parse_known_args()
setup_logger(LOGGER, "lb_load", log_dir=args.log_dir, log_format="[%(asctime)s] %(message)s")
setup_logger(LOGGER, 'lb_load', log_dir=args.log_dir, log_format='[%(asctime)s] %(message)s')
timeout = 20
try:
res = requests.get(args.target, headers={"Connection": "close"}, timeout=timeout)
except requests.Timeout as ex:
LOGGER.exception("Timeout sending request to LB")
result = {
"reason": LB_TIMEOUT,
"client": socket.gethostname(),
"timeout": timeout * 1000,
"exception": ex
}
else:
result = {
"reason": LB_TIMING,
"client": socket.gethostname(),
"server": res.headers["Server"],
"elapsed": res.elapsed.microseconds / 1000
}

print(result)


if __name__ == "__main__":
metrics = []
for _ in range(9):
try:
res = requests.get(args.target, headers={'Connection': 'close'}, timeout=timeout)
except requests.Timeout as ex:
LOGGER.exception('Timeout sending request to LB')
metrics.append(Metric(
environment=args.environment,
zone=args.zone,
name=LB_TIMEOUT,
value=timeout * 1000,
metric_attrs={
'metric_type': 'ms',
'client': socket.gethostname(),
'exception': ex,
})
)
else:
metrics.append(Metric(
environment=args.environment,
zone=args.zone,
name=LB_TIMING,
value=int(res.elapsed.microseconds / 1000),
metric_attrs={
'metric_type': 'ms',
'client': socket.gethostname(),
'server': res.headers['Server'],
'az': INSTANCES_AZ.get(res.headers['Server']),
})
)
sleep(2)
if args.socket:
for metric in metrics:
push_metric(metric, args.socket)


if __name__ == '__main__':
main()
90 changes: 90 additions & 0 deletions csm_test_utils/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import datetime
import json
import logging
import socket

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


class Base(dict):
"""Base metric class"""

def __init__(
self,
name: str,
environment: str,
zone: str,
timestamp: str = None
):
super().__init__()
self['name'] = name
self['environment'] = environment
self['zone'] = zone
if timestamp:
self['timestamp'] = timestamp
else:
self['timestamp'] = datetime.datetime.now().isoformat()

def serialize(self) -> str:
"""Serialize data as json string"""
try:
return json.dumps(self, separators=(',', ':'))
except json.JSONDecodeError as err:
return err.msg

def __bytes__(self) -> bytes:
"""Returns bytes interpretation of data"""
data = self.serialize()
return ('%s\n' % data).encode('utf8')


class Metric(Base):
"""Base metric"""

def __init__(
self,
name: str,
value: int,
environment: str = None,
zone: str = None,
**kwargs: dict
):
super().__init__(
name=name,
environment=environment,
zone=zone,
)
self['__type'] = 'metric'
self['metric_type'] = kwargs.get('metric_type', 'ms')
self['value'] = value
self.update(**kwargs)


def get_message(msg):
"""Get metric instance from dictionary or string"""
if not isinstance(msg, dict):
try:
msg = json.loads(msg, encoding='utf-8')
except json.JSONDecodeError:
return None
typ = msg.pop('__type')
if typ == 'metric':
return Metric(**msg)
return None


def push_metric(data: Metric, message_socket_address):
"""push metrics to socket"""
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as _socket:
try:
_socket.connect(message_socket_address)
msg = '%s\n' % data.serialize()
_socket.sendall(msg.encode('utf8'))
return 'success'
except socket.error as err:
LOGGER.exception('Error establishing connection to socket')
raise err
except Exception as ex:
LOGGER.exception('Error writing message to socket')
raise ex
34 changes: 24 additions & 10 deletions csm_test_utils/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,33 @@
import os
from argparse import ArgumentParser

__tgf_default = os.getenv("TGF_ADDRESS", "")
__apimon_socket = os.getenv("APIMON_PROFILER_MESSAGE_SOCKET", "")

_base_parser = ArgumentParser(prog="csm_test_utils", description="Multi-purpose test script")
"""Base for all parsers"""

_base_parser.add_argument("--target", help="Load balancer address")
__tgf_default = os.getenv("TGF_ADDRESS", "")
_base_parser.add_argument("--telegraf",
help=f"Address of telegraf server for reporting. "
f"Default is taken from TGF_ADDRESS variable ('{__tgf_default}')",
default=__tgf_default)
_base_parser.add_argument("--log-dir", "-l", help="Directory to write log file to.",
default=".")
_base_parser.add_argument(
"--telegraf",
help=f"Address of telegraf server for reporting. "
f"Default is taken from TGF_ADDRESS variable ('{__tgf_default}')",
default=__tgf_default
)
_base_parser.add_argument(
"--socket",
help=f"Address of AF_UNIX type socket for reporting. "
f"Default is taken from APIMON_PROFILER_MESSAGE_SOCKET"
f" variable ('{__apimon_socket}')",
default=__apimon_socket
)
_base_parser.add_argument("--environment", default="prod")
_base_parser.add_argument("--zone", default="eu-de")
_base_parser.add_argument(
"--log-dir",
"-l",
help="Directory to write log file to.",
default="."
)

root_parser = ArgumentParser(parents=[_base_parser], add_help=False)
"""Root `csm_test_utils` parser"""
Expand All @@ -32,7 +48,6 @@ def _subparser(name):

# AS LB
AGP_AS_LB = _subparser("as_load")

# LB monitor
AGP_LB_MONITOR = _subparser("monitor")

Expand Down Expand Up @@ -80,7 +95,6 @@ def _subparser(name):
AGP_RDS_GENERATE.add_argument("--drivername", default="postgresql+psycopg2")

# RDS backup check

AGP_BACKUP_CHECK = _subparser("rds_backup_check")
AGP_BACKUP_CHECK.add_argument("--instance_id", help="RDS instance ID")
AGP_BACKUP_CHECK.add_argument("--cloud_config", help="Clouds config file")
Expand Down
47 changes: 47 additions & 0 deletions tests/unit/test_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import unittest
from unittest import mock

from csm_test_utils import message
from csm_test_utils.message import get_message

test_metrics = [
message.Metric(
environment='prod',
zone='eu-de',
name='lb_load',
value=25,
metric_attrs={
'metric_type': 'ms',
'server': 'instance_0',
'az': 'eu-de-01',
'rc': 0
}
)
]


class TestMessage(unittest.TestCase):
def test_metric_serialize(self):
for metric in test_metrics:
msg = '%s\n' % metric.serialize()
assert isinstance(msg, str), f'{metric["name"]} at {metric["timestamp"]} not serialized'

def test_metric_deserialize(self):
metric = '{"name":"lb_load","environment":"prod",' \
'"zone":"eu-de","timestamp":"2021-02-08T14:15:27.578578",' \
'"__type":"metric","metric_type":"ms","value":25,' \
'"metric_attrs":{"server":"instance_0","az":"eu-de-01","rc":0}}'
instance = get_message(metric)

assert isinstance(instance, message.Metric), 'not deserialized'

def test_push_message_to_socket(self):
server_address = './comm_socket'
with mock.patch('socket.socket'):
for metric in test_metrics:
res = message.push_metric(metric, server_address)
assert res == 'success'


if __name__ == "__main__":
unittest.main()

0 comments on commit 2df98c9

Please sign in to comment.