
Commit

Merge pull request #42 from exalearn/alloc-race
Fix a race condition in event_responder
WardLT authored Jan 3, 2022
2 parents de989a3 + 265c216 commit ba34d99
Showing 4 changed files with 78 additions and 40 deletions.
colmena/thinker/__init__.py (13 changes: 3 additions & 10 deletions)
@@ -110,18 +110,13 @@ def _event_responder_agent(thinker: 'BaseThinker', process_func: Callable, event
max_slots = getattr(thinker, max_slots)

# Loop until the thinker is completed
func_is_done = Event()
reallocator_thread: Optional[ReallocatorThread] = None
while not thinker.done.is_set():
if event.wait(_DONE_REACTION_TIME):
# Reset the "function is done" event
func_is_done.clear()

# If desired, launch the resource-allocation thread
if reallocate_resources:
reallocator_thread = ReallocatorThread(
thinker.rec, func_is_done,
gather_from=gather_from, gather_to=gather_to,
thinker.rec, gather_from=gather_from, gather_to=gather_to,
disperse_to=disperse_to, max_slots=max_slots,
slot_step=slot_step, logger_name=thinker.logger.name + ".allocate"
)
@@ -130,11 +125,9 @@ def _event_responder_agent(thinker: 'BaseThinker', process_func: Callable, event
# Launch the function
process_func(thinker)

# When complete, set the Event flag (killing the resource allocator)
func_is_done.set()

# Wait for the resource allocator to die
# If we are using resource re-allocation, set the stop condition and wait for resources to be freed
if reallocator_thread is not None:
reallocator_thread.stop_event.set()
reallocator_thread.join()


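Taken together, the change above removes the shared ``func_is_done`` Event that the agent used to clear before each response and set after ``process_func`` returned (the same Event object was reused on every pass through the loop). Each ``ReallocatorThread`` now owns a ``stop_event``, which the agent signals and then joins. A minimal sketch of the resulting shutdown sequence, with the ``thinker`` object, pool names, and ``max_slots`` value as illustrative stand-ins rather than anything defined in this diff:

    # Sketch only: mirrors the new agent code above. "simulate"/"train" and
    # max_slots=4 are made-up values, not defaults from the library.
    reallocator_thread = ReallocatorThread(
        thinker.rec, gather_from="simulate", gather_to="train",
        disperse_to="simulate", max_slots=4,
        slot_step=1, logger_name=thinker.logger.name + ".allocate")
    reallocator_thread.start()

    process_func(thinker)                  # run the event-response function

    reallocator_thread.stop_event.set()    # ask the allocator to wind down
    reallocator_thread.join()              # returns once its slots are dispersed
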
colmena/thinker/resources.py (63 changes: 36 additions & 27 deletions)
@@ -1,6 +1,6 @@
"""Utilities for tracking resources"""
from threading import Semaphore, Lock, Event, Thread
from typing import List, Dict, Optional
from typing import List, Dict, Optional, Union
from time import monotonic
from math import inf
import logging
@@ -180,39 +180,41 @@ def acquire(self, task: Optional[str], n_slots: int, timeout: float = -1., cancel

return success

def reallocate(self, task_from: Optional[str], task_to: Optional[str], n_slots: int,
def reallocate(self, task_from: Optional[str], task_to: Optional[str], n_slots: Union[int, str],
timeout: float = -1, cancel_if: Optional[Event] = None) -> bool:
"""Transfer computer resources from one task to another
Args:
task_from: Which task to pull resources from (None to request un-allocated nodes)
task_to: Which task to add resources to (None to de-allocate nodes)
n_slots: Number of nodes to request
n_slots: Number of slots to request. Set to "all" to reallocate every slot allocated to ``task_from``, not just those currently available
timeout: Maximum time to wait for the request to be filled
cancel_if: Cancel the request if this event happens
Returns:
Whether request was fulfilled
"""

if task_to == task_from:
raise ValueError(f'Resources cannot be moved between the same pool. task_from = "{task_from}" = task_to')

# Pull nodes from the remaining
acq_success = self.acquire(task_from, n_slots, timeout, cancel_if)

# If successful, push those resources to the target pool
if acq_success:
# Mark resources as available
for _ in range(n_slots):
self._availability[task_to].release()

# Record changes to the total pool size
with self._allocation_lock[task_from], self._allocation_lock[task_to]:
self._allocation[task_from] -= n_slots
self._allocation[task_to] += n_slots
logger.info(f'Transferred {n_slots} slots from {task_from} to {task_to}')
# TODO (wardlt): Eventually provide some mechanism to inform a batch
# system that resources allocated to ``from_task`` should be released
with self._allocation_lock[task_from]:
if n_slots == "all":
n_slots = self.allocated_slots(task_from)
acq_success = self.acquire(task_from, n_slots, timeout, cancel_if)

# If successful, push those resources to the target pool
if acq_success:
# Mark resources as available
for _ in range(n_slots):
self._availability[task_to].release()

# Record changes to the total pool size
with self._allocation_lock[task_to]:
self._allocation[task_from] -= n_slots
self._allocation[task_to] += n_slots
logger.info(f'Transferred {n_slots} slots from {task_from} to {task_to}')
# TODO (wardlt): Eventually provide some mechanism to inform a batch
# system that resources allocated to ``from_task`` should be released

return acq_success
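
As a quick illustration of the new "all" option (a sketch that mirrors the added test below; the ``rec`` counter, the 8 slots, and the "sim"/"ml" pool names come from the test fixture and are not required by the API):

    # Sketch only: assumes `rec` is a ResourceCounter with 8 total slots and
    # task pools named "sim" and "ml", as in the test fixture.
    rec.reallocate(None, "sim", 8)        # place all unallocated slots under "sim"
    rec.reallocate("sim", "ml", "all")    # move every slot allocated to "sim"
    assert rec.allocated_slots("sim") == 0
    assert rec.allocated_slots("ml") == 8

Note that the count behind "all" is read while ``_allocation_lock[task_from]`` is held, which appears intended to keep a concurrent reallocation from changing it before the acquire begins.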

@@ -221,19 +223,23 @@ class ReallocatorThread(Thread):
"""Thread that reallocates resources until an event is set.
Create a thread by defining the procedure the thread should follow for reallocation
(e.g., from where to gather resources, where to store them)
and an event that will signal for it to exit.
(e.g., from where to gather resources, where to store them, where to put them when done).
The resource allocation thread is stopped by calling ``obj.stop_event.set()``.
Note that you can provide your own Event object to the initializer;
it will then be used as the ``stop_event`` attribute.
Runs as a daemon thread."""

def __init__(self, resource_counter: ResourceCounter, stop_event: Event,
def __init__(self, resource_counter: ResourceCounter,
gather_from: Optional[str], gather_to: Optional[str],
disperse_to: Optional[str], max_slots: Optional[int] = None,
stop_event: Optional[Event] = None,
slot_step: int = 1, logger_name: Optional[str] = None):
"""
Args:
resource_counter: Resource counter used to track resources
stop_event: Event which controls when the thread should give resources back
stop_event: Event which controls when the thread should give resources back. If not provided, a new Event is created.
logger_name: Name of the logger, if desired
gather_from: Name of a resource pool from which to acquire resources
gather_to: Name of the resource pool to place re-allocated resources
@@ -244,7 +250,10 @@ def __init__(self, resource_counter: ResourceCounter, stop_event: Event,
super().__init__(daemon=True)

self.resource_counter = resource_counter
self.stop_event = stop_event
if stop_event is not None:
self.stop_event = stop_event
else:
self.stop_event = Event()
self.gather_from = gather_from
self.gather_to = gather_to
self.disperse_to = disperse_to
@@ -254,7 +263,8 @@ def __init__(self, resource_counter: ResourceCounter, stop_event: Event,
self.logger = logger if logger_name is None else logging.getLogger(logger_name)

def run(self) -> None:
self.logger.info('Starting resource allocation thread')
self.logger.info(f'Starting resource allocation thread. Allocating a maximum of {self.max_slots} to {self.gather_to} from'
f' {self.gather_from} in steps of {self.slot_step}')

# Acquire resources until either the maximum is reached, or the event is triggered
while (self.max_slots is None or self.resource_counter.allocated_slots(self.gather_to) < self.max_slots) \
@@ -266,6 +276,5 @@ def run(self) -> None:
self.logger.info('Waiting for stop condition to be set')
self.stop_event.wait()
if self.gather_to != self.disperse_to:
self.resource_counter.reallocate(self.gather_to, self.disperse_to,
self.resource_counter.allocated_slots(self.gather_to))
self.resource_counter.reallocate(self.gather_to, self.disperse_to, "all")
self.logger.info('Resource allocation thread exiting')
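
Because ``stop_event`` is now optional, both constructor styles below should work (a sketch; ``rec`` and the pool names again follow the test fixture rather than any required setup):

    from threading import Event
    from colmena.thinker.resources import ReallocatorThread

    # Pass in an external Event, e.g. to stop several threads with one signal
    shared_stop = Event()
    alloc_a = ReallocatorThread(rec, gather_from="sim", gather_to="ml",
                                disperse_to=None, stop_event=shared_stop)

    # Or let the thread create its own and reach it through the attribute
    alloc_b = ReallocatorThread(rec, gather_from="sim", gather_to="ml",
                                disperse_to=None)
    alloc_b.start()
    # ... later ...
    alloc_b.stop_event.set()   # slots gathered into "ml" return to the unallocated pool
    alloc_b.join()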
colmena/thinker/tests/test_resources.py (41 changes: 38 additions & 3 deletions)
@@ -1,7 +1,7 @@
from threading import Timer, Event
from time import sleep

from pytest import fixture
from pytest import fixture, mark

from colmena.thinker.resources import ResourceCounter, ReallocatorThread

@@ -63,14 +63,19 @@ def test_allocations(rec):
assert rec.available_slots("sim") == 4
assert stop.is_set()

# Test out reallocate all
rec.reallocate("sim", "ml", "all")
assert rec.allocated_slots("sim") == 0
assert rec.allocated_slots("ml") == 8


def test_reallocator(rec):
# Start with everything allocated to "simulation"
rec.reallocate(None, "sim", 8)

# Test allocating up to the maximum
stop = Event()
alloc = ReallocatorThread(rec, stop, gather_from="sim", gather_to="ml", disperse_to=None, max_slots=2)
alloc = ReallocatorThread(rec, stop_event=stop, gather_from="sim", gather_to="ml", disperse_to=None, max_slots=2)
alloc.start()
sleep(0.2)
assert alloc.is_alive()
@@ -84,7 +89,7 @@ def test_reallocator(rec):

# Test without a maximum allocation
stop.clear()
alloc = ReallocatorThread(rec, stop, gather_from="sim", gather_to="ml", disperse_to=None)
alloc = ReallocatorThread(rec, stop_event=stop, gather_from="sim", gather_to="ml", disperse_to=None)
alloc.start()
sleep(0.2)
assert alloc.is_alive()
@@ -95,3 +100,33 @@ def test_reallocator(rec):
sleep(2) # We check for the flag every 1s
assert not alloc.is_alive()
assert rec.unallocated_slots == 8


@mark.timeout(2)
@mark.repeat(4)
def test_reallocator_deadlock(rec):
"""Creates the deadlock reported in https://github.com/exalearn/colmena/issues/43"""
rec.reallocate(None, "sim", 8)
assert rec.available_slots("sim") == 8

# Create two allocators: One that pulls from sim and another that pulls from ml
ml_alloc = ReallocatorThread(rec, gather_from="sim", gather_to="ml", disperse_to="sim", max_slots=8)
sim_alloc = ReallocatorThread(rec, gather_from="ml", gather_to="sim", disperse_to="ml", max_slots=8)

# Start the ML allocator, which will pull resources from sim to ml
ml_alloc.start()
assert not ml_alloc.stop_event.is_set()
assert rec.available_slots("ml") == 8
assert rec.acquire("ml", 8)
assert rec.available_slots("ml") == 0

# Start the sim allocator, which will ask to pull resources from ml over to sim
sim_alloc.start()
sleep(0.001)
assert rec.available_slots("ml") == 0

# Send the stop signal and wait for ml_alloc to exit
ml_alloc.stop_event.set()
rec.release("ml", 8)
ml_alloc.join(1)
assert not ml_alloc.is_alive()
test-requirements.txt (1 change: 1 addition & 0 deletions)
@@ -2,3 +2,4 @@ flake8
pytest
pytest-timeout
pytest-mock
pytest-repeat
