diff --git a/onmt/bin/train_profile.py b/onmt/bin/train_profile.py
index c455f9c3dd..b0c3b1dc64 100644
--- a/onmt/bin/train_profile.py
+++ b/onmt/bin/train_profile.py
@@ -11,14 +11,20 @@
 from onmt.utils.logging import init_logger, logger
 from onmt.train_single import main as single_main
 from onmt.utils.parse import ArgumentParser
-from onmt.inputters.inputter import build_dataset_iter, \
-    load_old_vocab, old_style_vocab, build_dataset_iter_multiple
+from onmt.inputters.inputter import (
+    build_dataset_iter,
+    load_old_vocab,
+    old_style_vocab,
+    build_dataset_iter_multiple,
+)
 
 from itertools import cycle
 
 import torch.cuda.profiler as profiler
 import pyprof2
+
 pyprof2.init()
 
+
 def train(opt):
     ArgumentParser.validate_train_opts(opt)
     ArgumentParser.update_model_opts(opt)
@@ -26,19 +32,19 @@ def train(opt):
 
     # Load checkpoint if we resume from a previous training.
     if opt.train_from:
-        logger.info('Loading checkpoint from %s' % opt.train_from)
-        checkpoint = torch.load(opt.train_from,
-                                map_location=lambda storage, loc: storage)
-        logger.info('Loading vocab from checkpoint at %s.' % opt.train_from)
-        vocab = checkpoint['vocab']
+        logger.info("Loading checkpoint from %s" % opt.train_from)
+        checkpoint = torch.load(
+            opt.train_from, map_location=lambda storage, loc: storage
+        )
+        logger.info("Loading vocab from checkpoint at %s." % opt.train_from)
+        vocab = checkpoint["vocab"]
     else:
-        vocab = torch.load(opt.data + '.vocab.pt')
+        vocab = torch.load(opt.data + ".vocab.pt")
 
     # check for code where vocab is saved instead of fields
     # (in the future this will be done in a smarter way)
     if old_style_vocab(vocab):
-        fields = load_old_vocab(
-            vocab, opt.model_type, dynamic_dict=opt.copy_attn)
+        fields = load_old_vocab(vocab, opt.model_type, dynamic_dict=opt.copy_attn)
     else:
         fields = vocab
 
@@ -49,7 +55,7 @@ def train(opt):
             train_shards.append(shard_base)
         train_iter = build_dataset_iter_multiple(train_shards, fields, opt)
     else:
-        if opt.data_ids[0] is not None and opt.data_ids[0] != 'None':
+        if opt.data_ids[0] is not None and opt.data_ids[0] != "None":
            shard_base = "train_" + opt.data_ids[0]
         else:
             shard_base = "train"
@@ -59,7 +65,7 @@ def train(opt):
 
     if opt.world_size > 1:
         queues = []
-        mp = torch.multiprocessing.get_context('spawn')
+        mp = torch.multiprocessing.get_context("spawn")
         semaphore = mp.Semaphore(opt.world_size * opt.queue_size)
         # Create a thread to listen for errors in the child processes.
         error_queue = mp.SimpleQueue()
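The semaphore above is sized to `world_size * queue_size`, which caps how far the batch producer may run ahead of its GPU consumers. A minimal, self-contained sketch of that backpressure contract (illustrative names and sizes, not part of the patch; the stdlib `multiprocessing` behaves the same here):

```python
import torch.multiprocessing

def producer(items, queues, semaphore):
    for i, item in enumerate(items):
        semaphore.acquire()  # blocks once world_size * queue_size items are in flight
        queues[i % len(queues)].put(item)

def consumer(queue, semaphore):
    while True:
        item = queue.get()
        semaphore.release()  # frees a slot for the producer
        if item is None:     # sentinel: no more work for this consumer
            break

if __name__ == "__main__":
    mp = torch.multiprocessing.get_context("spawn")
    world_size, queue_size = 2, 4
    semaphore = mp.Semaphore(world_size * queue_size)
    queues = [mp.Queue(queue_size) for _ in range(world_size)]
    procs = [mp.Process(target=consumer, args=(q, semaphore), daemon=True) for q in queues]
    for p in procs:
        p.start()
    producer(list(range(16)) + [None, None], queues, semaphore)
    for p in procs:
        p.join()
```

In the patch itself the `release` happens on the consumer side in `train_single`; the sketch just mirrors that acquire/release pairing.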
@@ -69,14 +75,26 @@ def train(opt):
         for device_id in range(nb_gpu):
             q = mp.Queue(opt.queue_size)
             queues += [q]
-            procs.append(mp.Process(target=run, args=(
-                opt, device_id, error_queue, q, semaphore), daemon=True))
+            procs.append(
+                mp.Process(
+                    target=run,
+                    args=(opt, device_id, error_queue, q, semaphore),
+                    daemon=True,
+                )
+            )
             procs[device_id].start()
             logger.info(" Starting process pid: %d " % procs[device_id].pid)
             error_handler.add_child(procs[device_id].pid)
-        producer = mp.Process(target=batch_producer,
-                              args=(train_iter, queues, semaphore, opt,),
-                              daemon=True)
+        producer = mp.Process(
+            target=batch_producer,
+            args=(
+                train_iter,
+                queues,
+                semaphore,
+                opt,
+            ),
+            daemon=True,
+        )
         producer.start()
         error_handler.add_child(producer.pid)
 
@@ -86,7 +104,7 @@ def train(opt):
     elif nb_gpu == 1:  # case 1 GPU only
         single_main(opt, 0)
-    else:   # case only CPU
+    else:  # case only CPU
         single_main(opt, -1)
 
 
@@ -104,8 +122,7 @@ def pred(x):
         if x[0] % opt.world_size == rank:
             return True
 
-    generator_to_serve = filter(
-        pred, enumerate(generator_to_serve))
+    generator_to_serve = filter(pred, enumerate(generator_to_serve))
 
     def next_batch(device_id):
         new_batch = next(generator_to_serve)
@@ -117,16 +134,17 @@ def next_batch(device_id):
     for device_id, q in cycle(enumerate(queues)):
         b.dataset = None
         if isinstance(b.src, tuple):
-            b.src = tuple([_.to(torch.device(device_id))
-                           for _ in b.src])
+            b.src = tuple([_.to(torch.device(device_id)) for _ in b.src])
         else:
             b.src = b.src.to(torch.device(device_id))
         b.tgt = b.tgt.to(torch.device(device_id))
         b.indices = b.indices.to(torch.device(device_id))
-        b.alignment = b.alignment.to(torch.device(device_id)) \
-            if hasattr(b, 'alignment') else None
-        b.src_map = b.src_map.to(torch.device(device_id)) \
-            if hasattr(b, 'src_map') else None
+        b.alignment = (
+            b.alignment.to(torch.device(device_id)) if hasattr(b, "alignment") else None
+        )
+        b.src_map = (
+            b.src_map.to(torch.device(device_id)) if hasattr(b, "src_map") else None
+        )
 
         # hack to dodge unpicklable `dict_keys`
         b.fields = list(b.fields)
@@ -135,18 +153,21 @@ def next_batch(device_id):
 
 
 def run(opt, device_id, error_queue, batch_queue, semaphore):
-    """ run process """
+    """run process"""
     try:
        gpu_rank = onmt.utils.distributed.multi_init(opt, device_id)
         if gpu_rank != opt.gpu_ranks[device_id]:
-            raise AssertionError("An error occurred in \
-                  Distributed initialization")
+            raise AssertionError(
+                "An error occurred in "
+                "Distributed initialization"
+            )
         single_main(opt, device_id, batch_queue, semaphore)
     except KeyboardInterrupt:
         pass  # killed by parent, do nothing
     except Exception:
         # propagate exception to parent process, keeping original traceback
         import traceback
+
         error_queue.put((opt.gpu_ranks[device_id], traceback.format_exc()))
 
 
@@ -155,28 +176,28 @@ class ErrorHandler(object):
     the tracebacks to the parent process."""
 
     def __init__(self, error_queue):
-        """ init error handler """
+        """init error handler"""
         import signal
         import threading
+
         self.error_queue = error_queue
         self.children_pids = []
-        self.error_thread = threading.Thread(
-            target=self.error_listener, daemon=True)
+        self.error_thread = threading.Thread(target=self.error_listener, daemon=True)
         self.error_thread.start()
         signal.signal(signal.SIGUSR1, self.signal_handler)
 
     def add_child(self, pid):
-        """ error handler """
+        """error handler"""
         self.children_pids.append(pid)
 
     def error_listener(self):
-        """ error listener """
+        """error listener"""
         (rank, original_trace) = self.error_queue.get()
         self.error_queue.put((rank, original_trace))
         os.kill(os.getpid(), signal.SIGUSR1)
 
     def signal_handler(self, signalnum, stackframe):
-        """ signal handler """
+        """signal handler"""
         for pid in self.children_pids:
             os.kill(pid, signal.SIGINT)  # kill children processes
         (rank, original_trace) = self.error_queue.get()
@@ -187,7 +208,7 @@ def signal_handler(self, signalnum, stackframe):
 
 
 def _get_parser():
-    parser = ArgumentParser(description='train.py')
+    parser = ArgumentParser(description="train.py")
 
     opts.config_opts(parser)
     opts.model_opts(parser)
@@ -203,4 +224,4 @@ def main():
 
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
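For reference, the parent/child error contract that `run` and `ErrorHandler` implement above, reduced to a runnable sketch (POSIX only because of SIGUSR1 and `signal.pause`; names are illustrative, not part of the patch):

```python
import multiprocessing as mp
import os
import signal
import threading
import traceback

def child(error_queue, rank):
    try:
        raise RuntimeError("boom in worker %d" % rank)
    except Exception:
        # ship the formatted traceback, not the exception object
        error_queue.put((rank, traceback.format_exc()))

def listener(error_queue):
    rank, trace = error_queue.get()       # blocks until some child fails
    error_queue.put((rank, trace))        # re-queue it for the signal handler
    os.kill(os.getpid(), signal.SIGUSR1)  # wake the main thread

if __name__ == "__main__":
    error_queue = mp.SimpleQueue()

    def handler(signalnum, stackframe):
        rank, trace = error_queue.get()
        raise RuntimeError("worker %d failed:\n%s" % (rank, trace))

    signal.signal(signal.SIGUSR1, handler)
    threading.Thread(target=listener, args=(error_queue,), daemon=True).start()
    mp.Process(target=child, args=(error_queue, 0), daemon=True).start()
    signal.pause()  # sleep until the listener converts the failure to SIGUSR1
```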
diff --git a/onmt/opts.py b/onmt/opts.py
index 4fc9e7f93f..9da86e1a09 100644
--- a/onmt/opts.py
+++ b/onmt/opts.py
@@ -112,27 +112,51 @@ def _add_logging_opts(parser, is_train=True):
         help="Log directory for Tensorboard. "
         "This is also the name of the run.",
     )
     # Use MLflow for logging training runs and config parameters
-    group.add('--mlflow', '-mlflow', action="store_true",
-              help="Use mlflow to log training runs and parameters. "
-                   "Must have the library mlflow >= 1.3.0")
-    group.add("--mlflow_experiment_name", "-mlflow_experiment_name",
-              type=str, default=None,
-              help="MLflow experiment name")
-    group.add("--mlflow_run_name", "-mlflow_run_name",
-              type=str, default=None,
-              help="MLflow run name")
+    group.add(
+        "--mlflow",
+        "-mlflow",
+        action="store_true",
+        help="Use mlflow to log training runs and parameters. "
+        "Must have the library mlflow >= 1.3.0",
+    )
+    group.add(
+        "--mlflow_experiment_name",
+        "-mlflow_experiment_name",
+        type=str,
+        default=None,
+        help="MLflow experiment name",
+    )
+    group.add(
+        "--mlflow_run_name",
+        "-mlflow_run_name",
+        type=str,
+        default=None,
+        help="MLflow run name",
+    )
     # Use wandb for logging training runs and config parameters
     # https://docs.wandb.ai/guides/track/advanced/environment-variables
     # should be set: WANDB_API_KEY / WANDB_BASE_URL
-    group.add('--wandb', '-wandb', action="store_true",
-              help="Use wandb to log training runs and parameters. ")
-    group.add("--wandb_project_name", "-wandb_project_name",
-              type=str, default=None,
-              help="wandb experiment name")
-    group.add("--wandb_run_name", "-wandb_run_name",
-              type=str, default=None,
-              help="wandb run name")
+    group.add(
+        "--wandb",
+        "-wandb",
+        action="store_true",
+        help="Use wandb to log training runs and parameters.",
+    )
+    group.add(
+        "--wandb_project_name",
+        "-wandb_project_name",
+        type=str,
+        default=None,
+        help="wandb project name",
+    )
+    group.add(
+        "--wandb_run_name",
+        "-wandb_run_name",
+        type=str,
+        default=None,
+        help="wandb run name",
+    )
 
     group.add(
         "--override_opts",
         "-override-opts",
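How these options surface at run time, as a tiny stand-alone sketch (plain `argparse` stands in for the configargparse-style `group.add` calls above; the flag names match the patch, everything else is illustrative):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--mlflow", action="store_true",
                    help="Use mlflow to log training runs and parameters.")
parser.add_argument("--mlflow_experiment_name", type=str, default=None)
parser.add_argument("--wandb", action="store_true")
parser.add_argument("--wandb_project_name", type=str, default=None)

opt = parser.parse_args(["--wandb", "--wandb_project_name", "demo"])
assert opt.wandb and opt.wandb_project_name == "demo"
assert not opt.mlflow  # each backend stays off unless explicitly requested
```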
", + ) + group.add( + "--wandb_project_name", + "-wandb_project_name", + type=str, + default=None, + help="wandb experiment name", + ) + group.add( + "--wandb_run_name", + "-wandb_run_name", + type=str, + default=None, + help="wandb run name", + ) group.add( "--override_opts", "-override-opts", @@ -1904,8 +1928,12 @@ def translate_opts(parser): help="Path to output the predictions (each line will " "be the decoded sequence", ) - group.add('--log_probs', '-log_probs', action='store_true', - help="Output file with log_probs and gold_score ") + group.add( + "--log_probs", + "-log_probs", + action="store_true", + help="Output file with log_probs and gold_score ", + ) group.add( "--report_align", "-report_align", @@ -1954,8 +1982,12 @@ def translate_opts(parser): ) group.add("--gpu", "-gpu", type=int, default=-1, help="Device to run on") - group.add('--num_threads', '-num_threads', type=int, - help="Number of CPUs to use for translation") + group.add( + "--num_threads", + "-num_threads", + type=int, + help="Number of CPUs to use for translation", + ) group.add( "-transforms", @@ -1966,9 +1998,12 @@ def translate_opts(parser): help="Default transform pipeline to apply to data.", ) - group = parser.add_argument_group('ibmrxn') - group.add_argument('--is_ibmrxn', action='store_true', - help='Translate returns in a format that is compatible with the api') + group = parser.add_argument_group("ibmrxn") + group.add_argument( + "--is_ibmrxn", + action="store_true", + help="Translate returns in a format that is compatible with the api", + ) # Adding options related to Transforms _add_transform_opts(parser) diff --git a/onmt/train_single.py b/onmt/train_single.py index 6b1a99b882..236dee1933 100644 --- a/onmt/train_single.py +++ b/onmt/train_single.py @@ -212,14 +212,13 @@ def main(opt, device_id, batch_queue=None, semaphore=None): train_iter = build_dataset_iter_multiple(train_shards, fields, opt) else: if opt.data_ids[0] is not None: - if opt.data_ids[0] is not None and opt.data_ids[0] != 'None': + if opt.data_ids[0] is not None and opt.data_ids[0] != "None": shard_base = "train_" + opt.data_ids[0] else: shard_base = "train" train_iter = build_dataset_iter(shard_base, fields, opt) else: - assert semaphore is not None, \ - "Using batch_queue requires semaphore as well" + assert semaphore is not None, "Using batch_queue requires semaphore as well" def _train_iter(): while True: @@ -228,12 +227,11 @@ def _train_iter(): yield batch train_iter = _train_iter() - valid_iter = build_dataset_iter( - "valid", fields, opt, is_train=False) + valid_iter = build_dataset_iter("valid", fields, opt, is_train=False) if len(opt.gpu_ranks): - logger.info('Starting training on GPU: %s' % opt.gpu_ranks) + logger.info("Starting training on GPU: %s" % opt.gpu_ranks) else: - logger.info('Starting training on CPU, could be very slow') + logger.info("Starting training on CPU, could be very slow") train_steps = opt.train_steps if opt.single_pass and train_steps > 0: logger.warning("Option single_pass is enabled, ignoring train_steps.") @@ -274,6 +272,7 @@ def _train_iter(): # added for mlflow integration if opt.mlflow: import mlflow + if opt.mlflow_experiment_name is not None: mlflow.set_experiment(opt.mlflow_experiment_name) if opt.mlflow_run_name is not None: @@ -281,28 +280,34 @@ def _train_iter(): else: mlflow.start_run() for k, v in vars(opt).items(): - mlflow.log_param(k, v) - mlflow.log_param('n_enc_parameters', enc) - mlflow.log_param('n_dec_parameters', dec) - mlflow.log_param('n_total_parameters', n_params) + 
diff --git a/onmt/train_single.py b/onmt/train_single.py
index 6b1a99b882..236dee1933 100644
--- a/onmt/train_single.py
+++ b/onmt/train_single.py
@@ -212,14 +212,13 @@ def main(opt, device_id, batch_queue=None, semaphore=None):
             train_iter = build_dataset_iter_multiple(train_shards, fields, opt)
         else:
             if opt.data_ids[0] is not None:
-                if opt.data_ids[0] is not None and opt.data_ids[0] != 'None':
+                if opt.data_ids[0] is not None and opt.data_ids[0] != "None":
                     shard_base = "train_" + opt.data_ids[0]
                 else:
                     shard_base = "train"
                 train_iter = build_dataset_iter(shard_base, fields, opt)
     else:
-        assert semaphore is not None, \
-            "Using batch_queue requires semaphore as well"
+        assert semaphore is not None, "Using batch_queue requires semaphore as well"
 
         def _train_iter():
             while True:
@@ -228,12 +227,11 @@ def _train_iter():
                 yield batch
 
         train_iter = _train_iter()
-    valid_iter = build_dataset_iter(
-        "valid", fields, opt, is_train=False)
+    valid_iter = build_dataset_iter("valid", fields, opt, is_train=False)
 
     if len(opt.gpu_ranks):
-        logger.info('Starting training on GPU: %s' % opt.gpu_ranks)
+        logger.info("Starting training on GPU: %s" % opt.gpu_ranks)
     else:
-        logger.info('Starting training on CPU, could be very slow')
+        logger.info("Starting training on CPU, could be very slow")
     train_steps = opt.train_steps
     if opt.single_pass and train_steps > 0:
         logger.warning("Option single_pass is enabled, ignoring train_steps.")
@@ -274,6 +272,7 @@ def _train_iter():
     # added for mlflow integration
     if opt.mlflow:
         import mlflow
+
         if opt.mlflow_experiment_name is not None:
             mlflow.set_experiment(opt.mlflow_experiment_name)
         if opt.mlflow_run_name is not None:
@@ -281,28 +280,34 @@ def _train_iter():
         else:
             mlflow.start_run()
         for k, v in vars(opt).items():
-            mlflow.log_param(k, v)
-        mlflow.log_param('n_enc_parameters', enc)
-        mlflow.log_param('n_dec_parameters', dec)
-        mlflow.log_param('n_total_parameters', n_params)
+            mlflow.log_param(k, v)
+        mlflow.log_param("n_enc_parameters", enc)
+        mlflow.log_param("n_dec_parameters", dec)
+        mlflow.log_param("n_total_parameters", n_params)
         import onmt
-        mlflow.log_param('onmt_version', onmt.__version__)
+
+        mlflow.log_param("onmt_version", onmt.__version__)
     elif opt.wandb:
         import wandb
+
         init_dict = {}
         if opt.wandb_project_name is not None:
-            init_dict['project'] = opt.wandb_project_name
+            init_dict["project"] = opt.wandb_project_name
         if opt.wandb_run_name is not None:
-            init_dict['name'] = opt.wandb_run_name
+            init_dict["name"] = opt.wandb_run_name
         wandb.init(**init_dict)
-        wandb.config.update({k:v for k, v in vars(opt).items()})
+        wandb.config.update({k: v for k, v in vars(opt).items()})
         import onmt
-        wandb.config.update({'n_enc_parameters': enc,
-                             'n_dec_parameters': dec,
-                             'n_total_parameters': n_params,
-                             'onmt_version': onmt.__version__
-                             })
+
+        wandb.config.update(
+            {
+                "n_enc_parameters": enc,
+                "n_dec_parameters": dec,
+                "n_total_parameters": n_params,
+                "onmt_version": onmt.__version__,
+            }
+        )
 
     trainer.train(
         train_iter,
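The integration above, distilled to a neutral sketch: record the whole option namespace plus parameter counts with whichever backend was requested. All values are dummies, and `mode="offline"` keeps the wandb branch from needing a server:

```python
backend = None  # set to "mlflow" or "wandb" to exercise a real backend
opt_dict = {"batch_size": 32, "learning_rate": 2.0}  # stand-in for vars(opt)
enc, dec = 1_000_000, 2_000_000  # dummy parameter counts

if backend == "mlflow":
    import mlflow

    mlflow.start_run()
    for k, v in opt_dict.items():
        mlflow.log_param(k, v)
    mlflow.log_param("n_total_parameters", enc + dec)
    mlflow.end_run()
elif backend == "wandb":
    import wandb

    wandb.init(project="demo", mode="offline")  # offline run, no server needed
    wandb.config.update(opt_dict)
    wandb.config.update({"n_total_parameters": enc + dec})
    wandb.finish()
```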
diff --git a/onmt/translate/translator.py b/onmt/translate/translator.py
index 7ab961efb2..7e3acf57fb 100644
--- a/onmt/translate/translator.py
+++ b/onmt/translate/translator.py
@@ -21,9 +21,9 @@
 
 
 def build_translator(opt, device_id=0, report_score=True, logger=None, out_file=None):
-    #phs:
-    log_probs_out_file=None
-    target_score_out_file=None
+    # phs:
+    log_probs_out_file = None
+    target_score_out_file = None
     #
 
     if out_file is None:
@@ -31,8 +31,10 @@ def build_translator(opt, device_id=0, report_score=True, logger=None, out_file=
 
     # phs: create files to log log probabilities and gold score.
     if opt.log_probs:
-        log_probs_out_file = codecs.open(opt.output + '_log_probs', 'w+', 'utf-8')
-        target_score_out_file = codecs.open(opt.output + '_gold_score', 'w+', 'utf-8')
+        log_probs_out_file = codecs.open(opt.output + "_log_probs", "w+", "utf-8")
+        target_score_out_file = codecs.open(
+            opt.output + "_gold_score", "w+", "utf-8"
+        )
     #
 
     load_test_model = (
@@ -57,7 +59,7 @@ def build_translator(opt, device_id=0, report_score=True, logger=None, out_file=
             report_score=report_score,
             logger=logger,
             log_probs_out_file=log_probs_out_file,
-            target_score_out_file=target_score_out_file
+            target_score_out_file=target_score_out_file,
         )
     else:
         translator = Translator.from_opt(
@@ -71,7 +73,7 @@ def build_translator(opt, device_id=0, report_score=True, logger=None, out_file=
             report_score=report_score,
             logger=logger,
             log_probs_out_file=log_probs_out_file,
-            target_score_out_file=target_score_out_file
+            target_score_out_file=target_score_out_file,
         )
 
     return translator
@@ -149,9 +151,9 @@ def __init__(
         seed=-1,
         with_score=False,
         return_gold_log_probs=False,
-        log_probs_out_file=None, # added phs
-        target_score_out_file=None, # added phs
-        is_ibmrxn=False # added phs
+        log_probs_out_file=None,  # added phs
+        target_score_out_file=None,  # added phs
+        is_ibmrxn=False,  # added phs
     ):
         self.model = model
         self.vocabs = vocabs
@@ -221,9 +223,9 @@ def __init__(
             "log_probs": [],
         }
 
-        self.log_probs_out_file = log_probs_out_file # added phs
-        self.target_score_out_file = target_score_out_file # added phs
-        self.is_ibmrxn = is_ibmrxn # added phs
+        self.log_probs_out_file = log_probs_out_file  # added phs
+        self.target_score_out_file = target_score_out_file  # added phs
+        self.is_ibmrxn = is_ibmrxn  # added phs
 
         set_random_seed(seed, self._use_cuda)
         self.with_score = with_score
@@ -242,8 +244,8 @@ def from_opt(
         report_align=False,
         report_score=True,
         logger=None,
-        log_probs_out_file=None, # added phs
-        target_score_out_file=None # added phs
+        log_probs_out_file=None,  # added phs
+        target_score_out_file=None,  # added phs
     ):
         """Alternate constructor.
@@ -298,9 +300,9 @@ def from_opt(
             logger=logger,
             seed=opt.seed,
             with_score=opt.with_score,
-            log_probs_out_file=log_probs_out_file, # added phs
-            target_score_out_file=target_score_out_file, # added phs
-            is_ibmrxn = opt.is_ibmrxn # added_phs
+            log_probs_out_file=log_probs_out_file,  # added phs
+            target_score_out_file=target_score_out_file,  # added phs
+            is_ibmrxn=opt.is_ibmrxn,  # added_phs
         )
 
     def _log(self, msg):
@@ -322,16 +324,16 @@ def _gold_score(
             glp = None
         return gs, glp
 
-
     def likelihood(
-            self,
-            src,
-            tgt=None,
-            src_dir=None,
-            batch_size=None,
-            batch_type="sents",
-            attn_debug=False,
-            phrase_table=""):
+        self,
+        src,
+        tgt=None,
+        src_dir=None,
+        batch_size=None,
+        batch_type="sents",
+        attn_debug=False,
+        phrase_table="",
+    ):
         """Translate content of ``src`` and get gold scores from ``tgt``.
 
         Args:
             src: See :func:`self.src_reader.read()`.
@@ -352,12 +354,11 @@ def likelihood(
 
         data = inputters.Dataset(
             self.fields,
-            readers=([self.src_reader, self.tgt_reader]
-                     if tgt else [self.src_reader]),
+            readers=([self.src_reader, self.tgt_reader] if tgt else [self.src_reader]),
             data=[("src", src), ("tgt", tgt)] if tgt else [("src", src)],
             dirs=[src_dir, None] if tgt else [src_dir],
             sort_key=inputters.str2sortkey[self.data_type],
-            filter_pred=self._filter_pred
+            filter_pred=self._filter_pred,
         )
 
         data_iter = inputters.OrderedIterator(
@@ -368,7 +369,7 @@ def likelihood(
             train=False,
             sort=False,
             sort_within_batch=True,
-            shuffle=False
+            shuffle=False,
         )
 
         all_gold_scores = []
@@ -380,22 +381,29 @@ def likelihood(
 
         for batch in data_iter:
             # import pdb; pdb.set_trace()
             # (0) Prep the components of the search.
-
             # (1) Run the encoder on the src.
             src, enc_states, memory_bank, src_lengths = self._run_encoder(batch)
             self.model.decoder.init_state(src, memory_bank, enc_states)
-
             gold_scores = self._gold_score(
-                batch, memory_bank, src_lengths, data.src_vocabs, use_src_map,
-                enc_states, batch_size, src)
+                batch,
+                memory_bank,
+                src_lengths,
+                data.src_vocabs,
+                use_src_map,
+                enc_states,
+                batch_size,
+                src,
+            )
 
             gold_scores = gold_scores.detach().numpy().tolist()
-            all_gold_scores += [score for _, score in sorted(zip(batch.indices.numpy().tolist(), gold_scores))]
+            all_gold_scores += [
+                score
+                for _, score in sorted(zip(batch.indices.numpy().tolist(), gold_scores))
+            ]
 
         return all_gold_scores
 
-
     def _translate(
         self,
         infer_iter,
@@ -438,7 +446,7 @@ def _translate(
         all_scores = []
         all_predictions = []
         if self.is_ibmrxn:
-            all_attentions = [] # added phs
+            all_attentions = []  # added phs
             attn_debug = True
 
         start_time = time()
@@ -552,7 +560,11 @@ def _process_bucket(bucket_translations):
                     # phs: added to log log probs to file
                     if self.log_probs_out_file is not None:
                         self.log_probs_out_file.write(
-                            '\n'.join([str(t.item()) for t in trans.pred_scores[:self.n_best]]) + '\n')
+                            "\n".join(
+                                [str(t.item()) for t in trans.pred_scores[: self.n_best]]
+                            )
+                            + "\n"
+                        )
                         self.log_probs_out_file.flush()
                     #
@@ -642,7 +654,7 @@ def _process_bucket(bucket_translations):
                     bucket_translations = []
 
             if self.is_ibmrxn:
-                all_attentions.append(trans.attns[0]) # added phs
+                all_attentions.append(trans.attns[0])  # added phs
 
         if len(bucket_translations) > 0:
             (
@@ -662,16 +674,17 @@ def _process_bucket(bucket_translations):
 
         # phs: added to log gold scores to file
         if self.target_score_out_file is not None:
-            self.target_score_out_file.write(
-                str(trans.gold_score.item()) + '\n')
+            self.target_score_out_file.write(str(trans.gold_score.item()) + "\n")
             self.target_score_out_file.flush()
         #
 
         if self.is_ibmrxn:  # added phs
             return {
-                'score': all_scores if batch_size > 1 else all_scores[0], # return more scores when batch_size > 1
-                'prediction': all_predictions,
-                'context_attns': all_attentions
+                "score": all_scores
+                if batch_size > 1
+                else all_scores[0],  # return more scores when batch_size > 1
+                "prediction": all_predictions,
+                "context_attns": all_attentions,
            }
 
         end_time = time()
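One idiom worth calling out from `likelihood` above: `OrderedIterator` reorders examples within a batch for packing, and `batch.indices` remembers each example's original position, so pairing indices with scores and sorting on the index restores input order. A toy version:

```python
indices = [2, 0, 1]  # original positions, after length-based bucketing
gold_scores = [-1.5, -0.3, -0.9]  # scores aligned with the shuffled batch

restored = [score for _, score in sorted(zip(indices, gold_scores))]
assert restored == [-0.3, -0.9, -1.5]  # back in source-sentence order
```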
diff --git a/onmt/utils/report_manager.py b/onmt/utils/report_manager.py
index 20ed8a262d..ebcdddf397 100644
--- a/onmt/utils/report_manager.py
+++ b/onmt/utils/report_manager.py
@@ -76,15 +76,11 @@ def report_training(
             # after logging the validation stats with the progress_step
             # because of the too "old" logs.
             if isinstance(self.tensorboard_writer, WandbSummaryWriter):
-                self.maybe_log_tensorboard(report_stats,
-                                           "progress",
-                                           learning_rate,
-                                           step)
+                self.maybe_log_tensorboard(report_stats, "progress", learning_rate, step)
             else:
                 # default onmt behaviour
-                self.maybe_log_tensorboard(report_stats,
-                                           "progress",
-                                           learning_rate,
-                                           self.progress_step)
+                self.maybe_log_tensorboard(
+                    report_stats, "progress", learning_rate, self.progress_step
+                )
 
         if step % self.report_every == 0:
             if multigpu:
@@ -180,13 +176,15 @@ class MLflowSummaryWriter(object):
     """
     Map SummaryWriter add_scalar function to mlflow log_metric
     """
+
     def __init__(self):
         pass
 
     def add_scalar(self, tag, scalar_value, global_step=None):
         import mlflow
+
         # mlflow cannot display metric names that include the '/' char
-        tag = tag.replace('/', '_')
+        tag = tag.replace("/", "_")
         mlflow.log_metric(tag, scalar_value, step=global_step)
 
 
@@ -194,11 +192,13 @@ class WandbSummaryWriter(object):
     """
     Map SummaryWriter add_scalar function to wandb log
     """
+
     def __init__(self):
         pass
 
     def add_scalar(self, tag, scalar_value, global_step=None):
         import wandb
+
         # keep metric names consistent with the mlflow writer ('/' -> '_')
-        tag = tag.replace('/', '_')
+        tag = tag.replace("/", "_")
         wandb.log({tag: scalar_value}, step=global_step)
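Both adapter classes above duck-type the single SummaryWriter method the report manager calls. A generic version of the same shim, with the backend call left as an injected function (illustrative sketch, not part of the patch):

```python
class ScalarWriterAdapter(object):
    """Duck-types the one SummaryWriter method report_manager uses."""

    def __init__(self, log_fn):
        # log_fn takes (tag, value, step), e.g. a thin wrapper over
        # mlflow.log_metric or wandb.log
        self._log_fn = log_fn

    def add_scalar(self, tag, scalar_value, global_step=None):
        # mlflow rejects '/' in metric names, so normalize for both backends
        self._log_fn(tag.replace("/", "_"), scalar_value, global_step)


# usage sketch:
# writer = ScalarWriterAdapter(lambda tag, v, step: wandb.log({tag: v}, step=step))
# writer.add_scalar("valid/accuracy", 0.91, global_step=1000)
```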