Skip to content

Commit

Permalink
Improve lf.eval.v2 debugging.
Browse files Browse the repository at this point in the history
- Add event `on_experiment_abort` to `lf.eval.v2.Plugin`.
- Raise non-evaluation errors. Previously, such errors were swallowed by `lf.concurrent_map`.
- Add more logs for troubleshooting, which can be viewed under the "Logs" tab.
  * Error stacktrace if process terminated abnormally.
  * HTML rendering errors.
  * Checkpointing status.
  * Completion status.

PiperOrigin-RevId: 708009471
  • Loading branch information
daiyip authored and langfun authors committed Dec 19, 2024
1 parent 57ecab1 commit f4dcfd1
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 83 deletions.
71 changes: 63 additions & 8 deletions langfun/core/eval/v2/checkpointing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
"""Checkpointing evaluation runs."""
import threading
import traceback

import langfun.core as lf
from langfun.core.eval.v2 import example as example_lib
Expand All @@ -27,6 +28,21 @@
class Checkpointer(experiment_lib.Plugin):
  """Base plugin for checkpointing the examples of an evaluation run."""

  def on_experiment_start(self, experiment: Experiment):
    """Logs whether the experiment begins with already-evaluated examples.

    Args:
      experiment: The experiment being started. Its
        `state.evaluated_examples` is read to decide which message is logged
        through `experiment.info`.
    """
    loaded = experiment.state.evaluated_examples

    # Guard clause: nothing was loaded, so report a fresh start and stop.
    if not loaded:
      experiment.info(
          'No previous evaluated examples are loaded. '
          f'Experiment {experiment.id} starts from scratch.'
      )
      return

    # Report the number of loaded examples together with their sorted IDs.
    example_ids = sorted(loaded.keys())
    experiment.info(
        'Loaded %d examples from checkpoint files. Example IDs: %s'
        % (len(loaded), example_ids)
    )


class PerExampleCheckpointer(Checkpointer):
"""Checkpointer that saves each example to a separate file."""
Expand Down Expand Up @@ -68,10 +84,11 @@ def _load_state(ckpt_file):
_load_state, ckpt_files, max_workers=64,
):
if error is not None:
pg.logging.warning(
experiment.warning(
'Failed to load checkpoint file %s: %s. Skipping the file.',
ckpt_file, error
)
super().on_experiment_start(experiment)

def on_example_complete(
self,
Expand All @@ -80,7 +97,11 @@ def on_example_complete(
example: Example,
) -> None:
"""Saves the example to the checkpoint file."""
if not example.has_error:
if example.has_error:
experiment.warning(
f'Example {example.id} has error. Skipping checkpointing.'
)
else:
def save_state(example: Example):
writer = SequenceWriter(
runner.current_run.output_path_for(
Expand All @@ -91,8 +112,18 @@ def save_state(example: Example):
)
)
)
writer.add(example)
writer.close()
try:
writer.add(example)
writer.close()
experiment.info(
f'Example {example.id} is saved to {writer.path}.',
)
except BaseException as e: # pylint: disable=broad-except
experiment.error(
f'Failed to save example {example.id} to {writer.path}. '
f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.',
)
raise e
runner.background_run(save_state, example)

def _file_prefix_and_ext(self, filename: str) -> tuple[str, str]:
Expand Down Expand Up @@ -164,6 +195,7 @@ def on_experiment_start(
with self._lock:
if self._sequence_writer is not None:
self._sequence_writer[experiment.id] = sequence_writer
super().on_experiment_start(experiment)

def on_experiment_complete(
self,
Expand All @@ -178,8 +210,12 @@ def on_experiment_complete(
if self._sequence_writer is not None:
# Make sure the writer is closed without delay so the file will be
# available immediately.
self._sequence_writer[experiment.id].close()
del self._sequence_writer[experiment.id]
writer = self._sequence_writer.pop(experiment.id)
writer.close()
experiment.info(
f'{len(experiment.state.evaluated_examples)} examples are '
f'checkpointed to {writer.path}.'
)

def on_example_complete(
self,
Expand All @@ -189,17 +225,36 @@ def on_example_complete(
) -> None:
"""Saves the example to the checkpoint file."""
assert experiment.id in self._sequence_writer
if not example.has_error:
runner.background_run(self._sequence_writer[experiment.id].add, example)
if example.has_error:
experiment.warning(
f'Example {example.id} has error. Skipping checkpointing.'
)
else:
def _save_example(example: Example):
writer = self._sequence_writer[experiment.id]
try:
writer.add(example)
except BaseException as e: # pylint: disable=broad-except
experiment.error(
f'Failed to save example {example.id} to {writer.path}. '
f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.',
)
raise e
runner.background_run(_save_example, example)


class SequenceWriter:
"""Thread safe sequence writer."""

def __init__(self, path: str):
self._lock = threading.Lock()
self._path = path
self._sequence_writer = pg.io.open_sequence(path, 'w')

@property
def path(self) -> str:
return self._path

def add(self, example: Example):
example_blob = pg.to_json_str(
example,
Expand Down
35 changes: 21 additions & 14 deletions langfun/core/eval/v2/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,36 +285,43 @@ def _reset(self) -> None:
# Evaluation-level logging.
#

def _log(self, level: lf.logging.LogLevel, message: str, **kwargs):
def _log(self, log_func, level: lf.logging.LogLevel, message: str, **kwargs):
# Write to external logging system.
log_message = f'{self.id}: {message}'
if kwargs:
log_message = f'{log_message} (metadata: {kwargs!r})'
log_func(log_message)

# Add to experiment log history.
log_entry = lf.logging.LogEntry(
level=level,
time=datetime.datetime.now(),
message=message,
metadata=kwargs,
)
with self._log_lock:
self._log_entries.append(
lf.logging.LogEntry(
level=level,
time=datetime.datetime.now(),
message=message,
metadata=kwargs,
)
)
self._log_entries.append(log_entry)

def debug(self, message: str, **kwargs):
"""Logs a debug message to the session."""
self._log('debug', message, **kwargs)
self._log(pg.logging.debug, 'debug', message, **kwargs)

def info(self, message: str, **kwargs):
"""Logs an info message to the session."""
self._log('info', message, **kwargs)
self._log(pg.logging.info, 'info', message, **kwargs)

def warning(self, message: str, **kwargs):
"""Logs a warning message to the session."""
self._log('warning', message, **kwargs)
self._log(pg.logging.warning, 'warning', message, **kwargs)

def error(self, message: str, **kwargs):
"""Logs an error message to the session."""
self._log('error', message, **kwargs)
self._log(pg.logging.error, 'error', message, **kwargs)

def fatal(self, message: str, **kwargs):
"""Logs a fatal message to the session."""
self._log('fatal', message, **kwargs)
# We use error level for fatal message, which does not trigger assertion.
self._log(pg.logging.error, 'fatal', message, **kwargs)

#
# HTML views.
Expand Down
8 changes: 8 additions & 0 deletions langfun/core/eval/v2/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,14 @@ def on_experiment_complete(
) -> None:
"""Called when an experiment (both leaf and non-leaf) is complete."""

def on_experiment_abort(
    self,
    runner: Runner,
    experiment: Experiment,
    error: BaseException,
) -> None:
  """Hook invoked when an experiment (leaf or non-leaf) is aborted.

  This base implementation is a no-op.

  Args:
    runner: The runner passed to the hook.
    experiment: The experiment that was aborted.
    error: The exception passed along with the abort event (presumably the
      cause of the abort; confirm against the runner implementation).
  """

def on_example_start(
self,
runner: Runner,
Expand Down
83 changes: 58 additions & 25 deletions langfun/core/eval/v2/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Reporting evaluation results."""

import time
import traceback
from typing import Annotated

from langfun.core.eval.v2 import example as example_lib
Expand Down Expand Up @@ -61,6 +62,14 @@ def on_run_complete(
) -> None:
self._maybe_update_summary(runner, force=True)

def on_run_abort(
    self,
    runner: Runner,
    root: Experiment,
    error: BaseException
) -> None:
  """Forces a summary update when the run is aborted.

  Args:
    runner: The runner forwarded to `_maybe_update_summary`.
    root: Unused by this handler.
    error: Unused by this handler.
  """
  # Neither the root experiment nor the error is needed to refresh the summary.
  del root, error
  self._maybe_update_summary(runner, force=True)

def on_experiment_start(
self,
runner: Runner,
Expand All @@ -75,6 +84,16 @@ def on_experiment_complete(
if experiment.is_leaf:
self._maybe_update_experiment_html(runner, experiment, force=True)

def on_experiment_abort(
    self,
    runner: Runner,
    experiment: Experiment,
    error: BaseException
) -> None:
  """Forces an HTML update for a leaf experiment that was aborted.

  Non-leaf experiments are ignored, matching the `is_leaf` guard used by
  `on_experiment_complete`.

  Args:
    runner: The runner forwarded to `_maybe_update_experiment_html`.
    experiment: The aborted experiment; only acted on when
      `experiment.is_leaf` is true.
    error: The error delivered with the abort event. Unused by this handler.
  """
  del error
  # The plugin hook is documented as firing for both leaf and non-leaf
  # experiments, so guard instead of asserting: an AssertionError raised from
  # an abort handler would come on top of the error that caused the abort.
  if experiment.is_leaf:
    self._maybe_update_experiment_html(runner, experiment, force=True)

def on_example_complete(
self, runner: Runner, experiment: Experiment, example: Example
):
Expand Down Expand Up @@ -103,19 +122,26 @@ def _maybe_update_experiment_html(
self, runner: Runner, experiment: Experiment, force: bool = False
) -> None:
def _save():
html = experiment.to_html(
collapse_level=None,
extra_flags=dict(
current_run=runner.current_run,
interactive=False,
card_view=False,
),
)
html.save(
runner.current_run.output_path_for(
experiment, _EVALULATION_DETAIL_FILE
)
index_html_path = runner.current_run.output_path_for(
experiment, _EVALULATION_DETAIL_FILE
)
try:
html = experiment.to_html(
collapse_level=None,
extra_flags=dict(
current_run=runner.current_run,
interactive=False,
card_view=False,
),
)
html.save(index_html_path)
except BaseException as e: # pylint: disable=broad-except
experiment.error(
f'Failed to save HTML {index_html_path!r}. '
f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.',
)
raise e

if force or (
time.time() - self._last_experiment_report_time[experiment.id]
> self.experiment_report_interval
Expand All @@ -128,17 +154,24 @@ def _save_example_html(
) -> None:
"""Saves the example."""
def _save():
html = example.to_html(
collapse_level=None,
enable_summary_tooltip=False,
extra_flags=dict(
# For properly rendering the next link.
num_examples=getattr(experiment, 'num_examples', None)
),
)
html.save(
runner.current_run.output_path_for(
experiment, f'{example.id}.html'
)
)
try:
html = example.to_html(
collapse_level=None,
enable_summary_tooltip=False,
extra_flags=dict(
# For properly rendering the next link.
num_examples=getattr(experiment, 'num_examples', None)
),
)
html.save(
runner.current_run.output_path_for(
experiment, f'{example.id}.html'
)
)
except BaseException as e: # pylint: disable=broad-except
experiment.error(
f'Failed to save HTML {example.id}.html. '
f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.',
)
raise e
runner.background_run(_save)
Loading

0 comments on commit f4dcfd1

Please sign in to comment.