diff --git a/trainer/io.py b/trainer/io.py
index 6e08aea..8fc9a58 100644
--- a/trainer/io.py
+++ b/trainer/io.py
@@ -6,11 +6,12 @@
 from pathlib import Path
 from typing import Any, Callable, Dict, List, Tuple, Union
 from urllib.parse import urlparse
-
+import mlflow
 import fsspec
 import torch
 from coqpit import Coqpit
+
 from trainer.logger import logger
@@ -161,10 +162,10 @@ def save_checkpoint(
         save_func=save_func,
         **kwargs,
     )
+
     if save_n_checkpoints is not None:
         keep_n_checkpoints(output_folder, save_n_checkpoints)
-
 
 def save_best_model(
     current_loss,
     best_loss,
@@ -180,6 +181,8 @@ def save_best_model(
     save_func=None,
     **kwargs,
 ):
+    print("current_loss: ",current_loss)
+    print("best_loss: ",best_loss)
     if current_loss < best_loss:
         best_model_name = f"best_model_{current_step}.pth"
         checkpoint_path = os.path.join(out_path, best_model_name)
diff --git a/trainer/trainer.py b/trainer/trainer.py
index 9c2c9ab..34b9f05 100644
--- a/trainer/trainer.py
+++ b/trainer/trainer.py
@@ -54,6 +54,7 @@
     rank_zero_logger_info,
     rank_zero_only,
 )
+from trainer.utils.rclone import sync_data2s3bucket
 
 logger = logging.getLogger("trainer")
 
@@ -413,6 +414,8 @@ def __init__(  # pylint: disable=dangerous-default-value
             # create a new output folder name
             output_path = get_experiment_folder_path(config.output_path, config.run_name)
             os.makedirs(output_path, exist_ok=True)
+
+            self.experiment_output_path = output_path
 
         # copy training assets to the output folder
         copy_model_files(config, output_path, new_fields)
@@ -1484,6 +1487,7 @@ def train_epoch(self) -> None:
         loader_start_time = time.time()
         # TRAINING EPOCH -> iterate over the training samples
         batch_num_steps = len(self.train_loader)
+        intermediate_eval = False
         for cur_step, batch in enumerate(self.train_loader):
             outputs, _ = self.train_step(batch, batch_num_steps, cur_step, loader_start_time)
             if outputs is None:
@@ -1494,7 +1498,9 @@ def train_epoch(self) -> None:
 
             # RUN EVAL -> run evaluation epoch in the middle of training. Useful for big datasets.
             if self.config.run_eval_steps is not None and (self.total_steps_done % self.config.run_eval_steps == 0):
+                intermediate_eval = True
                 self.eval_epoch()
+                self.test_run()
                 if self.num_gpus > 1:
                     self.model.module.train()
                 else:
@@ -1521,6 +1527,8 @@ def train_epoch(self) -> None:
                 self.dashboard_logger.train_epoch_stats(self.total_steps_done, epoch_stats)
             if self.config.model_param_stats:
                 self.dashboard_logger.model_weights(self.model, self.total_steps_done)
+        if intermediate_eval:
+            sync_data2s3bucket(os.getenv("VOICE_GENERATION_RESULTS_BUCKET"), os.getenv("OUTPUT_PATH"))
         torch.cuda.empty_cache()
 
     #######################
@@ -1612,9 +1620,12 @@ def eval_epoch(self) -> None:
                 self.eval_samples,
                 verbose=True,
             )
-            if self.config.run_eval
-            else None
+            # if self.config.run_eval or not isinstance(self.config.run_eval_steps,type(None))
+            # else None
         )
+
+        print("self.config.run_eval_steps",self.config.run_eval_steps)
+        print("self.config.run_eval",self.config.run_eval)
 
         torch.set_grad_enabled(False)
         self.model.eval()
@@ -1706,7 +1717,7 @@ def _restore_best_loss(self):
             logger.info(" > Restoring best loss from %s ...", os.path.basename(self.args.best_path))
             ch = load_fsspec(self.args.restore_path, map_location="cpu")
             if "model_loss" in ch:
-                self.best_loss = ch["model_loss"]
+                self.best_loss = ch["model_loss"]["eval_loss"] if isinstance(ch["model_loss"],dict) else ch["model_loss"]
             logger.info(" > Starting with loaded last best loss %f", self.best_loss)
 
     def test(self, model=None, test_samples=None) -> None:
@@ -1753,24 +1764,25 @@ def _fit(self) -> None:
                 dist.barrier()
             self.callbacks.on_epoch_start(self)
             self.keep_avg_train = KeepAverage()
-            self.keep_avg_eval = KeepAverage() if self.config.run_eval else None
+            self.keep_avg_eval = KeepAverage() #if self.config.run_eval else None
             self.epochs_done = epoch
             self.c_logger.print_epoch_start(epoch, self.config.epochs, self.output_path)
             if not self.skip_train_epoch and not self.start_with_eval:
                 self.train_epoch()
-            if self.config.run_eval:
-                self.eval_epoch()
-            if epoch >= self.config.test_delay_epochs and self.args.rank <= 0:
-                self.test_run()
+            # if self.config.run_eval:
+            #     self.eval_epoch()
+            # if epoch >= self.config.test_delay_epochs and self.args.rank <= 0:
+            #     self.test_run()
 
             self.c_logger.print_epoch_end(
                 epoch,
-                self.keep_avg_eval.avg_values if self.config.run_eval else self.keep_avg_train.avg_values,
+                self.keep_avg_eval.avg_values #if self.config.run_eval else self.keep_avg_train.avg_values,
             )
             if self.args.rank in [None, 0]:
                 self.save_best_model()
             self.callbacks.on_epoch_end(self)
             self.start_with_eval = False
+
 
     def fit_with_largest_batch_size(self, starting_batch_size=2048) -> None:
         cuda_meminfo()
diff --git a/trainer/utils/rclone.py b/trainer/utils/rclone.py
new file mode 100644
index 0000000..09fdf57
--- /dev/null
+++ b/trainer/utils/rclone.py
@@ -0,0 +1,26 @@
+import os
+import subprocess
+
+def get_data_from_lakefs(repo_name:str, branch_name:str, target_folder:str):
+    command = ["rclone", "copy",
+               f"lakefs:{repo_name}/{branch_name}", target_folder]
+    result = subprocess.run(command, capture_output=True)
+    print("Rclone stderr:", result.stderr)
+    assert result.returncode == 0
+
+def sync_data2s3bucket(bucket_name:str, source_folder:str, name_from_config:str="lakefs"):
+    command = ["rclone", "sync", source_folder, f"{name_from_config}:{bucket_name}"]
+    result = subprocess.run(command, capture_output=True)
+    print("Rclone stderr:", result.stderr)
+    assert result.returncode == 0
+
+def copy_data_from_s3bucket(bucket_uri:str, target_folder:str):
+    splitted_bucket_uri = [s for s in bucket_uri.split("/") if s != ""]
+    if len(splitted_bucket_uri) > 1:
+        #assumption: only one folder is specified in the bucket_uri
+        target_folder = os.path.join(target_folder,splitted_bucket_uri[-1])
+    command = ["rclone", "copy", bucket_uri, target_folder]
+    result = subprocess.run(command, capture_output=True)
+    print("Rclone stderr:", result.stderr)
+    assert result.returncode == 0
+    return target_folder
\ No newline at end of file