Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly free multiprocessing.Value for progress #228

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions bio2zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,12 @@
import tqdm
import zarr

# multiprocessing.util.log_to_stderr(5)

logger = logging.getLogger(__name__)

numcodecs.blosc.use_threads = False

# By default Tqdm creates a multiprocessing Lock to synchronise across processes,
# which seems to cause some problems with leaked semaphores on certain combinations
# of Mac and Python versions. We only access tqdm from the main process though,
# so we don't need it and can override with a simpler threading Lock.
# NOTE: this gets set multiple times to different locks as subprocesses are
# spawned, but it doesn't matter because the only tqdm instance that is
# used is the one in the main process.
tqdm.tqdm.set_lock(threading.RLock())


def display_number(x):
ret = "n/a"
Expand Down Expand Up @@ -196,7 +189,14 @@ class ProgressConfig:
# progressable thing happening per source process. This is
# probably fine in practise, but there could be corner cases
# where it's not. Something to watch out for.
_progress_counter = None
# _progress_counter = None

if multiprocessing.current_process().name == "MainProcess":
ctx = multiprocessing.get_context("spawn")
_progress_counter = ctx.Value("Q", 0)
print("process", multiprocessing.current_process())
# print("parent_process", multiprocessing.parent_process())
# print("MADE:", _progress_counter, repr(_progress_counter._lock._semlock))


def update_progress(inc):
Expand All @@ -213,15 +213,18 @@ def get_progress():
def setup_progress_counter(counter):
    """Install *counter* as this module's shared progress counter.

    Intended to be called in spawned worker processes so that their
    calls to ``update_progress`` increment the same ``multiprocessing``
    ``Value`` that the main process reads.

    :param counter: a ``multiprocessing`` ``Value`` created by the parent
        process and passed to the worker at startup.
    """
    global _progress_counter
    _progress_counter = counter
    # print("SET:", _progress_counter, repr(_progress_counter._lock._semlock))


class ParallelWorkManager(contextlib.AbstractContextManager):
def __init__(self, worker_processes=1, progress_config=None):
# Need to specify this explicitly to suppport Macs and
# for future proofing.
ctx = multiprocessing.get_context("spawn")
global _progress_counter
_progress_counter = ctx.Value("Q", 0)
# ctx = multiprocessing.get_context("spawn")
# global _progress_counter
# _progress_counter = ctx.Value("Q", 0)
with _progress_counter.get_lock():
_progress_counter.value = 0
if worker_processes <= 0:
# NOTE: this is only for testing, not for production use!
self.executor = SynchronousExecutor()
Expand Down Expand Up @@ -289,12 +292,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):
with self.completed_lock:
self.completed = True
self.executor.shutdown(wait=False)
# FIXME there's currently some thing weird happening at the end of
# Encode 1D for 1kg-p3. The progress bar disappears, like we're
# setting a total of zero or something.
self.progress_thread.join()
self._update_progress()
self.progress_bar.close()
# global _progress_counter
# _progress_counter = None
return False


Expand Down
6 changes: 4 additions & 2 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def test_one_future_progress(self, total, workers):
progress_config = core.ProgressConfig(total=total)
with core.ParallelWorkManager(workers, progress_config) as pwm:
pwm.submit(core.update_progress, total)
assert core.get_progress() == total
list(pwm.results_as_completed())
assert core.get_progress() == total

@pytest.mark.parametrize("total", [1, 10, 1000])
@pytest.mark.parametrize("workers", [0, 1, 2, 3])
Expand All @@ -72,7 +73,8 @@ def test_n_futures_progress(self, total, workers):
with core.ParallelWorkManager(workers, progress_config) as pwm:
for _ in range(total):
pwm.submit(core.update_progress, 1)
assert core.get_progress() == total
list(pwm.results_as_completed())
assert core.get_progress() == total

@pytest.mark.parametrize("total", [1, 10, 20])
@pytest.mark.parametrize("workers", [0, 1, 2, 3])
Expand Down
Loading