diff --git a/src/dunedn/README.md b/src/dunedn/README.md
index 0c619a2..ae28816 100644
--- a/src/dunedn/README.md
+++ b/src/dunedn/README.md
@@ -2,6 +2,8 @@
 
 The DUNEdn source is organized in the following sub-packages:
 
+- distributed
+  Implementation of distributed training over multiple nodes.
 - geometry
   ProtoDUNE-SP geometry parameters and helper functions for event data management.
 - inference
diff --git a/src/dunedn/distributed/distributed.py b/src/dunedn/distributed/distributed.py
new file mode 100644
index 0000000..eb07b05
--- /dev/null
+++ b/src/dunedn/distributed/distributed.py
@@ -0,0 +1,74 @@
+# This file is part of DUNEdn by M. Rossi
+import os
+import argparse
+from pathlib import Path
+from shutil import copyfile
+from time import time as tm
+import numpy as np
+import random
+
+import torch
+import torch.distributed as dist
+
+from dunedn.training.dataloader import CropLoader, PlaneLoader
+from dunedn.training.args import Args
+from dunedn.training.train import train
+from dunedn.networks.models import GCNN_Net
+from dunedn.utils.utils import print_summary_file
+from dunedn.utils.utils import get_configcard
+
+
+def set_random_seed(random_seed=0):
+    torch.manual_seed(random_seed)
+    torch.backends.cudnn.deterministic = True
+    torch.backends.cudnn.benchmark = False
+    np.random.seed(random_seed)
+    random.seed(random_seed)
+
+
+def add_arguments_distributed_training(parser):
+    parser.add_argument(
+        "--configcard",
+        type=Path,
+        help="yaml config file path",
+        default="default_config.yaml",
+    )
+    parser.add_argument("--local_rank", default=0, type=int, help="Distributed utility")
+    parser.add_argument(
+        "--local_world_size", default=1, type=int, help="Distributed utility"
+    )
+    parser.set_defaults(func=training_distributed)
+
+
+def training_distributed(args):
+    args = vars(args)
+    args.pop("func")
+    dist.init_process_group(backend="nccl")
+    parameters = get_configcard(args["configcard"])
+    parameters["local_rank"] = args["local_rank"]
+    parameters["local_world_size"] = args["local_world_size"]
+    parameters["rank"] = dist.get_rank()
+    parameters.update(args)
+    args = Args(**parameters)
+    args.build_directories()
+    if args.rank == 0:
+        print_summary_file(args)
+    start = tm()
+    main_distributed_training(args)
+    if args.rank == 0:
+        print(f"[{os.getpid()}] Process done in {tm()-start}")
+        copyfile(args.runcard, args.dir_output / "input_runcard.yaml")
+    dist.destroy_process_group()
+
+
+def main_distributed_training(args):
+    n = torch.cuda.device_count() // args.local_world_size
+    args.dev_ids = list(range(args.local_rank * n, (args.local_rank + 1) * n))
+    # load datasets
+    set_random_seed(0)
+    train_data = CropLoader(args)
+    val_data = PlaneLoader(args, "val")
+    # model
+    model = GCNN_Net(args)
+    # train
+    return train(args, train_data, val_data, model)
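
The module above registers its handler through parser.set_defaults(func=training_distributed), which implies it is meant to be plugged into a larger command-line interface (the denoise.py entry point referenced by the launcher below is not part of this diff). The following is only a minimal sketch of how such wiring could look; the "distributed" subcommand name and the main() function are illustrative assumptions, not DUNEdn's actual CLI.

    # Hypothetical wiring sketch (not part of this diff): attach the arguments
    # defined by add_arguments_distributed_training to a top-level parser and
    # dispatch through the "func" default.
    import argparse

    from dunedn.distributed.distributed import add_arguments_distributed_training


    def main():
        parser = argparse.ArgumentParser(description="dunedn")
        subparsers = parser.add_subparsers(dest="command")
        # assumed subcommand name, for illustration only
        dist_parser = subparsers.add_parser("distributed", help="multi-node training")
        add_arguments_distributed_training(dist_parser)
        args = parser.parse_args()
        # set_defaults(func=training_distributed) makes this dispatch work
        args.func(args)


    if __name__ == "__main__":
        main()
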
diff --git a/src/dunedn/distributed/launch_distributed.sh b/src/dunedn/distributed/launch_distributed.sh
new file mode 100755
index 0000000..39e350a
--- /dev/null
+++ b/src/dunedn/distributed/launch_distributed.sh
@@ -0,0 +1,111 @@
+#!/usr/bin/bash
+
+### Example usage: ./launch_distributed.sh "3 3 3" ###
+
+# This script launches the distributed training from the local node.
+# It is possible to choose between different setups by editing the config section.
+# The user must input the number of GPUs to use on each node:
+# (# ibmminsky-1, # ibmminsky-2, # ibmminsky-3)
+# Note: each node must spawn the same number of processes, equal to or less
+# than the number of GPUs on that node, otherwise everything will break!
+# The master node is automatically the last listed node with nonzero GPU usage.
+# The job must be launched from the master node.
+
+# General comment about bash
+# ${} parameter expansion
+# $() command substitution
+# <() or >() process substitution
+# ${EXPR} before $(command) before <(process)
+
+# config
+email="marco.rossi@cern.ch"
+workdir="/nfs/public/romarco/DUNEreco"
+directory="${workdir}/denoising"
+logdir="${directory}/logdir"
+
+setenv="source /afs/cern.ch/user/r/romarco/setup_wmla"
+$setenv
+launch=$(python -c "import torch.distributed.launch as t; print(t.__file__)")
+main=${directory}/denoise.py
+
+minsky_IPs=(128.142.165.77 \
+            128.142.165.78 \
+            128.142.165.79)
+
+gpus=($1)
+
+# filter unused nodes
+
+num_gpus=0
+for idx in ${!gpus[*]}; do
+    if [ ${gpus[$idx]} -eq 0 ]; then
+        unset gpus[$idx]
+    fi
+done
+
+nnodes=${#gpus[*]}
+
+if [ $nnodes -eq 0 ]; then
+    echo "Error: Number of GPUs must be greater than zero"
+    exit 1
+else
+    echo "Running on $(($nnodes * ${gpus[-1]})) GPUs"
+fi
+
+hosts=(${!gpus[*]})
+master_addr=${minsky_IPs[${hosts[-1]}]}
+# node indices are zero-based: add 1 to get the ibmminsky host numbers
+for idx in ${!hosts[*]}; do
+    hosts[$idx]=$((${hosts[$idx]}+1))
+done
+
+if [ ! -d $logdir ]; then
+    mkdir $logdir
+fi
+touchtmp="mktemp -p $logdir"
+
+function job_func(){
+    job="python $launch --nproc_per_node=$2 --nnodes=$nnodes --node_rank=$1 \
+         --master_addr=$master_addr $main --local_world_size=$2"
+}
+trap 'rm -f "$logdir"/tmp*' EXIT # automatic cleanup of tmp files
+separator="\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n\n"
+
+log=${logdir}/log.txt
+err=${logdir}/err.txt
+if [[ -f $log && -f $err ]]; then
+    rm $log $err
+fi
+
+rank=$(( $nnodes - 1 ))
+rank_h=0
+for gpu in ${gpus[*]}; do
+    host=${hosts[$rank_h]}
+    job_func $rank $gpu
+    errfiles[$rank]=$($touchtmp)
+    echo -e "ibmminsky-$host\nErrfile" | tee ${errfiles[$rank]} >/dev/null
+    if [ $rank -gt 0 ]; then
+        job="$setenv;cd $workdir;$job"
+        nohup ssh -K ibmminsky-$host $job 1>/dev/null \
+              2>>${errfiles[$rank]} &
+    else
+        $job 1>>${log} 2>>${errfiles[$rank]}
+        returncode=$?
+    fi
+    rank=$(( $rank - 1 ))
+    rank_h=$(( $rank_h + 1 ))
+done
+
+for idx in ${!errfiles[*]}; do
+    cat ${errfiles[$idx]} <(echo -e $separator) >> $err
+done
+
+# send logs via email
+if [[ 0 -ne "$returncode" ]]; then
+    echo FAIL
+    cat "$err" | mailx -s "Job failed with exit code $returncode" \
+        -a "$log" "$email"
+else
+    echo SUCCESS
+    cat "$log" | mailx -s "Job succeeded" "$email"
+fi
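
To make the hand-off between the launcher and distributed.py explicit: job_func builds one torch.distributed.launch command per node, and the launcher spawns --nproc_per_node training processes on that node, exporting the env:// rendezvous variables read by dist.init_process_group(backend="nccl") and injecting --local_rank on each process's command line. The sketch below only illustrates what a single spawned process would see in a hypothetical 2-node run with 3 processes per node; all concrete values are made up for illustration.

    # Illustrative sketch: environment and device mapping for one spawned process.
    import os

    os.environ.update(
        {
            "MASTER_ADDR": "128.142.165.79",  # master_addr chosen by the script
            "MASTER_PORT": "29500",           # torch.distributed.launch default port
            "WORLD_SIZE": "6",                # nnodes * nproc_per_node
            "RANK": "4",                      # node_rank * nproc_per_node + local_rank
        }
    )

    local_world_size = 3                      # --local_world_size passed by job_func
    local_rank = 1                            # --local_rank injected by the launcher
    device_count = 3                          # what torch.cuda.device_count() would return
    n = device_count // local_world_size      # GPUs per process, as in main_distributed_training
    dev_ids = list(range(local_rank * n, (local_rank + 1) * n))
    print(dev_ids)                            # -> [1]: this process drives GPU 1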