diff --git a/langfun/core/concurrent.py b/langfun/core/concurrent.py index f63f3057..ae159417 100644 --- a/langfun/core/concurrent.py +++ b/langfun/core/concurrent.py @@ -333,16 +333,27 @@ def concurrent_map( total += 1 progress = tqdm.tqdm(total=total) if show_progress else None - def update_progress(success: int, failure: int) -> None: + def update_progress( + success: int, + failure: int, + last_error: Exception | None = None) -> None: if progress is not None: completed = success + failure + description = 'Success: %.2f%% (%d/%d), Failure: %.2f%% (%d/%d)' % ( + success * 100.0 / completed, success, completed, + failure * 100.0 / completed, failure, completed + ) + progress.set_description(description) + + if last_error is not None: + error_text = repr(last_error) + if len(error_text) >= 64: + error_text = error_text[:64] + '...' + postfix = {'LastError': error_text} + progress.set_postfix(postfix) progress.update(1) - progress.set_description( - 'Success: %.2f%% (%d/%d), Failure: %.2f%% (%d/%d)' - % (success * 100.0 / completed, success, completed, - failure * 100.0 / completed, failure, completed)) - success, failure = 0, 0 + success, failure, last_error = 0, 0, None if ordered: for future in pending_futures: job = future_to_job[future] @@ -350,6 +361,7 @@ def update_progress(success: int, failure: int) -> None: try: _ = future.result(timeout=wait_time) if job.error is not None: + last_error = job.error failure += 1 if not ( silence_on_errors and isinstance(job.error, silence_on_errors)): @@ -358,11 +370,11 @@ def update_progress(success: int, failure: int) -> None: success += 1 except concurrent.futures.TimeoutError: future.cancel() - job.mark_canceled( - TimeoutError(f'Execution time ({job.elapse}) ' - f'exceeds {timeout} seconds.')) + last_error = TimeoutError( + f'Execution time ({job.elapse}) exceeds {timeout} seconds.') + job.mark_canceled(last_error) failure += 1 - update_progress(success, failure) + update_progress(success, failure, last_error) yield job.arg, job.result, job.error else: while pending_futures: @@ -373,6 +385,7 @@ def update_progress(success: int, failure: int) -> None: job = future_to_job[future] del future_to_job[future] if job.error is not None: + last_error = job.error failure += 1 if not ( silence_on_errors and isinstance(job.error, silence_on_errors) @@ -380,7 +393,7 @@ def update_progress(success: int, failure: int) -> None: raise job.error # pylint: disable=g-doc-exception else: success += 1 - update_progress(success, failure) + update_progress(success, failure, last_error) yield job.arg, job.result, job.error completed_batch.add(future) except concurrent.futures.TimeoutError: @@ -401,6 +414,14 @@ def update_progress(success: int, failure: int) -> None: job.mark_canceled( TimeoutError(f'Execution time ({job.elapse}) ' f'exceeds {timeout} seconds.')) + + if job.error is not None: + last_error = job.error + failure += 1 + else: + success += 1 + + update_progress(success, failure, last_error) yield job.arg, job.result, job.error else: remaining_futures.append(future) diff --git a/langfun/core/concurrent_test.py b/langfun/core/concurrent_test.py index 0e5d1191..d5a9cd61 100644 --- a/langfun/core/concurrent_test.py +++ b/langfun/core/concurrent_test.py @@ -331,32 +331,35 @@ def fun(x): [ (i, o) for i, o, _ in concurrent.concurrent_map( - fun, [5, 2, 1, 4], timeout=3, max_workers=1 + fun, [5, 2, 1, 6], timeout=3, max_workers=1 ) ], [ (5, pg.MISSING_VALUE), (2, 2), (1, 1), - (4, pg.MISSING_VALUE), + (6, pg.MISSING_VALUE), ], ) def test_concurrent_map_with_showing_progress(self): def fun(x): + if x == 2: + raise ValueError('Intentional error.') + time.sleep(x) return x self.assertEqual( [ (i, o) for i, o, _ in concurrent.concurrent_map( - fun, [1, 2, 3], ordered=True, show_progress=True + fun, [1, 2, 3], timeout=1.5, max_workers=1, show_progress=True ) ], [ (1, 1), - (2, 2), - (3, 3), + (2, pg.MISSING_VALUE), + (3, pg.MISSING_VALUE), ], )