Skip to content

Commit

Permalink
save log-block row into LogRecord attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Jun 27, 2024
1 parent 59ea586 commit e83c2da
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 20 deletions.
54 changes: 34 additions & 20 deletions clickhouse_driver/log.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
import logging

logger = logging.getLogger(__name__)

def default_message_filter(record):
    """Render server-side log columns into a human-readable message.

    Rewrites ``record.msg`` from the ``server_*`` attributes attached by
    ``log_block`` and always returns True so the record is not dropped.
    """
    timestamp = '%s.%06d' % (
        record.server_event_time, record.server_event_time_microseconds
    )
    record.msg = '[ %s ] [ %s ] [ %s ] {%s} <%s> %s: %s' % (
        record.server_host_name,
        timestamp,
        record.server_thread_id,
        record.server_query_id,
        record.server_priority,
        record.server_source,
        record.server_text,
    )
    return True


def configure_logger(raw_log_record=False):
    """Return the driver logger, toggling the message-building filter.

    With ``raw_log_record=True`` the ``default_message_filter`` is detached,
    so records keep the raw server text as their message; otherwise the
    filter is (re)attached to build the classic one-line message.
    """
    driver_logger = logging.getLogger(__name__)
    if raw_log_record:
        driver_logger.removeFilter(default_message_filter)
    else:
        driver_logger.addFilter(default_message_filter)
    return driver_logger


# Module-level logger used by log_block; starts with the default filter on.
logger = configure_logger()

# Keep in sync with ClickHouse priorities
# https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/InternalTextLogsQueue.cpp
Expand All @@ -13,36 +37,26 @@
'Notice',
'Information',
'Debug',
'Trace',
'Test',
'Trace'
)

num_priorities = len(log_priorities)


def log_block(block):
    """Emit one LogRecord per row of a server log block.

    Every column of the row is attached to the record as a
    ``server_<column>`` attribute (via ``extra``) so filters and handlers
    can read the raw values; the record message itself is the raw server
    text, which ``default_message_filter`` expands into the full line.
    """
    if block is None:
        return

    column_names = [x[0] for x in block.columns_with_types]

    for row in block.get_rows():
        row = dict(zip(column_names, row))

        # Map the numeric server priority onto its symbolic name.  The
        # upper bound is derived from the tuple instead of a magic number,
        # so indexing can never raise; priorities unknown to this driver
        # version are passed through numerically unchanged.  (The previous
        # fallback ``row[0]`` raised KeyError: ``row`` is keyed by column
        # names, never by 0.)
        if 1 <= row['priority'] < len(log_priorities):
            row['priority'] = log_priorities[row['priority']]

        # thread_number in servers prior 20.x
        # NOTE(review): ``or`` falls back on a falsy thread_id (e.g. 0);
        # presumably server thread ids are never 0 — confirm.
        row['thread_id'] = row.get('thread_id') or row['thread_number']

        # Put the log-block row into LogRecord extra.  The prefix must be
        # "server_" (with underscore): default_message_filter reads
        # record.server_host_name etc., so "server" + k would produce
        # attributes like ``serverhost_name`` and break the filter.
        extra = {'server_' + k: v for k, v in row.items()}
        logger.info(row['text'], extra=extra)
4 changes: 4 additions & 0 deletions docs/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ Query logs can be received from server by using `send_logs_level` setting:
2018-12-14 10:24:53,875 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} <Debug> MemoryTracker: Peak memory usage (for query): 40.23 KiB.
[(1,)]
.. note::

For logs received from ClickHouse, the server timestamp is used as the log record timestamp.


Multiple hosts
--------------
Expand Down
28 changes: 28 additions & 0 deletions tests/test_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest.mock import patch

from clickhouse_driver.errors import ServerException
from clickhouse_driver.log import configure_logger
from tests.testcase import BaseTestCase, file_config
from tests.util import capture_logging
from clickhouse_driver.util.helpers import chunks
Expand Down Expand Up @@ -245,3 +246,30 @@ def test_logs_with_compression(self):
query = 'SELECT 1'
client.execute(query, settings=settings)
self.assertIn(query, buffer.getvalue())

def test_logs_with_raw_record(self):
    """Raw mode keeps server_* attributes and an untransformed message."""
    server_attrs = (
        'server_event_time',
        'server_event_time_microseconds',
        'server_thread_id',
        'server_host_name',
        'server_query_id',
        'server_priority',
        'server_text',
        'server_source',
    )

    def assertion_interceptor(record):
        # Every server-side column must be present on the record.
        for name in server_attrs:
            self.assertIsNotNone(getattr(record, name))

        # Assert LogRecord message hasn't been transformed.
        self.assertEqual(record.msg, record.server_text)

        return True

    driver_logger = configure_logger(raw_log_record=True)
    driver_logger.addFilter(assertion_interceptor)
    with capture_logging('clickhouse_driver.log', 'INFO') as buffer:
        settings = {'send_logs_level': 'trace'}
        query = 'SELECT 1'
        self.client.execute(query, settings=settings)
        self.assertIn(query, buffer.getvalue())

    # Restore the default message-building filter for the other tests.
    configure_logger()
2 changes: 2 additions & 0 deletions tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ def __init__(self, logger_name, level):

def __enter__(self):
buffer = StringIO()
formatter = logging.Formatter('%(asctime)s %(message)s')

self.new_handler = logging.StreamHandler(buffer)
self.new_handler.setFormatter(formatter)
self.logger.addHandler(self.new_handler)
self.old_logger_level = self.logger.level
self.logger.setLevel(self.level)
Expand Down

0 comments on commit e83c2da

Please sign in to comment.