save log-block row into LogRecord attributes
korowa committed Jul 1, 2024
1 parent 59ea586 commit bb16df5
Showing 4 changed files with 78 additions and 18 deletions.
46 changes: 32 additions & 14 deletions clickhouse_driver/log.py
@@ -1,6 +1,30 @@
 import logging

-logger = logging.getLogger(__name__)
+
+def default_message_filter(record):
+    record.msg = (
+        f'[ {record.server_host_name} ] '
+        f'[ {record.server_event_time}.'
+        f'{record.server_event_time_microseconds:06d} ] '
+        f'[ {record.server_thread_id} ] '
+        f'{{{record.server_query_id}}} '
+        f'<{record.server_priority}> '
+        f'{record.server_source}: '
+        f'{record.server_text}'
+    )
+    return True
+
+
+def configure_logger(raw_log_record=False):
+    logger = logging.getLogger(__name__)
+    if raw_log_record:
+        logger.removeFilter(default_message_filter)
+    else:
+        logger.addFilter(default_message_filter)
+    return logger
+
+
+logger = configure_logger()

 # Keep in sync with ClickHouse priorities
 # https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/InternalTextLogsQueue.cpp
@@ -30,19 +54,13 @@ def log_block(block):
         row = dict(zip(column_names, row))

         if 1 <= row['priority'] <= num_priorities:
-            priority = log_priorities[row['priority']]
+            row['priority'] = log_priorities[row['priority']]
         else:
-            priority = row[0]
+            row['priority'] = row[0]

         # thread_number in servers prior 20.x
-        thread_id = row.get('thread_id') or row['thread_number']
-
-        logger.info(
-            '[ %s ] [ %s ] {%s} <%s> %s: %s',
-            row['host_name'],
-            thread_id,
-            row['query_id'],
-            priority,
-            row['source'],
-            row['text']
-        )
+        row['thread_id'] = row.get('thread_id') or row['thread_number']
+
+        # put log block row into LogRecord extra
+        extra = {"server_"+k: v for k, v in row.items()}
+        logger.info(row['text'], extra=extra)
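
The mechanism here is stock Python logging: every key in the extra dict passed to logger.info() is copied onto the emitted LogRecord as an attribute, and a filter attached to the logger runs before any handler formats the record, so it can rewrite record.msg. A minimal self-contained sketch of that interplay (the logger name, filter, and attribute are illustrative, not part of the driver):

    import logging

    def host_prefix_filter(record):
        # Filters run before handlers format the record, so they can
        # rewrite record.msg using attributes injected via `extra`.
        record.msg = f'[ {record.server_host_name} ] {record.msg}'
        return True  # returning True lets the record through

    logger = logging.getLogger('demo')
    logger.addFilter(host_prefix_filter)
    logging.basicConfig(level=logging.INFO)

    # Every key in `extra` becomes an attribute on the LogRecord.
    logger.info('Read 1 rows', extra={'server_host_name': 'demo-host'})
    # -> INFO:demo:[ demo-host ] Read 1 rows

This is why default_message_filter above can reference record.server_host_name and friends: they originate from the extra dict built in log_block.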
20 changes: 16 additions & 4 deletions docs/features.rst
@@ -248,16 +248,28 @@ Query logs can be received from server by using `send_logs_level` setting:
 >>>
 >>> settings = {'send_logs_level': 'debug'}
 >>> client.execute('SELECT 1', settings=settings)
-2018-12-14 10:24:53,873 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Debug> executeQuery: (from 127.0.0.1:57762) SELECT 1
-2018-12-14 10:24:53,874 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Debug> executeQuery: Query pipeline:
+2018-12-14 10:24:53,873 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 2018-12-14 10:24:53.865340 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Debug> executeQuery: (from 127.0.0.1:57762) SELECT 1
+2018-12-14 10:24:53,874 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 2018-12-14 10:24:53.866763 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Debug> executeQuery: Query pipeline:
 Expression
 Expression
 One
-2018-12-14 10:24:53,875 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Information> executeQuery: Read 1 rows, 1.00 B in 0.004 sec., 262 rows/sec., 262.32 B/sec.
-2018-12-14 10:24:53,875 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Debug> MemoryTracker: Peak memory usage (for query): 40.23 KiB.
+2018-12-14 10:24:53,875 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 2018-12-14 10:24:53.867802 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Information> executeQuery: Read 1 rows, 1.00 B in 0.004 sec., 262 rows/sec., 262.32 B/sec.
+2018-12-14 10:24:53,875 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 2018-12-14 10:24:53.867895 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Debug> MemoryTracker: Peak memory usage (for query): 40.23 KiB.
 [(1,)]
+.. note::
+
+    ``LogRecord`` instances produced by the driver carry all attributes of
+    the ClickHouse ``Log`` block row, available with a ``server_`` prefix
+    (e.g. ``server_event_time``, ``server_query_id``, etc.). By default the
+    driver concatenates them and places the result into ``LogRecord.msg``.
+    Calling
+
+        >>> clickhouse_driver.log.configure_logger(raw_log_record=True)
+
+    disables the concatenation of the attributes into a single message
+    string.
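
As a concrete illustration of what the raw-record mode in this note enables: because the server_* attributes are plain LogRecord attributes, a standard logging.Formatter can render them directly. A sketch, assuming a ClickHouse server on localhost (the format string is an example, not a driver default):

    import logging
    from clickhouse_driver import Client
    from clickhouse_driver.log import configure_logger

    # Keep record.msg equal to the server-sent text; the server_*
    # attributes stay available for the formatter to use.
    configure_logger(raw_log_record=True)

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        '%(server_event_time)s [ %(server_host_name)s ] '
        '<%(server_priority)s> %(server_source)s: %(message)s'
    ))
    driver_logger = logging.getLogger('clickhouse_driver.log')
    driver_logger.addHandler(handler)
    driver_logger.setLevel(logging.INFO)

    client = Client('localhost')
    client.execute('SELECT 1', settings={'send_logs_level': 'debug'})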


Multiple hosts
--------------
28 changes: 28 additions & 0 deletions tests/test_blocks.py
@@ -2,6 +2,7 @@
 from unittest.mock import patch

 from clickhouse_driver.errors import ServerException
+from clickhouse_driver.log import configure_logger
 from tests.testcase import BaseTestCase, file_config
 from tests.util import capture_logging
 from clickhouse_driver.util.helpers import chunks
@@ -245,3 +246,30 @@ def test_logs_with_compression(self):
                 query = 'SELECT 1'
                 client.execute(query, settings=settings)
                 self.assertIn(query, buffer.getvalue())
+
+    def test_logs_with_raw_record(self):
+        def assertion_interceptor(record):
+            # Assert server-related LogRecord attributes have values
+            self.assertIsNotNone(record.server_event_time)
+            self.assertIsNotNone(record.server_event_time_microseconds)
+            self.assertIsNotNone(record.server_thread_id)
+            self.assertIsNotNone(record.server_host_name)
+            self.assertIsNotNone(record.server_query_id)
+            self.assertIsNotNone(record.server_priority)
+            self.assertIsNotNone(record.server_text)
+            self.assertIsNotNone(record.server_source)
+
+            # Assert LogRecord message hasn't been transformed
+            self.assertEqual(record.msg, record.server_text)
+
+            return True
+
+        driver_logger = configure_logger(raw_log_record=True)
+        driver_logger.addFilter(assertion_interceptor)
+        with capture_logging('clickhouse_driver.log', 'INFO') as buffer:
+            settings = {'send_logs_level': 'trace'}
+            query = 'SELECT 1'
+            self.client.execute(query, settings=settings)
+            self.assertIn(query, buffer.getvalue())
+
+        configure_logger()
2 changes: 2 additions & 0 deletions tests/util.py
@@ -44,8 +44,10 @@ def __init__(self, logger_name, level):

     def __enter__(self):
         buffer = StringIO()
+        formatter = logging.Formatter('%(asctime)s %(message)s')

         self.new_handler = logging.StreamHandler(buffer)
+        self.new_handler.setFormatter(formatter)
         self.logger.addHandler(self.new_handler)
         self.old_logger_level = self.logger.level
         self.logger.setLevel(self.level)
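
For context on the helper being patched here: logging.StreamHandler writes to any file-like object, including an in-memory io.StringIO, which is how capture_logging collects driver output for assertions. A self-contained sketch of the same pattern (logger name is illustrative):

    import logging
    from io import StringIO

    buffer = StringIO()
    handler = logging.StreamHandler(buffer)  # log output goes into memory
    handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))

    logger = logging.getLogger('capture_demo')
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    logger.info('SELECT 1')
    assert 'SELECT 1' in buffer.getvalue()

    logger.removeHandler(handler)  # restore state, as __exit__ would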
