Skip to content

Commit

Permalink
v1.8.0 PR (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeSchiessl authored Aug 27, 2024
1 parent c3da86c commit c90f587
Show file tree
Hide file tree
Showing 13 changed files with 359 additions and 26 deletions.
75 changes: 75 additions & 0 deletions bin/modules/UlsArgsParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,31 @@ def init():
default=(os.environ.get('ULS_LOGLEVEL') or uls_config.log_level_default),
choices=uls_config.log_levels_available,
help=f'Adjust the loglevel Default: {uls_config.log_level_default}')

parser.add_argument('--json-log',
action='store',
dest='jsonlog',
type=bool,
nargs='?',
default=(os.environ.get('ULS_JSONLOG') or uls_config.log_jsonlog),
const=True,
help=f"Should ULS write its own logdata in JSON format instead of plain text output ? (Default: {uls_config.log_jsonlog})")

parser.add_argument('--ulslogformat',
action='store',
dest='logformat',
type=str,
default=(os.environ.get('ULS_LOGFORMAT') or False),
help=f"Custom logging format (ULS internal logs) see additional features documentation for more information - (Default: False)")

parser.add_argument('--ulslogdatefmt',
action='store',
dest='logdatefmt',
type=str,
default=(os.environ.get('ULS_LOG_DATEFORMAT') or uls_config.log_datefmt),
help=f"Adjust the logging date/time format to your needs, (Default: {uls_config.log_datefmt.replace("%", "%%")})")
# Added double %% to have argsparser display proper string as it tries do to % replacement :D

# put loglines into debug log
parser.add_argument('--debugloglines',
action='store',
Expand All @@ -48,6 +73,15 @@ def init():
const=True,
help=f'Display {uls_config.__tool_name_short__} version and operational information')

parser.add_argument('--nocallhome',
action='store',
type=bool,
default=os.environ.get('ULS_NOCALLHOME') or not uls_config.callhome_enabled,
nargs='?',
const=True,
help=f"Disable the ULS CallHome feature that helps the ULS developers to continue improving ULS. Default: {not uls_config.callhome_enabled}")


# ----------------------
# Input GROUP
input_group = parser.add_argument_group(title="Input",
Expand Down Expand Up @@ -347,6 +381,47 @@ def init():
default=(os.environ.get('ULS_AUTORESUME_WRITEAFTER') or uls_config.autoresume_write_after),
help=f'Specify after how many loglines a checkpoint should be written [Default: {uls_config.autoresume_write_after}]')


#-------------------------
prometheus_group = parser.add_argument_group(title="Prometheus",
description="Define Prometheus Monitoring Settings")
# Prometheues switch
prometheus_group.add_argument('--prometheus',
action='store',
type=bool,
dest='prometheus_enabled',
default=(os.environ.get('ULS_PROMETHEUS') or uls_config.prometheus_enabled),
nargs='?',
const=True,
help=f'Enable prometheues monitoring support - Default: {uls_config.prometheus_enabled}')

prometheus_group.add_argument('--promport', '--prometheus-port',
action='store',
dest='prometheus_port',
type=int,
default=(os.environ.get('ULS_PROMETHEUS_PORT') or uls_config.prometheus_port),
help=f'Prometheues port to listen on [Default: {uls_config.prometheus_port}]')

prometheus_group.add_argument('--promaddr', '--prometheus-addr',
action='store',
dest='prometheus_addr',
type=str,
default=(os.environ.get('ULS_PROMETHEUS_ADDR') or uls_config.prometheus_addr),
help=f'Prometheues bind address to listen on [Default: {uls_config.prometheus_addr}]')

prometheus_group.add_argument('--promcert', '--prometheus-certfile',
action='store',
dest='prometheus_certfile',
type=str,
default=(os.environ.get('ULS_PROMETHEUS_CERTFILE') or uls_config.prometheus_certfile),
help=f'Prometheues certificate file (required alongside a keyfile) [Default: {uls_config.prometheus_certfile}]')

prometheus_group.add_argument('--promkey', '--prometheus-keyfile',
action='store',
dest='prometheus_keyfile',
type=str,
default=(os.environ.get('ULS_PROMETHEUS_KEYFILE') or uls_config.prometheus_keyfile),
help=f'Prometheues key file (required alongside a certfile) [Default: {uls_config.prometheus_keyfile}]')
return parser.parse_args()


Expand Down
63 changes: 62 additions & 1 deletion bin/modules/UlsMonitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@

class UlsMonitoring:

def __init__(self, stopEvent, product, feed, output):
def __init__(self, stopEvent, product, feed, output,
prom_enabled: bool = False,
prom_port: int = 8000,
prom_host: str = '127.0.0.1',
prom_certfile: str = None,
prom_keyfile: str = None):
"""
Hanlde ULS self monitoring, spills out performance counter on stdout.
Expand All @@ -34,11 +39,24 @@ def __init__(self, stopEvent, product, feed, output):
feed (string): specific data feed being consumed by ULS
"""

# Core monitoring stuff
self._stopEvent = stopEvent
self._product = product
self._feed = feed
self._output = output

# Prometheues
self.prometheues_enabled = prom_enabled
self.promeuteheus_port = prom_port
self.promeuteheus_host = prom_host
self.promeuteheus_cert = prom_certfile
self.promeuteheus_key = prom_keyfile



self.prom_overall_messages = None
self.prom_overall_bytes = None
self.prom_overall_messages_ingested = None
# Prevent other thread interact with the performance counters
self._metricLock = threading.Lock()

Expand All @@ -47,6 +65,7 @@ def __init__(self, stopEvent, product, feed, output):
self.monitoring_interval = uls_config.monitoring_interval # Monitoring interval
self._version = uls_config.__version__


# Definitions
self.name = "UlsMonitoring" # Class Human readable name
self.overall_messages_handled = 0 # Define overall number of messages handled
Expand All @@ -67,6 +86,42 @@ def start(self):
else:
aka_log.log.debug(f"{self.name} monitoring was disabled - not starting.")

if self.prometheues_enabled:
aka_log.log.debug(f"{self.name} Prometheus monitoring started...")
self.start_prometheus(port=self.promeuteheus_port, host=self.promeuteheus_host, cert=self.promeuteheus_cert, key=self.promeuteheus_key)

def start_prometheus(self, port, host="127.0.0.1", cert=None, key=None):
from prometheus_client import start_http_server
from prometheus_client import Info, Counter, Gauge
from prometheus_client import REGISTRY, PROCESS_COLLECTOR, PLATFORM_COLLECTOR

# Disable unwanted collectors
REGISTRY.unregister(PROCESS_COLLECTOR)
REGISTRY.unregister(PLATFORM_COLLECTOR)
REGISTRY.unregister(REGISTRY._names_to_collectors['python_gc_objects_collected_total'])

# Start the Server
server, t = start_http_server(port=port, addr=host, certfile=cert, keyfile=key)
server.base_environ.clear()

# Show the version
version = Info('uls_version', 'The current ULS Version')
version.info({'version': uls_config.__version__})

uls_stream_info = Info('uls_stream_info', "The selected ULS input product")
uls_stream_info.info({'product': self._product, 'feed': self._feed, 'output': self._output})

starttime = Info('uls_starttime', "The time, the uls process was started")
starttime.info({'starttime': f'{self.init_time}'})



# Counters
self.prom_overall_messages = Counter('uls_overall_messages_incoming', 'Number of all handled incoming log lines')
self.prom_overall_bytes = Counter('uls_overall_bytes_incoming', 'Size of all handled incoming log lines')
self.prom_overall_messages_ingested = Counter('uls_overall_messages_ingested', 'Number of all handled outgoing log lines')


def display(self):
"""
Entry point for the monitoring thread
Expand Down Expand Up @@ -109,9 +164,14 @@ def increase_message_count(self, bytes=0):
self.window_messages_handled = self.window_messages_handled + 1
self.window_messages_bytes += bytes

# Also increase the prom counters
self.prom_overall_messages.inc()
self.prom_overall_bytes.inc(bytes)

def increase_message_ingested(self):
with self._metricLock:
self.window_messages_ingested += 1
self.prom_overall_messages_ingested.inc()


def get_message_count(self):
Expand All @@ -124,5 +184,6 @@ def get_stats(self):
def _runtime(self):
return int(time.time() - self.init_time)


# EOF

12 changes: 12 additions & 0 deletions bin/modules/UlsTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# ULS modules
import modules.aka_log as aka_log
import requests
import uls_config.global_config as uls_config


Expand Down Expand Up @@ -340,3 +341,14 @@ def get_install_id(install_id_file=str(root_path()) + "/var/uls_install_id"):
return data


def callhome(nocallhome_state: bool, input: str = "n/a", feed: str = "n/a", output: str = "n/a", position: str = "n/a"):
if not nocallhome_state:
try:
url = f"/{position}?version={uls_config.__version__}&input={input}&feed={feed}&output={output}&install_id={get_install_id()['install_id']}&os_platform={platform.platform()}&pyhton={sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}&container={check_container()}"
aka_log.log.debug(f"Sending a CallHome request containing the following data: {url}")
result = requests.get(uls_config.callhome_url + url, timeout=int(uls_config.callhome_timeout))
aka_log.log.debug(f"Callhome response code: {result.status_code}")
except:
aka_log.log.debug(f"Callhome went wrong ...")
else:
aka_log.log.debug(f"Callhome functionality has been disabled - not sending any data")
14 changes: 12 additions & 2 deletions bin/modules/UlsTransformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,17 @@ def _jmespath_transformation(self, log_line, expression):
data = json.loads(log_line.decode())
my_output = expression.search(data)


return str(my_output)
try:
# Attempt to convert the variable to a JSON string
json_data = json.dumps(my_output)
except (TypeError, ValueError) as e:
# This block executes if a TypeError or ValueError occurs during conversion
aka_log.log.debug(f'{self.name} - Transformation ({self.transformation}) '
f'transformation triggered but '
f'final json conversion failed ... ')
return str(my_output)
else:
# This block executes if no exceptions are raised
return json_data

# EOF
18 changes: 15 additions & 3 deletions bin/modules/aka_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,25 @@
# limitations under the License.

import logging
import uls_config.global_config as uls_config


def init(loglevel='WARNING', loggername=None):
def init(loglevel='WARNING', loggername=None, jsonlogs: bool = False, logformat=None, logdatefmt=uls_config.log_datefmt):
global log
log = logging.getLogger(loggername)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(name)s %(levelname).1s %(message)s')


if jsonlogs:
if not logformat:
formatter = logging.Formatter('{"timestamp": "%(asctime)s", "log_level": "%(levelname)s", "component": "%(name)s", "message": "%(message)s"}', datefmt=logdatefmt)
else:
formatter = logging.Formatter(logformat, datefmt=logdatefmt)
else:
if not logformat:
formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s', datefmt=logdatefmt)
else:
formatter = logging.Formatter(logformat, datefmt=logdatefmt)

console_handler.setFormatter(formatter)
log.addHandler(console_handler)
log.setLevel(loglevel)
Expand Down
8 changes: 5 additions & 3 deletions bin/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
jmespath>=0.10.0
requests>=2.25.1
pytz>=2021.1
jmespath>=1.0.1
requests>=2.32.3
pytz>=2024.1
prometheus_client>=0.20.0
setuptools>=72.1.0
15 changes: 13 additions & 2 deletions bin/uls.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def main():
uls_args = ArgsParser.init()

# Load the LOG system
aka_log.init(uls_args.loglevel, uls_config.__tool_name_short__)
aka_log.init(uls_args.loglevel, uls_config.__tool_name_short__, jsonlogs=uls_args.jsonlog, logformat=uls_args.logformat, logdatefmt=uls_args.logdatefmt)

# Determine root directory
root_path = str(UlsTools.root_path())
Expand Down Expand Up @@ -94,7 +94,12 @@ def main():
my_monitor = UlsMonitoring.UlsMonitoring(stopEvent=stopEvent,
product=uls_args.input,
feed=uls_args.feed,
output=uls_args.output)
output=uls_args.output,
prom_enabled=uls_args.prometheus_enabled,
prom_port=uls_args.prometheus_port,
prom_host=uls_args.prometheus_addr,
prom_certfile=uls_args.prometheus_certfile,
prom_keyfile=uls_args.prometheus_keyfile)
my_monitor.start()

# Connect to an Input Handler UlsInputCli
Expand Down Expand Up @@ -170,12 +175,18 @@ def main():
# Connect the output handler
my_output.connect()


# Send CallHome Request, if not opted_out
UlsTools.callhome(nocallhome_state=uls_args.nocallhome, position="uls_start", input=uls_args.input, feed=uls_args.feed, output=uls_args.output)

# New ULS/1.5: the input module is ingesting messages
# into a thread safe queue. The function call will immediately
# return
event_q = queue.Queue(uls_args.input_queue_size)
my_input.ingest(stopEvent, event_q, my_monitor)



# Now we are back to the main thread to process the message
while not stopEvent.is_set():
try:
Expand Down
18 changes: 16 additions & 2 deletions bin/uls_config/global_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3

# Common global variables / constants
__version__ = "1.7.5"
__version__ = "1.8.0-alpha"
__tool_name_long__ = "Akamai Unified Log Streamer"
__tool_name_short__ = "ULS"

Expand Down Expand Up @@ -50,10 +50,12 @@
transformation_choices = ['MCAS', 'JMESPATH']


# LogLevels
# ULS Logging & LogLevels
log_levels_available = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
log_level_default = 'WARNING'
log_debugloglines_default = False
log_jsonlog = False
log_datefmt = "%Y-%m-%d %H:%M:%S%z"

# INPUT Configuration
input_rerun_retries = 3 # Number of rerun attempts before giving up
Expand Down Expand Up @@ -110,3 +112,15 @@
autoresume_supported_inputs = ['ETP', 'EAA', 'GC', 'SIA', 'ACC'] # Internal Var only, to adjust supported inputs
autoresume_write_after = 1000 # Write checkpoint only every ${autoresume_write_every} loglines

# CAllHome Configuration
callhome_enabled = "True" # CallHome Functionality is enabled / disabled
callhome_url = "https://uls-beacon.akamaized.net" # CallHome URL Target
callhome_timeout = "2" # Callhome Timeout in seconds

# Prometheus Monitoring basics
prometheus_enabled = "False" # Do not eneable prometheues by default
prometheus_port = 8000 # Default Prometheus port
prometheus_addr = "127.0.0.1" # Default Prometheus bind address
prometheus_certfile = None # Prometheus Cert file
prometheus_keyfile = None # Prometheus Key file
# prometheus_client_cafile
Loading

0 comments on commit c90f587

Please sign in to comment.