Improve lf.eval.v2 debugging. #379

Merged · 1 commit · Dec 19, 2024
71 changes: 63 additions & 8 deletions langfun/core/eval/v2/checkpointing.py
@@ -13,6 +13,7 @@
# limitations under the License.
"""Checkpointing evaluation runs."""
import threading
import traceback

import langfun.core as lf
from langfun.core.eval.v2 import example as example_lib
@@ -27,6 +28,21 @@
class Checkpointer(experiment_lib.Plugin):
"""Base class for checkpointing evaluation examples."""

def on_experiment_start(self, experiment: Experiment):
if experiment.state.evaluated_examples:
experiment.info(
'Loaded %d examples from checkpoint files. Example IDs: %s' %
(
len(experiment.state.evaluated_examples),
list(sorted(experiment.state.evaluated_examples.keys()))
),
)
else:
experiment.info(
'No previous evaluated examples are loaded. '
f'Experiment {experiment.id} starts from scratch.'
)


class PerExampleCheckpointer(Checkpointer):
"""Checkpointer that saves each example to a separate file."""
@@ -68,10 +84,11 @@ def _load_state(ckpt_file):
_load_state, ckpt_files, max_workers=64,
):
if error is not None:
pg.logging.warning(
experiment.warning(
'Failed to load checkpoint file %s: %s. Skipping the file.',
ckpt_file, error
)
super().on_experiment_start(experiment)

def on_example_complete(
self,
@@ -80,7 +97,11 @@ def on_example_complete(
example: Example,
) -> None:
"""Saves the example to the checkpoint file."""
if not example.has_error:
if example.has_error:
experiment.warning(
f'Example {example.id} has error. Skipping checkpointing.'
)
else:
def save_state(example: Example):
writer = SequenceWriter(
runner.current_run.output_path_for(
@@ -91,8 +112,18 @@ def save_state(example: Example):
)
)
)
writer.add(example)
writer.close()
try:
writer.add(example)
writer.close()
experiment.info(
f'Example {example.id} is saved to {writer.path}.',
)
except BaseException as e: # pylint: disable=broad-except
experiment.error(
f'Failed to save example {example.id} to {writer.path}. '
f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.',
)
raise e
runner.background_run(save_state, example)

def _file_prefix_and_ext(self, filename: str) -> tuple[str, str]:
@@ -164,6 +195,7 @@ def on_experiment_start(
with self._lock:
if self._sequence_writer is not None:
self._sequence_writer[experiment.id] = sequence_writer
super().on_experiment_start(experiment)

def on_experiment_complete(
self,
@@ -178,8 +210,12 @@
if self._sequence_writer is not None:
# Make sure the writer is closed without delay so the file will be
# available immediately.
self._sequence_writer[experiment.id].close()
del self._sequence_writer[experiment.id]
writer = self._sequence_writer.pop(experiment.id)
writer.close()
experiment.info(
f'{len(experiment.state.evaluated_examples)} examples are '
f'checkpointed to {writer.path}.'
)

def on_example_complete(
self,
@@ -189,17 +225,36 @@
) -> None:
"""Saves the example to the checkpoint file."""
assert experiment.id in self._sequence_writer
if not example.has_error:
runner.background_run(self._sequence_writer[experiment.id].add, example)
if example.has_error:
experiment.warning(
f'Example {example.id} has error. Skipping checkpointing.'
)
else:
def _save_example(example: Example):
writer = self._sequence_writer[experiment.id]
try:
writer.add(example)
except BaseException as e: # pylint: disable=broad-except
experiment.error(
f'Failed to save example {example.id} to {writer.path}. '
f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.',
)
raise e
runner.background_run(_save_example, example)


class SequenceWriter:
"""Thread safe sequence writer."""

def __init__(self, path: str):
self._lock = threading.Lock()
self._path = path
self._sequence_writer = pg.io.open_sequence(path, 'w')

@property
def path(self) -> str:
return self._path

def add(self, example: Example):
example_blob = pg.to_json_str(
example,
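
The checkpointing changes above do two things worth calling out: checkpoint status is now reported through the experiment's own logging methods (experiment.info / experiment.warning / experiment.error) instead of pg.logging directly, and SequenceWriter now remembers its output path so those messages can name the exact checkpoint file. A minimal, self-contained sketch of that writer pattern follows; DemoSequenceWriter is a hypothetical stand-in that uses json and a plain file where the real class uses pg.io.open_sequence and pg.to_json_str.

# Illustrative stand-in for checkpointing.SequenceWriter (not the real class).
import json
import threading


class DemoSequenceWriter:
  """Thread-safe sequence writer that exposes its output path."""

  def __init__(self, path: str):
    self._lock = threading.Lock()
    self._path = path
    self._file = open(path, 'w')   # Real class: pg.io.open_sequence(path, 'w')

  @property
  def path(self) -> str:
    # Exposed so error/info messages can say which file was being written.
    return self._path

  def add(self, record) -> None:
    line = json.dumps(record)      # Real class: pg.to_json_str(...)
    with self._lock:
      self._file.write(line + '\n')

  def close(self) -> None:
    with self._lock:
      self._file.close()

With path exposed as a property, the try/except blocks above can report which checkpoint file a failed example was being written to, rather than emitting a generic error.
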
35 changes: 21 additions & 14 deletions langfun/core/eval/v2/evaluation.py
@@ -285,36 +285,43 @@ def _reset(self) -> None:
# Evaluation-level logging.
#

def _log(self, level: lf.logging.LogLevel, message: str, **kwargs):
def _log(self, log_func, level: lf.logging.LogLevel, message: str, **kwargs):
# Write to external logging system.
log_message = f'{self.id}: {message}'
if kwargs:
log_message = f'{log_message} (metadata: {kwargs!r})'
log_func(log_message)

# Add to experiment log history.
log_entry = lf.logging.LogEntry(
level=level,
time=datetime.datetime.now(),
message=message,
metadata=kwargs,
)
with self._log_lock:
self._log_entries.append(
lf.logging.LogEntry(
level=level,
time=datetime.datetime.now(),
message=message,
metadata=kwargs,
)
)
self._log_entries.append(log_entry)

def debug(self, message: str, **kwargs):
"""Logs a debug message to the session."""
self._log('debug', message, **kwargs)
self._log(pg.logging.debug, 'debug', message, **kwargs)

def info(self, message: str, **kwargs):
"""Logs an info message to the session."""
self._log('info', message, **kwargs)
self._log(pg.logging.info, 'info', message, **kwargs)

def warning(self, message: str, **kwargs):
"""Logs a warning message to the session."""
self._log('warning', message, **kwargs)
self._log(pg.logging.warning, 'warning', message, **kwargs)

def error(self, message: str, **kwargs):
"""Logs an error message to the session."""
self._log('error', message, **kwargs)
self._log(pg.logging.error, 'error', message, **kwargs)

def fatal(self, message: str, **kwargs):
"""Logs a fatal message to the session."""
self._log('fatal', message, **kwargs)
# We use error level for fatal message, which does not trigger assertion.
self._log(pg.logging.error, 'fatal', message, **kwargs)

#
# HTML views.
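
The evaluation.py change reroutes Evaluation._log so every message is also sent to an external logging function (pg.logging.debug/info/warning/error) in addition to the in-memory log history, and fatal messages deliberately use the error-level external logger so they do not abort the process. A simplified sketch of this dual-sink pattern, using the standard logging module and a hypothetical DemoExperimentLog class in place of the real Evaluation:

# Hypothetical illustration of the dual-sink logging pattern in Evaluation._log.
import datetime
import logging
import threading


class DemoExperimentLog:

  def __init__(self, experiment_id: str):
    self._id = experiment_id
    self._lock = threading.Lock()
    self._entries = []

  def _log(self, log_func, level: str, message: str, **kwargs):
    # 1) Forward to the external logging system, tagged with the experiment id.
    log_message = f'{self._id}: {message}'
    if kwargs:
      log_message = f'{log_message} (metadata: {kwargs!r})'
    log_func(log_message)
    # 2) Append to the in-memory log history under a lock.
    with self._lock:
      self._entries.append((level, datetime.datetime.now(), message, kwargs))

  def info(self, message: str, **kwargs):
    self._log(logging.info, 'info', message, **kwargs)

  def error(self, message: str, **kwargs):
    self._log(logging.error, 'error', message, **kwargs)

  def fatal(self, message: str, **kwargs):
    # Mirrors the diff: fatal messages use the error-level external logger.
    self._log(logging.error, 'fatal', message, **kwargs)
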
8 changes: 8 additions & 0 deletions langfun/core/eval/v2/experiment.py
@@ -959,6 +959,14 @@ def on_experiment_complete(
) -> None:
"""Called when an experiment (both leaf and non-leaf) is complete."""

def on_experiment_abort(
self,
runner: Runner,
experiment: Experiment,
error: BaseException,
) -> None:
"""Called when an experiment (both leaf and non-leaf) is aborted."""

def on_example_start(
self,
runner: Runner,
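
experiment.py adds on_experiment_abort to the plugin interface, giving plugins a chance to flush state or report diagnostics when an experiment dies with an error (the HtmlReporter changes below use it to force a final HTML update). A hypothetical plugin using the hook might look like the following; AbortReporter is illustrative only, and it assumes the same experiment_lib.Plugin base class that checkpointing.py subclasses above.

# Hypothetical plugin (not part of this PR) built on the new abort hook.
import traceback

from langfun.core.eval.v2 import experiment as experiment_lib


class AbortReporter(experiment_lib.Plugin):
  """Logs the failing experiment and its traceback when it is aborted."""

  def on_experiment_abort(
      self, runner, experiment, error: BaseException
  ) -> None:
    del runner  # Unused in this sketch.
    stack = ''.join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )
    # experiment.error(...) is the evaluation-level logging from evaluation.py.
    experiment.error(
        f'Experiment {experiment.id} aborted: {error}\nStacktrace:\n{stack}'
    )
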
83 changes: 58 additions & 25 deletions langfun/core/eval/v2/reporting.py
@@ -14,6 +14,7 @@
"""Reporting evaluation results."""

import time
import traceback
from typing import Annotated

from langfun.core.eval.v2 import example as example_lib
@@ -61,6 +62,14 @@ def on_run_complete(
) -> None:
self._maybe_update_summary(runner, force=True)

def on_run_abort(
self,
runner: Runner,
root: Experiment,
error: BaseException
) -> None:
self._maybe_update_summary(runner, force=True)

def on_experiment_start(
self,
runner: Runner,
@@ -75,6 +84,16 @@ def on_experiment_complete(
if experiment.is_leaf:
self._maybe_update_experiment_html(runner, experiment, force=True)

def on_experiment_abort(
self,
runner: Runner,
experiment: Experiment,
error: BaseException
) -> None:
del error
assert experiment.is_leaf
self._maybe_update_experiment_html(runner, experiment, force=True)

def on_example_complete(
self, runner: Runner, experiment: Experiment, example: Example
):
@@ -103,19 +122,26 @@ def _maybe_update_experiment_html(
self, runner: Runner, experiment: Experiment, force: bool = False
) -> None:
def _save():
html = experiment.to_html(
collapse_level=None,
extra_flags=dict(
current_run=runner.current_run,
interactive=False,
card_view=False,
),
)
html.save(
runner.current_run.output_path_for(
experiment, _EVALULATION_DETAIL_FILE
)
index_html_path = runner.current_run.output_path_for(
experiment, _EVALULATION_DETAIL_FILE
)
try:
html = experiment.to_html(
collapse_level=None,
extra_flags=dict(
current_run=runner.current_run,
interactive=False,
card_view=False,
),
)
html.save(index_html_path)
except BaseException as e: # pylint: disable=broad-except
experiment.error(
f'Failed to save HTML {index_html_path!r}. '
f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.',
)
raise e

if force or (
time.time() - self._last_experiment_report_time[experiment.id]
> self.experiment_report_interval
@@ -128,17 +154,24 @@ def _save_example_html(
) -> None:
"""Saves the example."""
def _save():
html = example.to_html(
collapse_level=None,
enable_summary_tooltip=False,
extra_flags=dict(
# For properly rendering the next link.
num_examples=getattr(experiment, 'num_examples', None)
),
)
html.save(
runner.current_run.output_path_for(
experiment, f'{example.id}.html'
)
)
try:
html = example.to_html(
collapse_level=None,
enable_summary_tooltip=False,
extra_flags=dict(
# For properly rendering the next link.
num_examples=getattr(experiment, 'num_examples', None)
),
)
html.save(
runner.current_run.output_path_for(
experiment, f'{example.id}.html'
)
)
except BaseException as e: # pylint: disable=broad-except
experiment.error(
f'Failed to save HTML {example.id}.html. '
f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.',
)
raise e
runner.background_run(_save)
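
Both _save callbacks in reporting.py now wrap their HTML generation in try/except, so a failure inside runner.background_run(...) is recorded via experiment.error, with the traceback and the target file, before the exception propagates from the background thread. The pattern could be factored out as a small helper; save_with_error_report below is a hypothetical illustration, not code from this PR.

# Hypothetical helper capturing the error-handling pattern used above.
import traceback


def save_with_error_report(experiment, save_fn, target: str):
  """Runs save_fn(); on failure, logs the error to the experiment and re-raises."""
  try:
    save_fn()
  except BaseException as e:  # pylint: disable=broad-except
    experiment.error(
        f'Failed to save {target}. '
        f'Error: {e}, Stacktrace: \n{traceback.format_exc()}.'
    )
    raise

Together with the new on_run_abort / on_experiment_abort hooks, this keeps partial reports on disk and makes background failures visible in the experiment's own log history.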