Skip to content

Commit

Permalink
Improve the utility of the log (#95)
Browse files Browse the repository at this point in the history
* Better names for the logger

Include the name of the module or accept a user-provided name

* Add timing messages for event_responder

* Add timing messages for result processor

* Add timings for the task submitter
  • Loading branch information
WardLT authored Mar 8, 2023
1 parent 95f3222 commit d67ac9e
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 14 deletions.
37 changes: 23 additions & 14 deletions colmena/thinker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial, update_wrapper
from threading import Event, local, Thread, Barrier
from time import perf_counter
from traceback import TracebackException
from typing import Optional, Callable, List, Union, Dict, Tuple

Expand Down Expand Up @@ -48,7 +49,11 @@ def _result_event_agent(thinker: 'BaseThinker', process_func: Callable, topic: O
result = thinker.queues.get_result(timeout=_DONE_REACTION_TIME, topic=topic)
except TimeoutException:
continue

thinker.logger.info(f'Started to process result for topic={topic}')
start_time = perf_counter()
process_func(thinker, result)
thinker.logger.info(f'Finished processing result for topic={topic}. Runtime: {perf_counter() - start_time:.4e}s')


def result_processor(func: Optional[Callable] = None, topic: str = 'default'):
Expand Down Expand Up @@ -82,7 +87,10 @@ def _task_submitter_agent(thinker: 'BaseThinker', process_func: Callable, task_t
# Wait until resources are free or thinker.done is set
acq_success = thinker.rec.acquire(task_type, n_slots, cancel_if=thinker.done)
if acq_success:
thinker.logger.info(f'Acquired {n_slots} execution slots of type {task_type}')
start_time = perf_counter()
process_func(thinker)
thinker.logger.info(f'Finished submitting new work. Runtime: {perf_counter() - start_time:.4e}s')


def task_submitter(func: Optional[Callable] = None, task_type: str = None, n_slots: Union[int, str] = 1):
Expand Down Expand Up @@ -131,6 +139,8 @@ def _event_responder_agent(thinker: 'BaseThinker', process_func: Callable, event
while not thinker.done.is_set():
if event.wait(_DONE_REACTION_TIME):
thinker.logger.info(f'Event {event_name} has been triggered')

start_time = perf_counter()
# If desired, launch the resource-allocation thread
if reallocate_resources:
reallocator_thread = ReallocatorThread(
Expand All @@ -142,17 +152,20 @@ def _event_responder_agent(thinker: 'BaseThinker', process_func: Callable, event

# Launch the function
process_func(thinker)
thinker.logger.info(f'Finished responding to {event_name} event. Runtime: {perf_counter() - start_time:.4}s')

# If we are using resource re-allocation, set the stop condition and wait for resources to be freed
if reallocator_thread is not None:
reallocator_thread.stop_event.set()
reallocator_thread.join()

# Wait until all agents that responded to this event finish
barrier.wait()
rank = barrier.wait()

# Then reset the event
event.clear()
if rank == 0:
thinker.logger.info(f'All responses to {event_name} complete. Time elapsed: {perf_counter() - start_time:.4}s')


def event_responder(func: Optional[Callable] = None, event_name: str = None,
Expand Down Expand Up @@ -301,16 +314,22 @@ def function(self):
Start the thinker by calling ``.start()``
"""

def __init__(self, queue: ColmenaQueues, resource_counter: Optional[ResourceCounter] = None,
daemon: bool = True, **kwargs):
def __init__(self,
queue: ColmenaQueues,
resource_counter: Optional[ResourceCounter] = None,
daemon: bool = True,
logger_name: Optional[str] = None,
**kwargs):
"""
Args:
queue: Queue wrapper used to communicate with task server
resource_counter: Utility used to track resource utilization
daemon: Whether to launch this as a daemon thread
logger_name: An optional name to give to the root logger for this thinker
**kwargs: Options passed to :class:`Thread`
"""
super().__init__(daemon=daemon, **kwargs)
self.logger_name = logger_name

# Define thinker-wide collectives
self.rec = resource_counter
Expand Down Expand Up @@ -350,10 +369,6 @@ def tear_down_agent(self):
Override to define any tear down logic."""
pass

def make_logging_handler(self) -> Optional[logging.Handler]:
"""Override to create a distinct logging handler for log messages emitted from this object"""
return None

def make_logger(self, name: Optional[str] = None):
"""Make a sub-logger for our application
Expand All @@ -364,16 +379,10 @@ def make_logger(self, name: Optional[str] = None):
Logger with an appropriate name
"""
# Create the logger
my_name = self.__class__.__name__.lower()
my_name = f'{self.__class__.__module__}.{self.__class__.__name__}' if self.logger_name is None else self.logger_name
if name is not None:
my_name += "." + name
new_logger = logging.getLogger(my_name)

# Assign the handler to the root logger
if name is None:
hnd = self.make_logging_handler()
if hnd is not None:
new_logger.addHandler(hnd)
return new_logger

@classmethod
Expand Down
86 changes: 86 additions & 0 deletions colmena/thinker/tests/test_thinker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Test the Thinker class"""
import logging
from threading import Event
from time import sleep

Expand Down Expand Up @@ -82,6 +83,91 @@ def test_detection():
assert 'function' in [a.__name__ for a in ExampleThinker.list_agents()]


def test_logger_name(queues):
    """Check the default and the user-supplied name of a thinker's logger."""

    class SimpleThinker(BaseThinker):
        """Thinker without agents; only its logger is inspected."""

    # Without an explicit name, the class name must appear in the logger name
    default_thinker = SimpleThinker(queues)
    default_name = default_thinker.logger.name
    assert 'SimpleThinker' in default_name

    # An explicit ``logger_name`` must be used as the logger name unchanged
    named_thinker = SimpleThinker(queues, logger_name='my_logger')
    custom_name = named_thinker.logger.name
    assert custom_name == 'my_logger'


@mark.timeout(5)
def test_logger_timings_event(queues, caplog):
    """Check the timing messages logged by event-responder agents.

    Two agents respond to the same event. At least one record from the
    ``a`` agent must contain a runtime, and the "all responses complete"
    message must be logged exactly once.
    """

    class TestThinker(BaseThinker):
        # Event that both agents below respond to
        event: Event = Event()

        @event_responder(event_name='event')
        def a(self):
            self.done.set()

        @event_responder(event_name='event')
        def b(self):
            self.done.set()

    # Run the thinker while capturing INFO-level records, then trigger the event
    with caplog.at_level(logging.INFO):
        thinker = TestThinker(queues, daemon=True)
        thinker.start()
        sleep(0.5)
        thinker.event.set()
        thinker.join(timeout=1)

    # Both agents set ``done``, so the thinker should have shut down
    assert thinker.done.is_set()

    # The ``a`` agent must have reported a runtime
    captured = caplog.records
    runtimes_from_a = [rec for rec in captured if '.a' in rec.name and 'Runtime' in rec.msg]
    assert len(runtimes_from_a) > 0

    # Exactly one record announces that all responses are complete
    completions = [rec for rec in captured if 'All responses to event complete' in rec.msg]
    assert len(completions) == 1


@mark.timeout(5)
def test_logger_timings_process(queues, caplog):
    """Check that a result-processor agent logs a runtime message."""

    class TestThinker(BaseThinker):

        @result_processor()
        def process(self, _):
            self.done.set()

    # Launch the thinker before any result is available
    thinker = TestThinker(queues, daemon=True)
    thinker.start()

    # Fake a completed task: submit it, pull it back off the queue, attach a result
    queues.send_inputs(1, method='test')
    topic, result = queues.get_task()
    result.set_result(1, 1)

    # Capture INFO-level records while the result is sent and handled
    with caplog.at_level(logging.INFO):
        queues.send_result(result, topic)

        # Give the agent thread time to pick up the result and log
        sleep(0.5)

    assert thinker.done.is_set()

    # A record from the ``process`` agent must contain a runtime
    timed_records = [rec for rec in caplog.records if '.process' in rec.name and 'Runtime' in rec.msg]
    assert timed_records, caplog.record_tuples


@mark.timeout(5)
def test_logger_timings_submitter(queues, caplog):
    """Check that a task-submitter agent logs a runtime message."""

    class TestThinker(BaseThinker):

        @task_submitter()
        def submit(self):
            self.done.set()

    # Run the thinker with a one-slot resource counter while capturing INFO-level records
    with caplog.at_level(logging.INFO):
        resources = ResourceCounter(1)
        thinker = TestThinker(queues, resources, daemon=True)
        thinker.start()
        sleep(0.5)

    assert thinker.done.is_set()

    # A record from the ``submit`` agent must contain a runtime
    timed_records = [rec for rec in caplog.records if '.submit' in rec.name and 'Runtime' in rec.msg]
    assert timed_records, caplog.record_tuples


@mark.timeout(5)
def test_run(queues):
"""Test the behavior of all agents"""
Expand Down

0 comments on commit d67ac9e

Please sign in to comment.