From d67ac9efec8eee336b32d9838e2bbf58c0675012 Mon Sep 17 00:00:00 2001 From: Logan Ward Date: Wed, 8 Mar 2023 13:51:15 -0500 Subject: [PATCH] Improve the utility of the log (#95) * Better names for the logger Include the name of the module or accept a user-provided name * Add timing messages for event_responder * Add timing messages for result processor * Add timings for the task submitter --- colmena/thinker/__init__.py | 37 +++++++----- colmena/thinker/tests/test_thinker.py | 86 +++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 14 deletions(-) diff --git a/colmena/thinker/__init__.py b/colmena/thinker/__init__.py index 2393193..f5c2a2f 100644 --- a/colmena/thinker/__init__.py +++ b/colmena/thinker/__init__.py @@ -3,6 +3,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from functools import partial, update_wrapper from threading import Event, local, Thread, Barrier +from time import perf_counter from traceback import TracebackException from typing import Optional, Callable, List, Union, Dict, Tuple @@ -48,7 +49,11 @@ def _result_event_agent(thinker: 'BaseThinker', process_func: Callable, topic: O result = thinker.queues.get_result(timeout=_DONE_REACTION_TIME, topic=topic) except TimeoutException: continue + + thinker.logger.info(f'Started to process result for topic={topic}') + start_time = perf_counter() process_func(thinker, result) + thinker.logger.info(f'Finished processing result for topic={topic}. Runtime: {perf_counter() - start_time:.4e}s') def result_processor(func: Optional[Callable] = None, topic: str = 'default'): @@ -82,7 +87,10 @@ def _task_submitter_agent(thinker: 'BaseThinker', process_func: Callable, task_t # Wait until resources are free or thinker.done is set acq_success = thinker.rec.acquire(task_type, n_slots, cancel_if=thinker.done) if acq_success: + thinker.logger.info(f'Acquired {n_slots} execution slots of type {task_type}') + start_time = perf_counter() process_func(thinker) + thinker.logger.info(f'Finished submitting new work. Runtime: {perf_counter() - start_time:.4e}s') def task_submitter(func: Optional[Callable] = None, task_type: str = None, n_slots: Union[int, str] = 1): @@ -131,6 +139,8 @@ def _event_responder_agent(thinker: 'BaseThinker', process_func: Callable, event while not thinker.done.is_set(): if event.wait(_DONE_REACTION_TIME): thinker.logger.info(f'Event {event_name} has been triggered') + + start_time = perf_counter() # If desired, launch the resource-allocation thread if reallocate_resources: reallocator_thread = ReallocatorThread( @@ -142,6 +152,7 @@ def _event_responder_agent(thinker: 'BaseThinker', process_func: Callable, event # Launch the function process_func(thinker) + thinker.logger.info(f'Finished responding to {event_name} event. Runtime: {perf_counter() - start_time:.4}s') # If we are using resource re-allocation, set the stop condition and wait for resources to be freed if reallocator_thread is not None: @@ -149,10 +160,12 @@ def _event_responder_agent(thinker: 'BaseThinker', process_func: Callable, event reallocator_thread.join() # Wait until all agents that responded to this event finish - barrier.wait() + rank = barrier.wait() # Then reset the event event.clear() + if rank == 0: + thinker.logger.info(f'All responses to {event_name} complete. Time elapsed: {perf_counter() - start_time:.4}s') def event_responder(func: Optional[Callable] = None, event_name: str = None, @@ -301,16 +314,22 @@ def function(self): Start the thinker by calling ``.start()`` """ - def __init__(self, queue: ColmenaQueues, resource_counter: Optional[ResourceCounter] = None, - daemon: bool = True, **kwargs): + def __init__(self, + queue: ColmenaQueues, + resource_counter: Optional[ResourceCounter] = None, + daemon: bool = True, + logger_name: Optional[str] = None, + **kwargs): """ Args: queue: Queue wrapper used to communicate with task server resource_counter: Utility to used track resource utilization daemon: Whether to launch this as a daemon thread + logger_name: An optional name to give to the root logger for this thinker **kwargs: Options passed to :class:`Thread` """ super().__init__(daemon=daemon, **kwargs) + self.logger_name = logger_name # Define thinker-wide collectives self.rec = resource_counter @@ -350,10 +369,6 @@ def tear_down_agent(self): Override to define any tear down logic.""" pass - def make_logging_handler(self) -> Optional[logging.Handler]: - """Override to create a distinct logging handler for log messages emitted from this object""" - return None - def make_logger(self, name: Optional[str] = None): """Make a sub-logger for our application @@ -364,16 +379,10 @@ def make_logger(self, name: Optional[str] = None): Logger with an appropriate name """ # Create the logger - my_name = self.__class__.__name__.lower() + my_name = f'{self.__class__.__module__}.{self.__class__.__name__}' if self.logger_name is None else self.logger_name if name is not None: my_name += "." + name new_logger = logging.getLogger(my_name) - - # Assign the handler to the root logger - if name is None: - hnd = self.make_logging_handler() - if hnd is not None: - new_logger.addHandler(hnd) return new_logger @classmethod diff --git a/colmena/thinker/tests/test_thinker.py b/colmena/thinker/tests/test_thinker.py index 3c6cdf9..8ab3c34 100644 --- a/colmena/thinker/tests/test_thinker.py +++ b/colmena/thinker/tests/test_thinker.py @@ -1,4 +1,5 @@ """Test the Thinker class""" +import logging from threading import Event from time import sleep @@ -82,6 +83,91 @@ def test_detection(): assert 'function' in [a.__name__ for a in ExampleThinker.list_agents()] +def test_logger_name(queues): + """Test the name of loggers""" + + class SimpleThinker(BaseThinker): + pass + + # See if the default name holds + thinker = SimpleThinker(queues) + assert 'SimpleThinker' in thinker.logger.name + + # See if we can provide it its own name + thinker = SimpleThinker(queues, logger_name='my_logger') + assert thinker.logger.name == 'my_logger' + + +@mark.timeout(5) +def test_logger_timings_event(queues, caplog): + class TestThinker(BaseThinker): + event: Event = Event() + + @event_responder(event_name='event') + def a(self): + self.done.set() + + @event_responder(event_name='event') + def b(self): + self.done.set() + + # Start the thinker + with caplog.at_level(logging.INFO): + thinker = TestThinker(queues, daemon=True) + thinker.start() + sleep(0.5) + thinker.event.set() + thinker.join(timeout=1) + assert thinker.done.is_set() + assert any('Runtime' in record.msg for record in caplog.records if '.a' in record.name) + assert sum('All responses to event complete' in record.msg for record in caplog.records) == 1 + + +@mark.timeout(5) +def test_logger_timings_process(queues, caplog): + class TestThinker(BaseThinker): + + @result_processor() + def process(self, _): + self.done.set() + + # Start the thinker + thinker = TestThinker(queues, daemon=True) + thinker.start() + + # Spoof a result completing + + queues.send_inputs(1, method='test') + topic, result = queues.get_task() + result.set_result(1, 1) + with caplog.at_level(logging.INFO): + queues.send_result(result, topic) + + # Wait then check the logs + sleep(0.5) + + assert thinker.done.is_set() + assert any('Runtime' in record.msg for record in caplog.records if '.process' in record.name), caplog.record_tuples + + +@mark.timeout(5) +def test_logger_timings_submitter(queues, caplog): + class TestThinker(BaseThinker): + + @task_submitter() + def submit(self): + self.done.set() + + # Start the thinker + with caplog.at_level(logging.INFO): + thinker = TestThinker(queues, ResourceCounter(1), daemon=True) + thinker.start() + sleep(0.5) + + assert thinker.done.is_set() + assert any('Runtime' in record.msg for record in caplog.records if '.submit' in record.name), caplog.record_tuples + + @mark.timeout(5) def test_run(queues): """Test the behavior of all agents"""