Skip to content

Commit

Permalink
Include last error message in progress bar for lf.concurrent_map.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 570588910
  • Loading branch information
daiyip authored and langfun authors committed Oct 4, 2023
1 parent ffc1199 commit cb97f13
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
43 changes: 32 additions & 11 deletions langfun/core/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,23 +333,35 @@ def concurrent_map(
total += 1

progress = tqdm.tqdm(total=total) if show_progress else None
def update_progress(success: int, failure: int) -> None:
def update_progress(
success: int,
failure: int,
last_error: Exception | None = None) -> None:
if progress is not None:
completed = success + failure
description = 'Success: %.2f%% (%d/%d), Failure: %.2f%% (%d/%d)' % (
success * 100.0 / completed, success, completed,
failure * 100.0 / completed, failure, completed
)
progress.set_description(description)

if last_error is not None:
error_text = repr(last_error)
if len(error_text) >= 64:
error_text = error_text[:64] + '...'
postfix = {'LastError': error_text}
progress.set_postfix(postfix)
progress.update(1)
progress.set_description(
'Success: %.2f%% (%d/%d), Failure: %.2f%% (%d/%d)'
% (success * 100.0 / completed, success, completed,
failure * 100.0 / completed, failure, completed))

success, failure = 0, 0
success, failure, last_error = 0, 0, None
if ordered:
for future in pending_futures:
job = future_to_job[future]
wait_time = (timeout - job.elapse) if timeout else None
try:
_ = future.result(timeout=wait_time)
if job.error is not None:
last_error = job.error
failure += 1
if not (
silence_on_errors and isinstance(job.error, silence_on_errors)):
Expand All @@ -358,11 +370,11 @@ def update_progress(success: int, failure: int) -> None:
success += 1
except concurrent.futures.TimeoutError:
future.cancel()
job.mark_canceled(
TimeoutError(f'Execution time ({job.elapse}) '
f'exceeds {timeout} seconds.'))
last_error = TimeoutError(
f'Execution time ({job.elapse}) exceeds {timeout} seconds.')
job.mark_canceled(last_error)
failure += 1
update_progress(success, failure)
update_progress(success, failure, last_error)
yield job.arg, job.result, job.error
else:
while pending_futures:
Expand All @@ -373,14 +385,15 @@ def update_progress(success: int, failure: int) -> None:
job = future_to_job[future]
del future_to_job[future]
if job.error is not None:
last_error = job.error
failure += 1
if not (
silence_on_errors and isinstance(job.error, silence_on_errors)
):
raise job.error # pylint: disable=g-doc-exception
else:
success += 1
update_progress(success, failure)
update_progress(success, failure, last_error)
yield job.arg, job.result, job.error
completed_batch.add(future)
except concurrent.futures.TimeoutError:
Expand All @@ -401,6 +414,14 @@ def update_progress(success: int, failure: int) -> None:
job.mark_canceled(
TimeoutError(f'Execution time ({job.elapse}) '
f'exceeds {timeout} seconds.'))

if job.error is not None:
last_error = job.error
failure += 1
else:
success += 1

update_progress(success, failure, last_error)
yield job.arg, job.result, job.error
else:
remaining_futures.append(future)
Expand Down
13 changes: 8 additions & 5 deletions langfun/core/concurrent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,32 +331,35 @@ def fun(x):
[
(i, o)
for i, o, _ in concurrent.concurrent_map(
fun, [5, 2, 1, 4], timeout=3, max_workers=1
fun, [5, 2, 1, 6], timeout=3, max_workers=1
)
],
[
(5, pg.MISSING_VALUE),
(2, 2),
(1, 1),
(4, pg.MISSING_VALUE),
(6, pg.MISSING_VALUE),
],
)

def test_concurrent_map_with_showing_progress(self):
def fun(x):
if x == 2:
raise ValueError('Intentional error.')
time.sleep(x)
return x

self.assertEqual(
[
(i, o)
for i, o, _ in concurrent.concurrent_map(
fun, [1, 2, 3], ordered=True, show_progress=True
fun, [1, 2, 3], timeout=1.5, max_workers=1, show_progress=True
)
],
[
(1, 1),
(2, 2),
(3, 3),
(2, pg.MISSING_VALUE),
(3, pg.MISSING_VALUE),
],
)

Expand Down

0 comments on commit cb97f13

Please sign in to comment.