From 91b6cb2107e6b0c2ce0529d560df878cdd388d07 Mon Sep 17 00:00:00 2001 From: siqi chai Date: Tue, 23 Jul 2024 05:37:58 +0800 Subject: [PATCH 1/5] multi-nodes multi-gpus training --- alf/bin/train.py | 136 +++++++++++++++++++----- alf/environments/process_environment.py | 14 ++- alf/utils/per_process_context.py | 8 +- alf/utils/spawned_process_utils.py | 1 + 4 files changed, 124 insertions(+), 35 deletions(-) diff --git a/alf/bin/train.py b/alf/bin/train.py index 5665da3a5..bcd13177a 100644 --- a/alf/bin/train.py +++ b/alf/bin/train.py @@ -77,12 +77,13 @@ def _define_flags(): flags.DEFINE_bool( 'force_torch_deterministic', True, 'torch.use_deterministic_algorithms when random_seed is set') - flags.DEFINE_bool('store_snapshot', True, + flags.DEFINE_bool('store_snapshot', False, 'Whether store an ALF snapshot before training') flags.DEFINE_enum( - 'distributed', 'none', ['none', 'multi-gpu'], + 'distributed', 'none', ['none', 'multi-gpu', 'multi-node-multi-gpu'], 'Set whether and how to run trainning in distributed mode.') flags.mark_flag_as_required('root_dir') + flags.DEFINE_integer('local-rank', None, 'Local rank passed from distributed launcher') FLAGS = flags.FLAGS @@ -98,7 +99,6 @@ def _setup_logging(rank: int, log_dir: str): FLAGS.alsologtostderr = True logging.set_verbosity(logging.INFO) logging.get_absl_handler().use_absl_log_file(log_dir=log_dir) - logging.use_absl_handler() def _setup_device(rank: int = 0): @@ -116,12 +116,13 @@ def _setup_device(rank: int = 0): torch.cuda.set_device(rank) -def _train(root_dir, rank=0, world_size=1): +def _train(root_dir, local_rank=-1, rank=0, world_size=1): """Launch the trainer after the conf file has been parsed. This function could be called by grid search after the config has been modified. Args: root_dir (str): Path to the directory for writing logs/summaries/checkpoints. + local_rank (int): The ID of the process within current node rank (int): The ID of the process among all of the DDP processes. For non-distributed training, this id should be 0. world_size (int): The number of processes in total. If set to 1, it is @@ -133,6 +134,8 @@ def _train(root_dir, rank=0, world_size=1): if trainer_conf.ml_type == 'rl': ddp_rank = rank if world_size > 1 else -1 + if ddp_rank > -1 and local_rank > -1: + ddp_rank = local_rank trainer = policy_trainer.RLTrainer(trainer_conf, ddp_rank) elif trainer_conf.ml_type == 'sl': # NOTE: SLTrainer does not support distributed training yet @@ -146,13 +149,6 @@ def _train(root_dir, rank=0, world_size=1): trainer.train() -def _training_worker_helper(rank: int, *args, **kwargs): - # Helper to start the training worker with the correct rank - # so that rank 0 is from the main process and the rest are - # from the spawned processes. - training_worker(rank + 1, *args, **kwargs) - - def training_worker(rank: int, world_size: int, conf_file: str, @@ -176,13 +172,70 @@ def training_worker(rank: int, # Specialization for distributed mode dist.init_process_group('nccl', rank=rank, world_size=world_size) # Recover the flags when spawned as a sub process - if rank > 0: - _define_flags() - FLAGS(sys.argv, known_only=True) - FLAGS.mark_as_parsed() + _define_flags() + FLAGS(sys.argv, known_only=True) + FLAGS.mark_as_parsed() + # Set the rank and total number of processes for distributed training. + PerProcessContext().set_distributed( + rank=rank, local_rank=-1, num_processes=world_size) + assert paras_queue is not None + PerProcessContext().set_paras_queue(paras_queue) + + # Make PerProcessContext read-only. 
+ PerProcessContext().finalize() + + # Parse the configuration file, which will also implicitly bring up the environments. + common.parse_conf_file(conf_file) + _train(root_dir=root_dir, rank=rank, world_size=world_size) + except KeyboardInterrupt: + pass + except Exception as e: + if world_size >= 1: + # If the training worker is running as a process in multiprocessing + # environment, this will make sure that the exception raised in this + # particular process is captured and shown. + logging.exception(f'{mp.current_process().name} - {e}') + raise e + finally: + # Note that each training worker will have its own child processes + # running the environments. In the case when training worker process + # finishes ealier (e.g. when it raises an exception), it will hang + # instead of quitting unless all child processes are killed. + alf.close_env() + + + +def training_worker_multi_node(local_rank: int, + rank: int, + world_size: int, + conf_file: str, + root_dir: str, + paras_queue: mp.Queue = None): + """An executable instance that trains and evaluate the algorithm + + Args: + local_rank (int): The ID of the process within current node. + rank (int): The ID of the process among all of the DDP processes. + world_size (int): The number of processes in total. If set to 1, it is + interpreted as "non distributed mode". + conf_file (str): Path to the training configuration. + root_dir (str): Path to the directory for writing logs/summaries/checkpoints. + paras_queue: a shared Queue for checking the consistency of model parameters + in different worker processes, if multi-gpu training is used. + """ + try: + _setup_logging(log_dir=root_dir, rank=rank) + _setup_device(local_rank) + if world_size > 1: + # Specialization for distributed mode + dist.init_process_group('nccl', rank=rank, world_size=world_size) + # Recover the flags when spawned as a sub process + # _define_flags() + FLAGS(sys.argv, known_only=True) + FLAGS.mark_as_parsed() # Set the rank and total number of processes for distributed training. PerProcessContext().set_distributed( - rank=rank, num_processes=world_size) + rank=rank, local_rank=local_rank, num_processes=world_size) assert paras_queue is not None PerProcessContext().set_paras_queue(paras_queue) @@ -191,7 +244,7 @@ def training_worker(rank: int, # Parse the configuration file, which will also implicitly bring up the environments. common.parse_conf_file(conf_file) - _train(root_dir, rank, world_size) + _train(root_dir=root_dir, local_rank=local_rank, rank=rank, world_size=world_size) except KeyboardInterrupt: pass except Exception as e: @@ -239,23 +292,48 @@ def main(_): # in different work processes. manager = mp.Manager() paras_queue = manager.Queue() - with common.get_unused_port(12355) as port: + with common.get_unused_port(12360) as port: # The other process will communicate with the authoritative # process via network protocol on localhost:port. os.environ['MASTER_PORT'] = str(port) - # We spawn the processes for rank-1 and above and use the main - # process for rank-0 so that we can request debug session - # for the main process. We need to do this because the debug - # session cannot be started in a subprocess. 
- context = mp.spawn( - _training_worker_helper, + processes = mp.spawn( + training_worker, args=(world_size, conf_file, root_dir, paras_queue), - join=False, - nprocs=world_size - 1, + join=True, + nprocs=world_size, start_method='spawn') - training_worker(0, world_size, conf_file, root_dir, - paras_queue) - context.join() + except KeyboardInterrupt: + pass + except Exception as e: + # ``e`` has been printed in the subprocess, so here we won't print it + # again. But we raise another error so that we will have a correct + # exit code for the program. + raise ChildProcessError(f'Training failed on subprocess exception') + + elif FLAGS.distributed == 'multi-node-multi-gpu': + local_rank = int(os.environ['LOCAL_RANK']) + rank = int(os.environ['RANK']) + world_size = int(os.environ['WORLD_SIZE']) + print("local_rank: {} | rank: {} | world_size: {}".format(local_rank, rank, world_size)) + + if world_size == 1: + logging.warn( + 'Fallback to single GPU mode as there is only one GPU') + training_worker( + rank=0, world_size=1, conf_file=conf_file, root_dir=root_dir) + return + + try: + # Create a shared queue for checking the consistency of the parameters + # in different work processes. + manager = mp.Manager() + paras_queue = manager.Queue() + training_worker_multi_node(local_rank=local_rank, + rank=rank, + world_size=world_size, + conf_file=conf_file, + root_dir=root_dir, + paras_queue=paras_queue) except KeyboardInterrupt: pass except Exception as e: diff --git a/alf/environments/process_environment.py b/alf/environments/process_environment.py index 02eeffab9..ea240775d 100644 --- a/alf/environments/process_environment.py +++ b/alf/environments/process_environment.py @@ -32,7 +32,7 @@ import alf.nest as nest from alf.utils import common from alf.utils.per_process_context import PerProcessContext -from alf.utils.schedulers import update_all_progresses, get_all_progresses, disallow_scheduler +from alf.utils.schedulers import update_all_progresses, get_all_progresses from alf.utils.spawned_process_utils import SpawnedProcessContext, get_spawned_process_context, set_spawned_process_context from . import _penv @@ -107,6 +107,7 @@ def _worker(conn: multiprocessing.connection, torch_num_threads_per_env: int = 1, ddp_num_procs: int = 1, ddp_rank: int = -1, + local_rank: int= -1, name: str = ''): """The process waits for actions and sends back environment results. 
@@ -142,6 +143,7 @@ def _worker(conn: multiprocessing.connection, SpawnedProcessContext( ddp_num_procs=ddp_num_procs, ddp_rank=ddp_rank, + local_rank=local_rank, env_id=env_id, env_ctor=env_constructor, pre_configs=pre_configs)) @@ -150,8 +152,9 @@ def _worker(conn: multiprocessing.connection, env = alf.get_env() else: env = env_constructor(env_id=env_id) - if not alf.get_config_value("TrainerConfig.sync_progress_to_envs"): - disallow_scheduler() + #TODO fix this disallow_scheduler in ddp context + # if not alf.get_config_value("TrainerConfig.sync_progress_to_envs"): + # disallow_scheduler() action_spec = env.action_spec() if fast: penv = _penv.ProcessEnvironment( @@ -299,13 +302,14 @@ def start(self, wait_to_start=True): ddp_num_procs = PerProcessContext().num_processes ddp_rank = PerProcessContext().ddp_rank + local_rank = PerProcessContext().local_rank self._process = mp_ctx.Process( target=_worker, args=(conn, self._env_constructor, self._start_method, alf.get_handled_pre_configs(), self._env_id, self._flatten, self._fast, self._num_envs, self._torch_num_threads, - ddp_num_procs, ddp_rank, self._name), + ddp_num_procs, ddp_rank, local_rank, self._name), name=f"ProcessEnvironment-{self._env_id}") atexit.register(self.close) self._process.start() @@ -475,4 +479,4 @@ def render(self, mode='human'): Raises: NotImplementedError: If the environment does not support rendering. """ - return self.call('render', mode)() + return self.call('render', mode)() \ No newline at end of file diff --git a/alf/utils/per_process_context.py b/alf/utils/per_process_context.py index b604f48d7..7655af7e6 100644 --- a/alf/utils/per_process_context.py +++ b/alf/utils/per_process_context.py @@ -34,6 +34,7 @@ def __new__(cls): cls._instance = super(PerProcessContext, cls).__new__(cls) cls._instance._read_only = False cls._instance._ddp_rank = -1 + cls._instance._local_rank = -1 cls._instance._num_processes = 1 return cls._instance @@ -42,7 +43,7 @@ def finalize(self) -> None: """ self._read_only = True - def set_distributed(self, rank: int, num_processes: int) -> None: + def set_distributed(self, rank: int, local_rank: int, num_processes: int) -> None: """Set the distributed properties. 
         Args:
@@ -53,6 +54,7 @@ def set_distributed(self, rank: int, num_processes: int) -> None:
             raise AttributeError(
                 'Cannot mutate PerProcessContext after it is finalized')
         self._ddp_rank = rank
+        self._local_rank = local_rank
         self._num_processes = num_processes
 
     def set_paras_queue(self, paras_queue: mp.Queue):
@@ -77,6 +79,10 @@ def is_distributed(self):
     @property
     def ddp_rank(self):
         return self._ddp_rank
+    
+    @property
+    def local_rank(self):
+        return self._local_rank
 
     @property
     def num_processes(self):
diff --git a/alf/utils/spawned_process_utils.py b/alf/utils/spawned_process_utils.py
index 94adc4de1..43607308f 100644
--- a/alf/utils/spawned_process_utils.py
+++ b/alf/utils/spawned_process_utils.py
@@ -31,6 +31,7 @@ class SpawnedProcessContext(NamedTuple):
     """
     ddp_num_procs: int
     ddp_rank: int
+    local_rank: int
     env_id: int
     env_ctor: Callable[..., AlfEnvironment]
     pre_configs: List[Tuple[str, Any]]

From 1f24c17fef39cf67aeb10f5e4a3a6235854292e1 Mon Sep 17 00:00:00 2001
From: "siqi01.chai"
Date: Fri, 2 Aug 2024 00:33:51 -0700
Subject: [PATCH 2/5] "Addressed review comments"

---
 README.md                               | 20 ++++++++++++++++++
 alf/bin/train.py                        | 28 ++++++++++++++++++-------
 alf/environments/process_environment.py | 10 ++++-----
 3 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/README.md b/README.md
index d721ff986..e7e8c38d9 100644
--- a/README.md
+++ b/README.md
@@ -117,6 +117,26 @@ frames using the following command:
 python -m alf.bin.play --root_dir=LOG_DIR
 ```
 
+To launch single-node multi-GPU training, pass `--distributed multi-gpu`:
+```bash
+python -m alf.bin.train --conf=CONF_FILE --root_dir=LOG_DIR --distributed multi-gpu
+```
+
+To launch multi-node multi-GPU training, we use the `torch.distributed.launch` module. The `local_rank` of each process can be obtained from the `PerProcessContext` class and can be used to assign a GPU to your environment if you wish. To start training, run the following command on the host machine:
+```bash
+python -m torch.distributed.launch \
+    --nproc_per_node=NGPU_ON_NODE \
+    --nnodes=NUMBER_OF_NODES \
+    --node_rank=NODE_RANK \
+    --master_addr=HOST_IP \
+    --master_port=12345 \
+    ./alf/bin/train.py \
+    --conf=CONF_FILE \
+    --root_dir=LOG_DIR \
+    --distributed multi-node-multi-gpu
+```
+and run the same command on each worker machine at the same time. Assign each worker machine a distinct NODE_RANK and update NGPU_ON_NODE if its number of GPUs differs from the host's. Please make sure that all machines have the same copy of the codebase.
+
 #### **Deprecated**
 
 An older version of ALF used [gin](https://github.com/google/gin-config)
diff --git a/alf/bin/train.py b/alf/bin/train.py
index bcd13177a..aef152c3e 100644
--- a/alf/bin/train.py
+++ b/alf/bin/train.py
@@ -77,7 +77,7 @@ def _define_flags():
     flags.DEFINE_bool(
         'force_torch_deterministic', True,
         'torch.use_deterministic_algorithms when random_seed is set')
-    flags.DEFINE_bool('store_snapshot', False,
+    flags.DEFINE_bool('store_snapshot', True,
                       'Whether store an ALF snapshot before training')
     flags.DEFINE_enum(
         'distributed', 'none', ['none', 'multi-gpu', 'multi-node-multi-gpu'],
@@ -149,6 +149,13 @@ def _train(root_dir, local_rank=-1, rank=0, world_size=1):
     trainer.train()
 
 
+def _training_worker_helper(rank: int, *args, **kwargs):
+    # Helper to start the training worker with the correct rank
+    # so that rank 0 is from the main process and the rest are
+    # from the spawned processes.
+ training_worker(rank + 1, *args, **kwargs) + + def training_worker(rank: int, world_size: int, conf_file: str, @@ -268,7 +275,7 @@ def main(_): conf_file = common.get_conf_file() - if FLAGS.store_snapshot: + if FLAGS.store_snapshot and int(os.environ['RANK']) == 0: common.generate_alf_snapshot(common.alf_root(), conf_file, root_dir) # FLAGS.distributed is guaranteed to be one of the possible values. @@ -292,16 +299,23 @@ def main(_): # in different work processes. manager = mp.Manager() paras_queue = manager.Queue() - with common.get_unused_port(12360) as port: + with common.get_unused_port(12355) as port: # The other process will communicate with the authoritative # process via network protocol on localhost:port. os.environ['MASTER_PORT'] = str(port) - processes = mp.spawn( - training_worker, + # We spawn the processes for rank-1 and above and use the main + # process for rank-0 so that we can request debug session + # for the main process. We need to do this because the debug + # session cannot be started in a subprocess. + context = mp.spawn( + _training_worker_helper, args=(world_size, conf_file, root_dir, paras_queue), - join=True, - nprocs=world_size, + join=False, + nprocs=world_size - 1, start_method='spawn') + training_worker(0, world_size, conf_file, root_dir, + paras_queue) + context.join() except KeyboardInterrupt: pass except Exception as e: diff --git a/alf/environments/process_environment.py b/alf/environments/process_environment.py index ea240775d..db581f360 100644 --- a/alf/environments/process_environment.py +++ b/alf/environments/process_environment.py @@ -32,7 +32,7 @@ import alf.nest as nest from alf.utils import common from alf.utils.per_process_context import PerProcessContext -from alf.utils.schedulers import update_all_progresses, get_all_progresses +from alf.utils.schedulers import update_all_progresses, get_all_progresses, disallow_scheduler from alf.utils.spawned_process_utils import SpawnedProcessContext, get_spawned_process_context, set_spawned_process_context from . import _penv @@ -152,9 +152,8 @@ def _worker(conn: multiprocessing.connection, env = alf.get_env() else: env = env_constructor(env_id=env_id) - #TODO fix this disallow_scheduler in ddp context - # if not alf.get_config_value("TrainerConfig.sync_progress_to_envs"): - # disallow_scheduler() + if not alf.get_config_value("TrainerConfig.sync_progress_to_envs"): + disallow_scheduler() action_spec = env.action_spec() if fast: penv = _penv.ProcessEnvironment( @@ -479,4 +478,5 @@ def render(self, mode='human'): Raises: NotImplementedError: If the environment does not support rendering. """ - return self.call('render', mode)() \ No newline at end of file + return self.call('render', mode)() + \ No newline at end of file From 2211adc22686372c3c26a5caa0fc74a59ecab3a8 Mon Sep 17 00:00:00 2001 From: "siqi.chai" Date: Fri, 13 Sep 2024 20:28:13 +0800 Subject: [PATCH 3/5] add new line at end of file --- alf/environments/process_environment.py | 1 + 1 file changed, 1 insertion(+) diff --git a/alf/environments/process_environment.py b/alf/environments/process_environment.py index db581f360..2b630b049 100644 --- a/alf/environments/process_environment.py +++ b/alf/environments/process_environment.py @@ -479,4 +479,5 @@ def render(self, mode='human'): NotImplementedError: If the environment does not support rendering. 
""" return self.call('render', mode)() + \ No newline at end of file From 5d31d5be2d11766e973f9d6eca38175e7216c747 Mon Sep 17 00:00:00 2001 From: "siqi.chai" Date: Fri, 13 Sep 2024 20:39:31 +0800 Subject: [PATCH 4/5] change new line --- alf/environments/process_environment.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/alf/environments/process_environment.py b/alf/environments/process_environment.py index 2b630b049..21380a91a 100644 --- a/alf/environments/process_environment.py +++ b/alf/environments/process_environment.py @@ -107,7 +107,7 @@ def _worker(conn: multiprocessing.connection, torch_num_threads_per_env: int = 1, ddp_num_procs: int = 1, ddp_rank: int = -1, - local_rank: int= -1, + local_rank: int = -1, name: str = ''): """The process waits for actions and sends back environment results. @@ -479,5 +479,3 @@ def render(self, mode='human'): NotImplementedError: If the environment does not support rendering. """ return self.call('render', mode)() - - \ No newline at end of file From 989a6fc507bddfc0d3341c9b9848e4b363f516e6 Mon Sep 17 00:00:00 2001 From: siqi chai Date: Thu, 26 Sep 2024 06:03:25 +0800 Subject: [PATCH 5/5] fixed doc str --- alf/bin/train.py | 32 +++++++++++++++++++------------- alf/utils/per_process_context.py | 5 +++-- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/alf/bin/train.py b/alf/bin/train.py index aef152c3e..194e40473 100644 --- a/alf/bin/train.py +++ b/alf/bin/train.py @@ -83,7 +83,8 @@ def _define_flags(): 'distributed', 'none', ['none', 'multi-gpu', 'multi-node-multi-gpu'], 'Set whether and how to run trainning in distributed mode.') flags.mark_flag_as_required('root_dir') - flags.DEFINE_integer('local-rank', None, 'Local rank passed from distributed launcher') + flags.DEFINE_integer('local-rank', None, + 'Local rank passed from distributed launcher') FLAGS = flags.FLAGS @@ -211,8 +212,7 @@ def training_worker(rank: int, alf.close_env() - -def training_worker_multi_node(local_rank: int, +def training_worker_multi_node(local_rank: int, rank: int, world_size: int, conf_file: str, @@ -251,7 +251,11 @@ def training_worker_multi_node(local_rank: int, # Parse the configuration file, which will also implicitly bring up the environments. common.parse_conf_file(conf_file) - _train(root_dir=root_dir, local_rank=local_rank, rank=rank, world_size=world_size) + _train( + root_dir=root_dir, + local_rank=local_rank, + rank=rank, + world_size=world_size) except KeyboardInterrupt: pass except Exception as e: @@ -323,13 +327,14 @@ def main(_): # again. But we raise another error so that we will have a correct # exit code for the program. raise ChildProcessError(f'Training failed on subprocess exception') - + elif FLAGS.distributed == 'multi-node-multi-gpu': local_rank = int(os.environ['LOCAL_RANK']) rank = int(os.environ['RANK']) world_size = int(os.environ['WORLD_SIZE']) - print("local_rank: {} | rank: {} | world_size: {}".format(local_rank, rank, world_size)) - + print("local_rank: {} | rank: {} | world_size: {}".format( + local_rank, rank, world_size)) + if world_size == 1: logging.warn( 'Fallback to single GPU mode as there is only one GPU') @@ -342,12 +347,13 @@ def main(_): # in different work processes. 
manager = mp.Manager() paras_queue = manager.Queue() - training_worker_multi_node(local_rank=local_rank, - rank=rank, - world_size=world_size, - conf_file=conf_file, - root_dir=root_dir, - paras_queue=paras_queue) + training_worker_multi_node( + local_rank=local_rank, + rank=rank, + world_size=world_size, + conf_file=conf_file, + root_dir=root_dir, + paras_queue=paras_queue) except KeyboardInterrupt: pass except Exception as e: diff --git a/alf/utils/per_process_context.py b/alf/utils/per_process_context.py index 7655af7e6..5644d3083 100644 --- a/alf/utils/per_process_context.py +++ b/alf/utils/per_process_context.py @@ -43,7 +43,8 @@ def finalize(self) -> None: """ self._read_only = True - def set_distributed(self, rank: int, local_rank: int, num_processes: int) -> None: + def set_distributed(self, rank: int, local_rank: int, + num_processes: int) -> None: """Set the distributed properties. Args: @@ -79,7 +80,7 @@ def is_distributed(self): @property def ddp_rank(self): return self._ddp_rank - + @property def local_rank(self): return self._local_rank
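
The patches above only plumb `local_rank` through `PerProcessContext` and `SpawnedProcessContext`; the README text says it "can be used to assign gpu for your environment if you wish" but does not show how. Below is a minimal sketch of that idea. The helper name `choose_env_device` is hypothetical (it is not part of ALF or of these patches), and the fallback to the default device when no local rank is set is an assumption:

```python
# Hypothetical sketch (not part of ALF or this patch series): pick a CUDA
# device for an environment from the local_rank plumbed through above.
import torch

from alf.utils.per_process_context import PerProcessContext
from alf.utils.spawned_process_utils import get_spawned_process_context


def choose_env_device() -> torch.device:
    """Return the device matching this process's local rank, if any."""
    # In a spawned ProcessEnvironment worker the rank information is forwarded
    # via SpawnedProcessContext; in the trainer process it lives in
    # PerProcessContext.
    ctx = get_spawned_process_context()
    local_rank = ctx.local_rank if ctx is not None else PerProcessContext().local_rank
    if torch.cuda.is_available() and local_rank >= 0:
        return torch.device('cuda', local_rank)
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')
```

An environment constructor could call `choose_env_device()` when creating its simulator so that, under `--distributed multi-node-multi-gpu`, each node's environments land on that node's GPUs instead of all defaulting to GPU 0.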